1688 作为阿里巴巴旗下的核心批发电商平台,其开放平台提供的item_search_app接口是获取商品数据的核心工具之一。该接口专为移动端应用优化,支持通过关键字搜索获取 1688 平台上的商品信息,广泛应用于货源采集、价格监控、竞品分析等场景,为采购商和供应商提供数据支持。
一、item_search_app 接口核心特性分析
1. 接口定位与核心价值
2. 接口权限与调用限制
3. 核心参数解析
必选参数
可选参数
二、签名生成与返回数据结构
1. 签名生成逻辑
2. 返回数据结构解析
三、Python 实现方案
以下是 1688 item_search_app接口的完整 Python 实现,包含基础调用、数据获取、分析和可视化功能:
import requests
import time
import hashlib
import json
import logging
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from datetime import datetime
from collections import defaultdict
from typing import Dict, List, Optional, Tuple
import re
# 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
# 配置中文显示
plt.rcParams["font.family"] = ["SimHei", "WenQuanYi Micro Hei", "Heiti TC"]
plt.rcParams["axes.unicode_minus"] = False
class AlibabaItemSearchApp:
"""1688 item_search_app接口封装类,用于搜索和分析商品数据"""
def __init__(self, app_key: str, app_secret: str):
"""
初始化1688 API客户端
:param app_key: 应用的app_key
:param app_secret: 应用的app_secret
"""
self.app_key = app_key
self.app_secret = app_secret
self.gateway_url = "https://gw.open.1688.com/openapi/http/1/system.oauth2/getToken"
self.api_url = "https://gw.open.1688.com/openapi/param2/1/com.alibaba.product/alibaba.item.search.app/"
# 频率控制
self.rate_limit = 100 # 默认普通开发者限制,认证开发者可修改为300
self.call_timestamps = [] # 存储调用时间戳(毫秒级)
# 访问令牌相关
self.access_token = ""
self.token_expiry = 0 # 令牌过期时间戳(秒级)
def set_rate_limit(self, limit: int) -> None:
"""设置调用频率限制(次/分钟)"""
if 100 <= limit <= 300:
self.rate_limit = limit
logging.info(f"已设置调用频率限制为 {limit} 次/分钟")
else:
logging.warning("频率限制必须在100-300之间,未修改")
def get_access_token(self) -> Optional[str]:
"""获取访问令牌"""
# 检查令牌是否有效
if self.access_token and time.time() < self.token_expiry - 300: # 提前5分钟刷新
return self.access_token
params = {
"grant_type": "client_credentials",
"client_id": self.app_key,
"client_secret": self.app_secret
}
try:
response = requests.get(self.gateway_url, params=params, timeout=10)
response.raise_for_status()
result = response.json()
if "access_token" in result:
self.access_token = result["access_token"]
self.token_expiry = time.time() + result.get("expires_in", 3600) # 默认1小时有效期
logging.info(f"成功获取access_token,有效期 {result.get('expires_in', 3600)} 秒")
return self.access_token
else:
logging.error(f"获取access_token失败: {result.get('error_description')}")
return None
except Exception as e:
logging.error(f"获取access_token异常: {str(e)}")
return None
def _generate_sign(self, params: Dict) -> str:
"""生成签名"""
# 1. 按参数名ASCII升序排序
sorted_params = sorted(params.items(), key=lambda x: x[0])
# 2. 拼接为"key=value"格式
param_str = "".join([f"{k}{v}" for k, v in sorted_params])
# 3. 前后加上app_secret
sign_str = f"{self.app_secret}{param_str}{self.app_secret}"
# 4. MD5加密并转为小写
return hashlib.md5(sign_str.encode("utf-8")).hexdigest().lower()
def _check_rate_limit(self) -> None:
"""检查并控制调用频率"""
current_time = time.time() * 1000 # 转为毫秒
# 保留1分钟内的调用记录
self.call_timestamps = [t for t in self.call_timestamps if current_time - t < 60000]
# 若超过限制,计算需要等待的时间
if len(self.call_timestamps) >= self.rate_limit:
oldest_time = self.call_timestamps[0]
sleep_time = (60000 - (current_time - oldest_time)) / 1000 + 0.1 # 额外加0.1秒保险
logging.warning(f"调用频率超限,等待 {sleep_time:.1f} 秒")
time.sleep(sleep_time)
# 再次清理过期记录
self.call_timestamps = [t for t in self.call_timestamps if time.time()*1000 - t < 60000]
# 记录本次调用时间
self.call_timestamps.append(current_time)
def search_items(self, keywords: str, page: int = 1, page_size: int = 40,
filters: Optional[Dict] = None) -> Optional[Dict]:
"""
搜索商品
:param keywords: 搜索关键词
:param page: 页码
:param page_size: 每页数量
:param filters: 筛选参数
:return: 商品搜索结果
"""
# 确保获取有效令牌
if not self.get_access_token():
logging.error("无法获取有效的access_token,无法调用API")
return None
# 构建基础参数
base_params = {
"app_key": self.app_key,
"method": "alibaba.item.search.app",
"timestamp": str(int(time.time() * 1000)), # 毫秒级时间戳
"format": "json",
"v": "1.0",
"access_token": self.access_token,
"keywords": keywords,
"page": page,
"page_size": page_size
}
# 合并筛选参数
if filters and isinstance(filters, Dict):
# 过滤空值参数
valid_filters = {k: v for k, v in filters.items() if v is not None}
base_params.update(valid_filters)
# 生成签名
sign = self._generate_sign(base_params)
base_params["sign"] = sign
# 检查频率限制
self._check_rate_limit()
try:
# 发送请求
response = requests.get(self.api_url + self.app_key, params=base_params, timeout=15)
response.raise_for_status()
# 解析响应
result = response.json()
# 处理错误
if "error_response" in result:
error = result["error_response"]
logging.error(f"API调用错误: {error.get('msg')} (错误码: {error.get('code')})")
return None
# 提取结果
search_result = result.get("alibaba_item_search_app_response", {}).get("result", {})
if not search_result:
logging.warning("未获取到商品数据")
return None
logging.info(f"成功获取第 {page} 页商品数据,关键词: {keywords}")
return search_result
except requests.exceptions.RequestException as e:
logging.error(f"请求异常: {str(e)}")
return None
except json.JSONDecodeError:
logging.error(f"响应解析失败: {response.text[:200]}...")
return None
def batch_search_items(self, keywords: str, max_pages: int = 5, page_size: int = 40,
filters: Optional[Dict] = None) -> Tuple[List[Dict], Dict]:
"""
批量搜索多页商品
:param keywords: 搜索关键词
:param max_pages: 最大页数
:param page_size: 每页数量
:param filters: 筛选参数
:return: 商品列表和搜索元信息
"""
all_items = []
meta_info = {
"keywords": keywords,
"total_items": 0,
"total_pages": 0,
"filters": filters,
"crawl_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
page = 1
while page <= max_pages:
logging.info(f"正在搜索第 {page}/{max_pages} 页商品...")
result = self.search_items(keywords, page, page_size, filters)
if not result:
break
# 提取商品数据
items = result.get("items", {}).get("item", [])
if not items:
logging.info("当前页无商品数据,停止搜索")
break
all_items.extend(items)
# 保存元信息(第一页)
if page == 1:
meta_info["total_items"] = result.get("total_results", 0)
meta_info["total_pages"] = min(max_pages, (meta_info["total_items"] + page_size - 1) // page_size)
page += 1
logging.info(f"批量搜索完成,共获取 {len(all_items)} 件商品数据")
return all_items, meta_info
def analyze_items(self, items: List[Dict]) -> Dict:
"""分析商品数据"""
if not items:
return {"error": "没有商品数据可分析"}
# 1. 价格分析
prices = []
for item in items:
try:
# 处理可能的价格范围,取最低价格
price_str = item.get("price", "0")
if "-" in price_str:
price = float(price_str.split("-")[0])
else:
price = float(price_str)
prices.append(price)
except (ValueError, TypeError):
continue
price_stats = {}
if prices:
price_stats = {
"min": round(min(prices), 2),
"max": round(max(prices), 2),
"avg": round(sum(prices) / len(prices), 2),
"median": round(np.median(prices), 2)
}
# 2. 销量分析
sales = []
for item in items:
try:
sale_str = item.get("sales", "0")
# 提取数字(去除"件"等单位)
sale_num = int(re.findall(r'\d+', sale_str)[0]) if re.findall(r'\d+', sale_str) else 0
sales.append(sale_num)
except (ValueError, IndexError, TypeError):
continue
sales_stats = {}
if sales:
sales_stats = {
"total": sum(sales),
"avg": round(sum(sales) / len(sales), 1),
"top3": sorted(sales, reverse=True)[:3]
}
# 3. 地区分布
city_counts = defaultdict(int)
for item in items:
city = item.get("city", "未知")
city_counts[city] += 1
# 4. 供应商类型分析
supplier_types = defaultdict(int)
for item in items:
member_type = item.get("member_type", "普通供应商")
if member_type == "goldsupplier":
supplier_types["金牌供应商"] += 1
elif member_type == "factory":
supplier_types["工厂"] += 1
elif member_type == "tmall":
supplier_types["天猫供应商"] += 1
else:
supplier_types["普通供应商"] += 1
# 5. 起订量分析
min_order_counts = defaultdict(int)
for item in items:
try:
min_order = int(item.get("min_order", "1"))
if min_order <= 10:
min_order_counts["1-10件"] += 1
elif min_order <= 50:
min_order_counts["11-50件"] += 1
elif min_order <= 100:
min_order_counts["51-100件"] += 1
else:
min_order_counts["100件以上"] += 1
except (ValueError, TypeError):
min_order_counts["未知"] += 1
return {
"total_items": len(items),
"price_analysis": price_stats,
"sales_analysis": sales_stats,
"city_distribution": dict(city_counts),
"supplier_type_distribution": dict(supplier_types),
"min_order_distribution": dict(min_order_counts)
}
def get_top_suppliers(self, items: List[Dict], top_n: int = 10) -> List[Dict]:
"""获取优质供应商列表"""
if not items:
return []
# 按供应商ID分组统计
supplier_data = defaultdict(lambda: {
"name": "",
"item_count": 0,
"avg_price": 0,
"total_sales": 0,
"city": "",
"member_type": "",
"rating": 0
})
for item in items:
seller_id = item.get("seller_id", "")
if not seller_id:
continue
# 更新供应商基本信息
if not supplier_data[seller_id]["name"]:
supplier_data[seller_id]["name"] = item.get("nick", "")
supplier_data[seller_id]["city"] = item.get("city", "")
supplier_data[seller_id]["member_type"] = item.get("member_type", "")
supplier_data[seller_id]["rating"] = float(item.get("rating", "0"))
# 累计商品数量
supplier_data[seller_id]["item_count"] += 1
# 累计销售额(估算)
try:
price_str = item.get("price", "0")
if "-" in price_str:
price = float(price_str.split("-")[0])
else:
price = float(price_str)
sale_str = item.get("sales", "0")
sale_num = int(re.findall(r'\d+', sale_str)[0]) if re.findall(r'\d+', sale_str) else 0
supplier_data[seller_id]["total_sales"] += price * sale_num
supplier_data[seller_id]["avg_price"] = (
(supplier_data[seller_id]["avg_price"] * (supplier_data[seller_id]["item_count"] - 1) + price) /
supplier_data[seller_id]["item_count"]
)
except (ValueError, IndexError, TypeError):
continue
# 转换为列表并排序
suppliers = list(supplier_data.values())
# 综合评分(销量*0.4 + 评分*0.3 + 商品数*0.3)
for s in suppliers:
# 标准化各项指标
s["score"] = (
min(s["total_sales"] / 10000, 10) * 0.4 + # 销售额(万元),上限10
s["rating"] * 0.3 + # 评分(0-5)
min(s["item_count"], 10) * 0.3 # 商品数,上限10
)
# 按综合评分排序并返回前N名
return sorted(suppliers, key=lambda x: x["score"], reverse=True)[:top_n]
def visualize_analysis(self, analysis: Dict, keywords: str, output_dir: str = ".") -> None:
"""可视化分析结果"""
# 1. 价格分布直方图
if "price_analysis" in analysis and analysis["price_analysis"]:
plt.figure(figsize=(10, 6))
# 提取所有价格
prices = []
for item in analysis.get("raw_items", []):
try:
price_str = item.get("price", "0")
if "-" in price_str:
price = float(price_str.split("-")[0])
else:
price = float(price_str)
prices.append(price)
except (ValueError, TypeError):
continue
plt.hist(prices, bins=10, alpha=0.7, color='lightblue')
plt.axvline(analysis["price_analysis"]["avg"], color='red', linestyle='--',
label=f'平均价格: {analysis["price_analysis"]["avg"]}元')
plt.title(f'关键词"{keywords}"商品价格分布')
plt.xlabel('价格(元)')
plt.ylabel('商品数量')
plt.legend()
plt.tight_layout()
plt.savefig(f"{output_dir}/price_distribution.png")
plt.close()
logging.info(f"价格分布图表已保存至 {output_dir}/price_distribution.png")
# 2. 地区分布饼图
if "city_distribution" in analysis and analysis["city_distribution"]:
plt.figure(figsize=(8, 8))
# 只显示占比前5的地区,其余归为"其他"
city_data = sorted(analysis["city_distribution"].items(), key=lambda x: x[1], reverse=True)
if len(city_data) > 5:
top5 = city_data[:5]
others = sum(count for _, count in city_data[5:])
top5.append(("其他", others))
city_data = top5
labels, sizes = zip(*city_data)
plt.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=90)
plt.title(f'关键词"{keywords}"商品产地分布')
plt.axis('equal')
plt.tight_layout()
plt.savefig(f"{output_dir}/city_distribution.png")
plt.close()
logging.info(f"产地分布图表已保存至 {output_dir}/city_distribution.png")
# 3. 供应商类型分布条形图
if "supplier_type_distribution" in analysis and analysis["supplier_type_distribution"]:
plt.figure(figsize=(10, 6))
types = list(analysis["supplier_type_distribution"].keys())
counts = list(analysis["supplier_type_distribution"].values())
plt.bar(types, counts, color='lightgreen')
plt.title(f'关键词"{keywords}"供应商类型分布')
plt.xlabel('供应商类型')
plt.ylabel('数量')
for i, v in enumerate(counts):
plt.text(i, v + 0.5, str(v), ha='center')
plt.tight_layout()
plt.savefig(f"{output_dir}/supplier_type.png")
plt.close()
logging.info(f"供应商类型图表已保存至 {output_dir}/supplier_type.png")
# 4. 销量TOP10商品条形图
if "sales_analysis" in analysis and "raw_items" in analysis:
# 提取销量TOP10的商品
top10_sales = []
for item in analysis["raw_items"]:
try:
sale_str = item.get("sales", "0")
sale_num = int(re.findall(r'\d+', sale_str)[0]) if re.findall(r'\d+', sale_str) else 0
top10_sales.append((item.get("title", ""), sale_num))
except (ValueError, IndexError, TypeError):
continue
# 排序并取前10
top10_sales = sorted(top10_sales, key=lambda x: x[1], reverse=True)[:10]
plt.figure(figsize=(12, 6))
names = [name[:10] + "..." for name, _ in top10_sales] # 截断长标题
sales = [sale for _, sale in top10_sales]
plt.barh(names, sales, color='orange')
plt.xlabel('30天销量(件)')
plt.title(f'关键词"{keywords}"销量TOP10商品')
plt.gca().invert_yaxis() # 反转Y轴,让第一名在最上方
plt.tight_layout()
plt.savefig(f"{output_dir}/top10_sales.png")
plt.close()
logging.info(f"销量TOP10图表已保存至 {output_dir}/top10_sales.png")
def export_to_excel(self, items: List[Dict], analysis: Dict, meta_info: Dict, filename: str) -> None:
"""导出商品数据和分析结果到Excel"""
if not items and not analysis:
logging.warning("没有数据可导出")
return
try:
with pd.ExcelWriter(filename) as writer:
# 搜索元信息
pd.DataFrame([meta_info]).to_excel(writer, sheet_name='搜索信息', index=False)
# 商品数据
if items:
# 提取需要的字段,避免过多冗余信息
filtered_items = []
for item in items:
filtered = {
"商品ID": item.get("item_id"),
"标题": item.get("title"),
"价格": item.get("price"),
"30天销量": item.get("sales"),
"起订量": item.get("min_order"),
"供应商": item.get("nick"),
"所在地": item.get("city"),
"供应商类型": item.get("member_type"),
"评分": item.get("rating"),
"商品链接": item.get("detail_url")
}
filtered_items.append(filtered)
df_items = pd.DataFrame(filtered_items)
df_items.to_excel(writer, sheet_name='商品数据', index=False)
# 分析结果
if analysis and "error" not in analysis:
# 价格分析
if "price_analysis" in analysis and analysis["price_analysis"]:
df_price = pd.DataFrame([analysis["price_analysis"]],
index=["数值"])
df_price.to_excel(writer, sheet_name='价格分析')
# 销量分析
if "sales_analysis" in analysis and analysis["sales_analysis"]:
df_sales = pd.DataFrame([analysis["sales_analysis"]],
index=["数值"])
df_sales.to_excel(writer, sheet_name='销量分析')
# 地区分布
if "city_distribution" in analysis and analysis["city_distribution"]:
df_city = pd.DataFrame(list(analysis["city_distribution"].items()),
columns=['地区', '数量'])
df_city.to_excel(writer, sheet_name='地区分布', index=False)
logging.info(f"数据已导出至 {filename}")
except Exception as e:
logging.error(f"导出Excel失败: {e}")
# 示例调用
if __name__ == "__main__":
# 替换为实际的参数(从1688开放平台获取)
APP_KEY = "your_app_key"
APP_SECRET = "your_app_secret"
KEYWORDS = "夏季 连衣裙" # 搜索关键词
# 初始化API客户端
alibaba_api = AlibabaItemSearchApp(APP_KEY, APP_SECRET)
# 若为认证开发者,设置更高的频率限制
# alibaba_api.set_rate_limit(300)
# 1. 设置筛选条件
filters = {
"start_price": 50, # 最低价格50元
"end_price": 200, # 最高价格200元
"province": "浙江", # 浙江省
"is_factory": True, # 仅工厂直供
"sort": "sale_desc" # 按销量降序
}
# 2. 批量搜索商品
print("=== 搜索商品数据 ===")
items, meta_info = alibaba_api.batch_search_items(
keywords=KEYWORDS,
max_pages=3, # 获取前3页
page_size=40,
filters=filters
)
if items:
print(f"搜索关键词: {KEYWORDS}")
print(f"获取商品数量: {len(items)}")
print(f"总商品数量: {meta_info['total_items']}")
# 3. 分析商品数据
print("\n=== 商品数据分析 ===")
if items:
analysis = alibaba_api.analyze_items(items)
# 保存原始数据用于可视化
analysis["raw_items"] = items
print(f"价格范围: {analysis['price_analysis']['min']}-{analysis['price_analysis']['max']}元")
print(f"平均价格: {analysis['price_analysis']['avg']}元")
print(f"总销量: {analysis['sales_analysis']['total']}件")
print(f"平均销量: {analysis['sales_analysis']['avg']}件")
print("\n主要产地:")
for city, count in sorted(analysis["city_distribution"].items(), key=lambda x: x[1], reverse=True)[:3]:
print(f" {city}: {count}件商品")
print("\n供应商类型分布:")
for type_name, count in analysis["supplier_type_distribution"].items():
print(f" {type_name}: {count}家")
# 4. 获取优质供应商
print("\n=== 优质供应商TOP5 ===")
top_suppliers = alibaba_api.get_top_suppliers(items, top_n=5)
for i, supplier in enumerate(top_suppliers, 1):
print(f"{i}. {supplier['name']} (所在地: {supplier['city']})")
print(f" 商品数: {supplier['item_count']}, 总销量(估算): {supplier['total_sales']:.0f}元, 评分: {supplier['rating']}")
# 5. 可视化分析结果
alibaba_api.visualize_analysis(analysis, KEYWORDS)
# 6. 导出数据到Excel
alibaba_api.export_to_excel(items, analysis, meta_info, "1688商品搜索分析.xlsx")
else:
print("未获取到商品数据,无法进行分析")