淘宝开放平台提供了丰富的 API 接口,而 "自定义 API 操作" 通常指基于淘宝开放平台基础接口进行二次开发,或通过封装特定业务需求组合多个 API 实现复杂功能。这种自定义操作在电商数据分析、店铺管理自动化、订单处理等场景中非常常见。
一、淘宝自定义 API 操作核心特性
1. 基础框架与认证机制
2. 自定义 API 操作的常见模式
3. 核心 API 与应用场景
二、Python 实现方案
import requests
import time
import hashlib
import json
import logging
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from datetime import datetime, timedelta
from collections import defaultdict
from typing import Dict, List, Optional, Tuple
# 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
class TaobaoCustomAPI:
"""淘宝自定义API操作类,封装基础API调用和自定义业务逻辑"""
def __init__(self, appkey: str, appsecret: str, access_token: str = ""):
"""
初始化淘宝API客户端
:param appkey: 应用的appkey
:param appsecret: 应用的appsecret
:param access_token: 用户授权令牌
"""
self.appkey = appkey
self.appsecret = appsecret
self.access_token = access_token
self.gateway_url = "https://eco.taobao.com/router/rest"
self.session = requests.Session()
self.session.headers.update({
"Content-Type": "application/x-www-form-urlencoded;charset=utf-8",
"User-Agent": "TaobaoCustomAPI/1.0.0 (Python)"
})
# 记录API调用频率,防止超限
self.api_calls = defaultdict(int)
self.last_reset_time = time.time()
def set_access_token(self, access_token: str) -> None:
"""设置访问令牌"""
self.access_token = access_token
def _generate_sign(self, params: Dict) -> str:
"""生成签名"""
# 排序参数
sorted_params = sorted(params.items(), key=lambda x: x[0])
# 拼接字符串
sign_str = self.appsecret
for k, v in sorted_params:
sign_str += f"{k}{v}"
sign_str += self.appsecret
# 计算MD5
return hashlib.md5(sign_str.encode('utf-8')).hexdigest().upper()
def _check_rate_limit(self, api_name: str) -> bool:
"""检查API调用频率限制"""
# 每小时重置计数
current_time = time.time()
if current_time - self.last_reset_time > 3600:
self.api_calls.clear()
self.last_reset_time = current_time
# 淘宝API通常限制为60次/分钟
if self.api_calls[api_name] >= 60:
logging.warning(f"API {api_name} 调用频率超限,等待1分钟")
time.sleep(60)
self.api_calls[api_name] = 0
return False
self.api_calls[api_name] += 1
return True
def call_api(self, method: str, params: Dict = None) -> Optional[Dict]:
"""
调用淘宝API
:param method: API方法名,如taobao.item.get
:param params: API参数
:return: API返回结果
"""
if not self.access_token and method != "taobao.oauth.token":
logging.error("请先设置access_token")
return None
# 检查频率限制
if not self._check_rate_limit(method):
return None
# 构建基础参数
base_params = {
"method": method,
"app_key": self.appkey,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"format": "json",
"v": "2.0",
"sign_method": "md5",
"session": self.access_token
}
# 合并参数
if params:
base_params.update(params)
# 生成签名
sign = self._generate_sign(base_params)
base_params["sign"] = sign
try:
# 发送请求
response = self.session.post(self.gateway_url, data=base_params, timeout=15)
response.raise_for_status()
# 解析响应
result = response.json()
# 处理错误
error_response = result.get("error_response")
if error_response:
logging.error(f"API调用错误: {error_response.get('msg')} (错误码: {error_response.get('code')})")
return None
# 返回结果
return result.get(list(result.keys())[0])
except requests.exceptions.RequestException as e:
logging.error(f"API请求异常: {str(e)}")
return None
except json.JSONDecodeError:
logging.error(f"API响应解析失败: {response.text[:200]}...")
return None
# ------------------------------
# 商品相关自定义操作
# ------------------------------
def batch_get_items(self, num_iids: List[str], fields: str = "num_iid,title,price,stock,sales") -> List[Dict]:
"""
批量获取商品信息
:param num_iids: 商品ID列表
:param fields: 需要获取的字段
:return: 商品信息列表
"""
if not num_iids:
return []
items = []
# 淘宝API一次最多支持40个ID
batch_size = 40
for i in range(0, len(num_iids), batch_size):
batch_iids = num_iids[i:i+batch_size]
params = {
"fields": fields,
"num_iids": ",".join(batch_iids)
}
result = self.call_api("taobao.items.get", params)
if result and "items" in result and "item" in result["items"]:
items.extend(result["items"]["item"])
# 控制调用频率
time.sleep(0.5)
logging.info(f"成功获取 {len(items)} 件商品信息")
return items
def search_items_by_category(self, cid: int, page: int = 1, page_size: int = 40) -> Dict:
"""
按分类搜索商品
:param cid: 分类ID
:param page: 页码
:param page_size: 每页数量
:return: 商品列表及分页信息
"""
params = {
"cid": cid,
"page_no": page,
"page_size": page_size,
"fields": "num_iid,title,price,stock,sales,post_fee"
}
result = self.call_api("taobao.items.search", params)
if not result:
return {"total": 0, "items": []}
total = int(result.get("total_results", 0))
items = result.get("items", {}).get("item", [])
return {
"total": total,
"page": page,
"page_size": page_size,
"total_pages": (total + page_size - 1) // page_size,
"items": items
}
def get_category_item_stats(self, cid: int, max_pages: int = 5) -> Dict:
"""
获取分类下商品的统计信息(自定义聚合分析)
:param cid: 分类ID
:param max_pages: 最大页数
:return: 统计分析结果
"""
all_items = []
page = 1
while page <= max_pages:
logging.info(f"获取分类 {cid} 第 {page} 页商品")
result = self.search_items_by_category(cid, page)
if not result["items"]:
break
all_items.extend(result["items"])
if page >= result["total_pages"]:
break
page += 1
# 分析价格分布
prices = [float(item["price"]) for item in all_items]
price_stats = {}
if prices:
price_stats = {
"min": min(prices),
"max": max(prices),
"avg": sum(prices) / len(prices),
"median": self._calculate_median(prices)
}
# 分析销量分布
sales = [int(item.get("sales", 0)) for item in all_items]
sales_stats = {}
if sales:
sales_stats = {
"total": sum(sales),
"avg": sum(sales) / len(sales),
"top3": sorted(sales, reverse=True)[:3]
}
return {
"total_items": len(all_items),
"price_stats": price_stats,
"sales_stats": sales_stats,
"items": all_items
}
# ------------------------------
# 订单相关自定义操作
# ------------------------------
def get_recent_orders(self, days: int = 7, status: str = "") -> List[Dict]:
"""
获取最近N天的订单
:param days: 天数
:param status: 订单状态筛选
:return: 订单列表
"""
end_time = datetime.now()
start_time = end_time - timedelta(days=days)
params = {
"start_created": start_time.strftime("%Y-%m-%d %H:%M:%S"),
"end_created": end_time.strftime("%Y-%m-%d %H:%M:%S"),
"page_no": 1,
"page_size": 100,
"fields": "tid,title,price,num,payment,status,created,updated,buyer_nick"
}
if status:
params["status"] = status
all_orders = []
while True:
logging.info(f"获取订单第 {params['page_no']} 页")
result = self.call_api("taobao.trades.search", params)
if not result or "trades" not in result or "trade" not in result["trades"]:
break
trades = result["trades"]["trade"]
all_orders.extend(trades)
# 检查是否还有更多页
total_results = int(result.get("total_results", 0))
if len(all_orders) >= total_results:
break
params["page_no"] += 1
time.sleep(0.5)
logging.info(f"成功获取 {len(all_orders)} 条订单")
return all_orders
def analyze_order_status(self, orders: List[Dict]) -> Dict:
"""
分析订单状态分布(自定义分析)
:param orders: 订单列表
:return: 分析结果
"""
if not orders:
return {}
# 按状态统计
status_counts = defaultdict(int)
# 按日期统计
date_counts = defaultdict(int)
# 销售额统计
daily_sales = defaultdict(float)
for order in orders:
status = order["status"]
status_counts[status] += 1
# 处理日期
created = datetime.strptime(order["created"], "%Y-%m-%d %H:%M:%S")
date_str = created.strftime("%Y-%m-%d")
date_counts[date_str] += 1
# 处理销售额
daily_sales[date_str] += float(order["payment"])
# 转换为排序后的列表
sorted_dates = sorted(date_counts.keys())
sorted_sales = [daily_sales[date] for date in sorted_dates]
return {
"total_orders": len(orders),
"status_distribution": dict(status_counts),
"daily_distribution": {
"dates": sorted_dates,
"counts": [date_counts[date] for date in sorted_dates],
"sales": sorted_sales
},
"total_sales": sum(sorted_sales),
"avg_order_value": sum(sorted_sales) / len(orders) if orders else 0
}
def find_abnormal_orders(self, orders: List[Dict]) -> List[Dict]:
"""
查找异常订单(自定义业务逻辑)
:param orders: 订单列表
:return: 异常订单列表
"""
abnormal_orders = []
for order in orders:
# 异常条件1:价格为0或远低于正常价格
price = float(order["price"])
if price <= 0.01:
abnormal_orders.append({
"tid": order["tid"],
"reason": "价格异常",
"details": f"订单价格为 {price} 元",
"order": order
})
continue
# 异常条件2:订单状态异常(如长时间未付款)
if order["status"] == "WAIT_BUYER_PAY":
created_time = datetime.strptime(order["created"], "%Y-%m-%d %H:%M:%S")
if (datetime.now() - created_time) > timedelta(hours=48):
abnormal_orders.append({
"tid": order["tid"],
"reason": "长时间未付款",
"details": f"已等待 {(datetime.now() - created_time).total_seconds()/3600:.1f} 小时",
"order": order
})
logging.info(f"发现 {len(abnormal_orders)} 个异常订单")
return abnormal_orders
# ------------------------------
# 评价相关自定义操作
# ------------------------------
def get_product_comments(self, num_iid: str, page: int = 1, page_size: int = 20) -> Dict:
"""获取商品评价"""
params = {
"num_iid": num_iid,
"page": page,
"page_size": page_size
}
result = self.call_api("taobao.traderates.get", params)
if not result:
return {"total": 0, "comments": []}
total = int(result.get("total", 0))
comments = result.get("traderates", {}).get("traderate", [])
return {
"total": total,
"page": page,
"page_size": page_size,
"comments": comments
}
def analyze_negative_comments(self, num_iid: str, max_pages: int = 3) -> Dict:
"""分析商品负面评价(自定义分析)"""
all_comments = []
page = 1
while page <= max_pages:
logging.info(f"获取商品 {num_iid} 第 {page} 页评价")
result = self.get_product_comments(num_iid, page)
if not result["comments"]:
break
all_comments.extend(result["comments"])
if page >= (result["total"] + result["page_size"] - 1) // result["page_size"]:
break
page += 1
# 筛选负面评价(评分<=3星)
negative_comments = [
c for c in all_comments
if c.get("result", 0) <= 3 and c.get("content")
]
# 关键词分析
keywords = {
"质量": ["质量", "差", "不好", "坏", "劣质"],
"物流": ["慢", "快递", "物流", "包装", "破损"],
"尺寸": ["大", "小", "尺寸", "不合适", "短"],
"服务": ["态度", "客服", "服务", "不理", "差"],
"价格": ["贵", "不值", "性价比"]
}
keyword_counts = defaultdict(int)
for comment in negative_comments:
content = comment["content"].lower()
for category, kws in keywords.items():
for kw in kws:
if kw in content:
keyword_counts[category] += 1
break
return {
"total_comments": len(all_comments),
"negative_count": len(negative_comments),
"negative_ratio": len(negative_comments) / len(all_comments) if all_comments else 0,
"keyword_analysis": dict(keyword_counts),
"top_negative_comments": sorted(
negative_comments,
key=lambda x: len(x.get("content", "")),
reverse=True
)[:5]
}
# ------------------------------
# 工具方法
# ------------------------------
def _calculate_median(self, data: List[float]) -> float:
"""计算中位数"""
sorted_data = sorted(data)
n = len(sorted_data)
if n % 2 == 1:
return round(sorted_data[n//2], 2)
else:
return round((sorted_data[n//2 - 1] + sorted_data[n//2]) / 2, 2)
def visualize_analysis(self, analysis: Dict, title: str, output_file: str) -> None:
"""可视化分析结果"""
plt.rcParams["font.family"] = ["SimHei", "WenQuanYi Micro Hei", "Heiti TC"]
plt.rcParams["axes.unicode_minus"] = False
fig, ax = plt.subplots(figsize=(10, 6))
if "status_distribution" in analysis:
# 订单状态分布饼图
labels = list(analysis["status_distribution"].keys())
sizes = list(analysis["status_distribution"].values())
ax.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=90)
ax.set_title(f"{title} - 订单状态分布")
ax.axis('equal')
elif "keyword_analysis" in analysis:
# 负面评价关键词条形图
keywords = list(analysis["keyword_analysis"].keys())
counts = list(analysis["keyword_analysis"].values())
ax.bar(keywords, counts, color='salmon')
ax.set_title(f"{title} - 负面评价关键词分布")
ax.set_xlabel("关键词类别")
ax.set_ylabel("出现次数")
elif "price_stats" in analysis and "sales_stats" in analysis:
# 价格分布直方图
prices = [float(item["price"]) for item in analysis["items"]]
ax.hist(prices, bins=10, alpha=0.7, color='skyblue')
ax.axvline(analysis["price_stats"]["avg"], color='r', linestyle='dashed', linewidth=1,
label=f'平均值: {analysis["price_stats"]["avg"]:.2f}')
ax.set_title(f"{title} - 商品价格分布")
ax.set_xlabel("价格 (元)")
ax.set_ylabel("商品数量")
ax.legend()
plt.tight_layout()
plt.savefig(output_file)
plt.close()
logging.info(f"分析图表已保存至 {output_file}")
def export_to_excel(self, data: List[Dict], filename: str, sheet_name: str = "数据") -> None:
"""将数据导出到Excel"""
if not data:
logging.warning("没有可导出的数据")
return
df = pd.DataFrame(data)
df.to_excel(filename, sheet_name=sheet_name, index=False)
logging.info(f"数据已导出至 {filename}")
# 示例调用
if __name__ == "__main__":
# 替换为实际的参数(从淘宝开放平台获取)
APPKEY = "your_appkey"
APPSECRET = "your_appsecret"
ACCESS_TOKEN = "your_access_token"
# 初始化API客户端
api = TaobaoCustomAPI(APPKEY, APPSECRET, ACCESS_TOKEN)
# 示例1: 分析分类商品
print("=== 分类商品分析 ===")
CATEGORY_ID = 50010550 # 示例分类ID(女装)
category_stats = api.get_category_item_stats(CATEGORY_ID, max_pages=3)
print(f"分类 {CATEGORY_ID} 共获取 {category_stats['total_items']} 件商品")
if category_stats["price_stats"]:
print(f"价格范围: {category_stats['price_stats']['min']}-{category_stats['price_stats']['max']} 元")
print(f"平均价格: {category_stats['price_stats']['avg']:.2f} 元")
if category_stats["sales_stats"]:
print(f"总销量: {category_stats['sales_stats']['total']} 件")
print(f"平均销量: {category_stats['sales_stats']['avg']:.1f} 件")
api.visualize_analysis(category_stats, f"分类 {CATEGORY_ID} 商品分析", "category_analysis.png")
# 示例2: 订单分析
print("\n=== 订单分析 ===")
recent_orders = api.get_recent_orders(days=7) # 获取最近7天订单
order_analysis = api.analyze_order_status(recent_orders)
print(f"总订单数: {order_analysis['total_orders']}")
print(f"总销售额: {order_analysis['total_sales']:.2f} 元")
print(f"平均订单金额: {order_analysis['avg_order_value']:.2f} 元")
print("订单状态分布:")
for status, count in order_analysis["status_distribution"].items():
print(f" {status}: {count} 单 ({count/order_analysis['total_orders']*100:.1f}%)")
# 查找异常订单
abnormal_orders = api.find_abnormal_orders(recent_orders)
if abnormal_orders:
print(f"\n发现 {len(abnormal_orders)} 个异常订单:")
for i, order in enumerate(abnormal_orders[:3], 1):
print(f" {i}. 订单ID: {order['tid']}, 原因: {order['reason']}")
api.visualize_analysis(order_analysis, "最近7天订单分析", "order_analysis.png")
api.export_to_excel(recent_orders, "recent_orders.xlsx", "订单数据")
# 示例3: 商品评价分析
print("\n=== 商品评价分析 ===")
ITEM_ID = "61234567890" # 示例商品ID
comment_analysis = api.analyze_negative_comments(ITEM_ID)
print(f"商品 {ITEM_ID} 共获取 {comment_analysis['total_comments']} 条评价")
print(f"负面评价: {comment_analysis['negative_count']} 条 "
f"({comment_analysis['negative_ratio']*100:.1f}%)")
if comment_analysis["keyword_analysis"]:
print("负面评价关键词:")
for kw, count in sorted(comment_analysis["keyword_analysis"].items(), key=lambda x: x[1], reverse=True):
print(f" {kw}: {count} 次")
api.visualize_analysis(comment_analysis, f"商品 {ITEM_ID} 评价分析", "comment_analysis.png")