1688 作为阿里巴巴旗下的大型批发电商平台,其开放平台提供了丰富的 API 接口,允许开发者获取商品信息、订单数据、供应商信息等。自定义 API 操作是基于 1688 开放平台的基础接口,通过组合调用和二次开发,实现特定业务需求的功能集合,广泛应用于供应链管理、采购自动化、市场分析等场景。
一、1688 自定义 API 核心特性分析
1. 平台架构与认证机制
2. 核心 API 分类与功能
3. 自定义 API 操作模式
二、Python 实现方案
import requests
import time
import hashlib
import json
import logging
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from datetime import datetime, timedelta
from collections import defaultdict
from typing import Dict, List, Optional, Tuple
# 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
class Alibaba1688CustomAPI:
"""1688自定义API操作类,封装基础API调用和业务逻辑"""
def __init__(self, appkey: str, appsecret: str, access_token: str = ""):
"""
初始化1688 API客户端
:param appkey: 应用的appkey
:param appsecret: 应用的appsecret
:param access_token: 访问令牌
"""
self.appkey = appkey
self.appsecret = appsecret
self.access_token = access_token
self.gateway_url = "https://gw.open.1688.com/openapi/param2/2.0/"
self.session = requests.Session()
self.session.headers.update({
"Content-Type": "application/x-www-form-urlencoded;charset=utf-8",
"User-Agent": "1688CustomAPI/1.0.0 (Python)"
})
# 频率控制
self.api_calls = defaultdict(int)
self.rate_limit = 60 # 每分钟最多调用次数
self.last_minute = time.time()
def set_access_token(self, access_token: str) -> None:
"""设置访问令牌"""
self.access_token = access_token
def _generate_sign(self, params: Dict) -> str:
"""生成签名"""
# 排序参数
sorted_params = sorted(params.items(), key=lambda x: x[0])
# 拼接签名字符串
sign_str = self.appsecret
for k, v in sorted_params:
sign_str += f"{k}{v}"
sign_str += self.appsecret
# 计算MD5
return hashlib.md5(sign_str.encode('utf-8')).hexdigest().upper()
def _check_rate_limit(self) -> bool:
"""检查API调用频率限制"""
current_time = time.time()
# 每分钟重置计数
if current_time - self.last_minute > 60:
self.api_calls.clear()
self.last_minute = current_time
# 检查是否超过限制
if sum(self.api_calls.values()) >= self.rate_limit:
sleep_time = 60 - (current_time - self.last_minute)
logging.warning(f"API调用频率超限,等待 {sleep_time:.1f} 秒")
time.sleep(sleep_time + 1)
self.api_calls.clear()
self.last_minute = time.time()
return True
def call_api(self, api_name: str, params: Dict = None) -> Optional[Dict]:
"""
调用1688 API
:param api_name: API名称,如offer.get
:param params: API参数
:return: API返回结果
"""
if not self.access_token:
logging.error("请先设置access_token")
return None
# 检查频率限制
if not self._check_rate_limit():
return None
# 构建基础参数
base_params = {
"app_key": self.appkey,
"access_token": self.access_token,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"format": "json",
"v": "2.0",
"sign_method": "md5"
}
# 合并参数
if params:
base_params.update(params)
# 生成签名
sign = self._generate_sign(base_params)
base_params["sign"] = sign
try:
# 构建完整URL
url = f"{self.gateway_url}{api_name}"
response = self.session.post(url, data=base_params, timeout=15)
response.raise_for_status()
# 解析响应
result = response.json()
# 处理错误
if "error_response" in result:
error = result["error_response"]
logging.error(f"API调用错误: {error.get('msg')} (错误码: {error.get('code')})")
return None
# 返回结果
return result.get(f"{api_name.replace('.', '_')}_response")
except requests.exceptions.RequestException as e:
logging.error(f"API请求异常: {str(e)}")
return None
except json.JSONDecodeError:
logging.error(f"API响应解析失败: {response.text[:200]}...")
return None
finally:
# 记录API调用
self.api_calls[api_name] += 1
# ------------------------------
# 商品相关自定义操作
# ------------------------------
def search_products(self, keywords: str, page: int = 1, page_size: int = 40,
min_price: float = None, max_price: float = None) -> Dict:
"""
搜索商品
:param keywords: 搜索关键词
:param page: 页码
:param page_size: 每页数量
:param min_price: 最低价格
:param max_price: 最高价格
:return: 商品列表及分页信息
"""
params = {
"keywords": keywords,
"page_no": page,
"page_size": page_size,
"sort_type": "default" # 默认排序
}
if min_price is not None:
params["start_price"] = min_price
if max_price is not None:
params["end_price"] = max_price
result = self.call_api("offer.search", params)
if not result:
return {"total": 0, "products": []}
total = int(result.get("total_results", 0))
products = result.get("offers", {}).get("offer", [])
return {
"total": total,
"page": page,
"page_size": page_size,
"total_pages": (total + page_size - 1) // page_size,
"products": self._format_products(products)
}
def get_product_details(self, offer_id: str) -> Optional[Dict]:
"""获取商品详情"""
params = {
"offer_id": offer_id,
"fields": "offer_id,subject,price,price_range,min_amount,amount,on_sale_time,"
"detail_url,pic_url,seller,spec_info,promotion"
}
result = self.call_api("offer.get", params)
if not result:
return None
return self._format_product(result.get("offer", {}))
def batch_get_products(self, offer_ids: List[str]) -> List[Dict]:
"""批量获取商品信息"""
if not offer_ids:
return []
products = []
# 1688 API一次最多支持20个ID
batch_size = 20
for i in range(0, len(offer_ids), batch_size):
batch_ids = offer_ids[i:i+batch_size]
params = {
"offer_ids": ",".join(batch_ids),
"fields": "offer_id,subject,price,min_amount,amount,seller, pic_url"
}
result = self.call_api("offer.getBatch", params)
if result and "offers" in result:
formatted = [self._format_product(offer) for offer in result["offers"]]
products.extend(formatted)
# 控制调用频率
time.sleep(1)
logging.info(f"成功获取 {len(products)} 件商品信息")
return products
def analyze_product_prices(self, keywords: str, max_pages: int = 3) -> Dict:
"""分析特定关键词商品的价格分布"""
all_products = []
page = 1
while page <= max_pages:
logging.info(f"搜索商品第 {page} 页: {keywords}")
result = self.search_products(keywords, page)
if not result["products"]:
break
all_products.extend(result["products"])
if page >= result["total_pages"]:
break
page += 1
time.sleep(1)
# 提取价格信息
prices = []
for product in all_products:
# 处理价格范围
if isinstance(product["price"], dict) and "min" in product["price"]:
prices.append(product["price"]["min"])
prices.append(product["price"]["max"])
else:
prices.append(product["price"])
# 价格统计
price_stats = {}
if prices:
prices = [float(p) for p in prices]
price_stats = {
"min": min(prices),
"max": max(prices),
"avg": sum(prices) / len(prices),
"median": self._calculate_median(prices)
}
# 供应商分析
supplier_counts = defaultdict(int)
for product in all_products:
supplier_id = product["seller"]["user_id"]
supplier_counts[supplier_id] += 1
# 起订量分析
moq_stats = {}
moqs = [int(p["min_amount"]) for p in all_products if p["min_amount"]]
if moqs:
moq_stats = {
"min": min(moqs),
"max": max(moqs),
"avg": sum(moqs) / len(moqs)
}
return {
"total_products": len(all_products),
"price_stats": price_stats,
"moq_stats": moq_stats,
"top_suppliers": sorted(supplier_counts.items(), key=lambda x: x[1], reverse=True)[:5],
"products": all_products
}
# ------------------------------
# 供应商相关自定义操作
# ------------------------------
def get_supplier_info(self, user_id: str) -> Optional[Dict]:
"""获取供应商信息"""
params = {
"member_id": user_id,
"fields": "user_id,company_name,main_product,province,city,member_type,"
"start_year,credit_rating,transaction_volume,好评率"
}
result = self.call_api("member.get", params)
if not result:
return None
return {
"user_id": result.get("user_id"),
"company_name": result.get("company_name"),
"main_product": result.get("main_product"),
"location": f"{result.get('province', '')} {result.get('city', '')}",
"member_type": result.get("member_type"),
"established_year": result.get("start_year"),
"credit_rating": result.get("credit_rating"),
"transaction_volume": result.get("transaction_volume"),
"positive_rate": result.get("好评率"),
"url": f"https://shop{user_id}.1688.com"
}
def evaluate_supplier(self, user_id: str) -> Dict:
"""评估供应商综合实力(自定义评分模型)"""
supplier = self.get_supplier_info(user_id)
if not supplier:
return {"error": "无法获取供应商信息"}
# 简单评分模型(0-100分)
score = 0
# 会员类型评分(诚信通年限)
member_type = supplier.get("member_type", "")
if "诚信通" in member_type:
try:
years = int(re.search(r'\d+', member_type).group())
score += min(years * 3, 30) # 最多30分
except:
score += 10
# 成立年限评分
established_year = supplier.get("established_year", "")
if established_year and established_year.isdigit():
years = datetime.now().year - int(established_year)
score += min(years * 2, 20) # 最多20分
# 信用评级评分
credit_rating = supplier.get("credit_rating", "")
if "AAA" in credit_rating:
score += 20
elif "AA" in credit_rating:
score += 15
elif "A" in credit_rating:
score += 10
# 好评率评分
positive_rate = supplier.get("positive_rate", "0%")
try:
rate = float(positive_rate.strip('%'))
score += min(rate * 0.2, 30) # 最多30分
except:
pass
# 综合评级
if score >= 90:
rating = "优秀"
elif score >= 70:
rating = "良好"
elif score >= 50:
rating = "一般"
else:
rating = "需谨慎"
return {
"supplier": supplier,
"score": round(score, 1),
"rating": rating,
"evaluation_details": {
"member_type_contribution": min(score - (score % 10), 30),
"established_year_contribution": min((score // 10) * 10 - 30, 20) if score > 30 else 0,
"credit_rating_contribution": min(score - 50, 20) if score > 50 else 0,
"positive_rate_contribution": min(score - 70, 30) if score > 70 else 0
}
}
# ------------------------------
# 采购相关自定义操作
# ------------------------------
def compare_suppliers(self, product_id: str, max_suppliers: int = 5) -> Dict:
"""对比同一商品的不同供应商"""
# 获取商品详情
product = self.get_product_details(product_id)
if not product:
return {"error": "无法获取商品信息"}
# 搜索同类商品
search_result = self.search_products(product["subject"], page_size=max_suppliers)
if not search_result["products"]:
return {"error": "未找到同类商品"}
# 获取所有供应商信息
supplier_ids = [p["seller"]["user_id"] for p in search_result["products"]]
suppliers = []
for sid in supplier_ids[:max_suppliers]:
eval_result = self.evaluate_supplier(sid)
suppliers.append(eval_result)
# 提取对比数据
comparison_data = []
for i, p in enumerate(search_result["products"][:max_suppliers]):
supplier_eval = next((s for s in suppliers if s["supplier"]["user_id"] == p["seller"]["user_id"]), None)
comparison_data.append({
"product_id": p["offer_id"],
"product_name": p["subject"],
"price": p["price"],
"min_order": p["min_amount"],
"stock": p["amount"],
"supplier_name": p["seller"]["company_name"],
"supplier_rating": supplier_eval["rating"] if supplier_eval else "未知",
"supplier_score": supplier_eval["score"] if supplier_eval else 0,
"url": p["detail_url"]
})
return {
"original_product": product,
"comparison_count": len(comparison_data),
"comparison_data": comparison_data,
"best_value": max(comparison_data, key=lambda x:
(x["supplier_score"], -float(x["price"]) if isinstance(x["price"], (int, float))
else -float(x["price"]["min"])))
}
def create_purchase_plan(self, product_ids: List[str], quantities: List[int]) -> Dict:
"""创建采购计划(自定义业务逻辑)"""
if len(product_ids) != len(quantities):
return {"error": "商品ID和数量长度不匹配"}
# 批量获取商品信息
products = self.batch_get_products(product_ids)
if not products:
return {"error": "无法获取商品信息"}
# 计算总采购金额
total_amount = 0
purchase_items = []
for i, product in enumerate(products):
quantity = quantities[i]
if quantity <= 0:
continue
# 处理价格
if isinstance(product["price"], dict) and "min" in product["price"]:
price = float(product["price"]["min"])
else:
price = float(product["price"])
# 计算小计
subtotal = price * quantity
total_amount += subtotal
purchase_items.append({
"product_id": product["offer_id"],
"product_name": product["subject"],
"price": price,
"quantity": quantity,
"subtotal": subtotal,
"supplier": product["seller"]["company_name"],
"supplier_id": product["seller"]["user_id"]
})
# 按供应商分组
supplier_groups = defaultdict(list)
for item in purchase_items:
supplier_groups[item["supplier_id"]].append(item)
return {
"total_items": len(purchase_items),
"total_amount": total_amount,
"purchase_items": purchase_items,
"by_supplier": {k: {
"supplier_name": v[0]["supplier"],
"items": v,
"total": sum(i["subtotal"] for i in v)
} for k, v in supplier_groups.items()},
"created_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
# ------------------------------
# 工具方法
# ------------------------------
def _format_products(self, products: List[Dict]) -> List[Dict]:
"""格式化商品列表数据"""
return [self._format_product(p) for p in products]
def _format_product(self, product: Dict) -> Dict:
"""格式化单个商品数据"""
# 处理价格
price = product.get("price", 0)
if "price_range" in product and product["price_range"]:
price = {
"min": product["price_range"].get("min_price", price),
"max": product["price_range"].get("max_price", price)
}
return {
"offer_id": product.get("offer_id"),
"subject": product.get("subject", ""),
"price": price,
"min_amount": product.get("min_amount", 1),
"amount": product.get("amount", 0), # 库存
"on_sale_time": product.get("on_sale_time", ""),
"detail_url": product.get("detail_url", ""),
"pic_url": product.get("pic_url", ""),
"seller": {
"user_id": product.get("seller", {}).get("user_id", ""),
"company_name": product.get("seller", {}).get("company_name", ""),
"member_type": product.get("seller", {}).get("member_type", "")
},
"spec_info": product.get("spec_info", {}),
"has_promotion": bool(product.get("promotion", {}))
}
def _calculate_median(self, data: List[float]) -> float:
"""计算中位数"""
sorted_data = sorted(data)
n = len(sorted_data)
if n % 2 == 1:
return round(sorted_data[n//2], 2)
else:
return round((sorted_data[n//2 - 1] + sorted_data[n//2]) / 2, 2)
def visualize_analysis(self, analysis: Dict, title: str, output_file: str) -> None:
"""可视化分析结果"""
plt.rcParams["font.family"] = ["SimHei", "WenQuanYi Micro Hei", "Heiti TC"]
plt.rcParams["axes.unicode_minus"] = False
fig, ax = plt.subplots(figsize=(10, 6))
if "price_stats" in analysis:
# 价格分布直方图
if "products" in analysis and analysis["products"]:
prices = []
for product in analysis["products"]:
if isinstance(product["price"], dict) and "min" in product["price"]:
prices.append(float(product["price"]["min"]))
prices.append(float(product["price"]["max"]))
else:
prices.append(float(product["price"]))
ax.hist(prices, bins=10, alpha=0.7, color='skyblue')
ax.axvline(analysis["price_stats"]["avg"], color='r', linestyle='dashed', linewidth=1,
label=f'平均价格: {analysis["price_stats"]["avg"]:.2f}')
ax.set_title(f"{title} - 商品价格分布")
ax.set_xlabel("价格 (元)")
ax.set_ylabel("商品数量")
ax.legend()
elif "comparison_data" in analysis:
# 供应商对比条形图
suppliers = [item["supplier_name"][:5] + "..." for item in analysis["comparison_data"]]
prices = []
for item in analysis["comparison_data"]:
if isinstance(item["price"], dict) and "min" in item["price"]:
prices.append(float(item["price"]["min"]))
else:
prices.append(float(item["price"]))
x = np.arange(len(suppliers))
width = 0.35
ax.bar(x - width/2, prices, width, label='价格')
ax.set_title(f"{title} - 供应商价格对比")
ax.set_xlabel("供应商")
ax.set_ylabel("价格 (元)")
ax.set_xticks(x)
ax.set_xticklabels(suppliers)
ax.legend()
elif "score" in analysis:
# 供应商评分雷达图
labels = ['会员类型', '成立年限', '信用评级', '好评率']
stats = [
analysis["evaluation_details"]["member_type_contribution"] / 30 * 100,
analysis["evaluation_details"]["established_year_contribution"] / 20 * 100,
analysis["evaluation_details"]["credit_rating_contribution"] / 20 * 100,
analysis["evaluation_details"]["positive_rate_contribution"] / 30 * 100
]
# 闭合雷达图
labels = labels + [labels[0]]
stats = stats + [stats[0]]
ax.polar(np.linspace(0, 2*np.pi, len(labels)), stats, 'o-', linewidth=2)
ax.fill(np.linspace(0, 2*np.pi, len(labels)), stats, alpha=0.25)
ax.set_thetagrids(np.linspace(0, 360, len(labels)), labels)
ax.set_ylim(0, 100)
ax.set_title(f"{title} - 供应商评分雷达图 (总分: {analysis['score']})")
plt.tight_layout()
plt.savefig(output_file)
plt.close()
logging.info(f"分析图表已保存至 {output_file}")
def export_to_excel(self, data: List[Dict], filename: str, sheet_name: str = "数据") -> None:
"""将数据导出到Excel"""
if not data:
logging.warning("没有可导出的数据")
return
df = pd.DataFrame(data)
df.to_excel(filename, sheet_name=sheet_name, index=False)
logging.info(f"数据已导出至 {filename}")
# 示例调用
if __name__ == "__main__":
# 替换为实际的参数(从1688开放平台获取)
APPKEY = "your_appkey"
APPSECRET = "your_appsecret"
ACCESS_TOKEN = "your_access_token"
# 初始化API客户端
api = Alibaba1688CustomAPI(APPKEY, APPSECRET, ACCESS_TOKEN)
# 示例1: 商品价格分析
print("=== 商品价格分析 ===")
KEYWORDS = "女装 夏季" # 搜索关键词
price_analysis = api.analyze_product_prices(KEYWORDS, max_pages=3)
print(f"搜索关键词: {KEYWORDS}")
print(f"获取商品总数: {price_analysis['total_products']}")
if price_analysis["price_stats"]:
print(f"价格范围: {price_analysis['price_stats']['min']}-{price_analysis['price_stats']['max']} 元")
print(f"平均价格: {price_analysis['price_stats']['avg']:.2f} 元")
if price_analysis["moq_stats"]:
print(f"起订量范围: {price_analysis['moq_stats']['min']}-{price_analysis['moq_stats']['max']} 件")
api.visualize_analysis(price_analysis, f"商品价格分析: {KEYWORDS}", "product_price_analysis.png")
# 示例2: 供应商评估
print("\n=== 供应商评估 ===")
if price_analysis["top_suppliers"]:
SUPPLIER_ID = price_analysis["top_suppliers"][0][0] # 取商品最多的供应商
supplier_evaluation = api.evaluate_supplier(SUPPLIER_ID)
print(f"供应商名称: {supplier_evaluation['supplier']['company_name']}")
print(f"综合评分: {supplier_evaluation['score']} 分")
print(f"综合评级: {supplier_evaluation['rating']}")
print(f"所在地区: {supplier_evaluation['supplier']['location']}")
print(f"主营产品: {supplier_evaluation['supplier']['main_product']}")
api.visualize_analysis(supplier_evaluation, f"供应商评估: {supplier_evaluation['supplier']['company_name']}",
"supplier_evaluation.png")
# 示例3: 供应商对比
print("\n=== 供应商对比 ===")
if price_analysis["products"]:
PRODUCT_ID = price_analysis["products"][0]["offer_id"] # 取第一个商品
comparison = api.compare_suppliers(PRODUCT_ID)
print(f"对比商品: {comparison['original_product']['subject'][:30]}...")
print(f"参与对比的供应商数量: {comparison['comparison_count']}")
print("推荐供应商:")
print(f" 名称: {comparison['best_value']['supplier_name']}")
print(f" 价格: {comparison['best_value']['price']} 元")
print(f" 起订量: {comparison['best_value']['min_order']} 件")
print(f" 评分: {comparison['best_value']['supplier_rating']}")
api.visualize_analysis(comparison, f"供应商对比: {comparison['original_product']['subject'][:10]}...",
"supplier_comparison.png")
# 示例4: 创建采购计划
print("\n=== 采购计划 ===")
if price_analysis["products"]:
# 选取前3个商品创建采购计划
product_ids = [p["offer_id"] for p in price_analysis["products"][:3]]
quantities = [10, 20, 15] # 采购数量
purchase_plan = api.create_purchase_plan(product_ids, quantities)
print(f"采购计划包含 {purchase_plan['total_items']} 种商品")
print(f"总采购金额: {purchase_plan['total_amount']:.2f} 元")
print("按供应商分组:")
for sid, group in purchase_plan["by_supplier"].items():
print(f" 供应商: {group['supplier_name']}")
print(f" 商品数量: {len(group['items'])} 种")
print(f" 小计金额: {group['total']:.2f} 元")
api.export_to_excel(purchase_plan["purchase_items"], "purchase_plan.xlsx", "采购清单")