京东开放开放平台(JD Open Platform)提供了丰富的 API 接口,支持开发者、订单、库存、支付等全链路电商业务场景。自定义 API 操作是基于这些基础接口,通过组合调用和二次开发,满足特定业务需求的实现方式,广泛应用于店铺管理、数据分析、竞品监控等场景。
一、京东自定义 API 核心特性分析
1. 平台架构与认证机制
2. 核心 API 分类与功能
3. 自定义 API 操作模式
二、Python 实现方案
import requests
import time
import hashlib
import json
import logging
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from datetime import datetime, timedelta
from collections import defaultdict
from typing import Dict, List, Optional, Tuple
# 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
class JDCustomAPI:
"""京东自定义 API 操作类,封装基础 API 调用和业务逻辑"""
def __init__(self, appkey: str, appsecret: str, access_token: str = ""):
"""
初始化京东 API 客户端
:param appkey: 应用的 appkey
:param appsecret: 应用的 appsecret
:param access_token: 访问令牌
"""
self.appkey = appkey
self.appsecret = appsecret
self.access_token = access_token
self.gateway_url = "https://api.jd.com/routerjson"
self.session = requests.Session()
self.session.headers.update({
"Content-Type": "application/x-www-form-urlencoded;charset=utf-8",
"User-Agent": "JDCustomAPI/1.0.0 (Python)"
})
# 频率控制
self.api_calls = defaultdict(int)
self.rate_limit = 60 # 每分钟最多调用次数
self.last_minute = time.time()
def set_access_token(self, access_token: str) -> None:
"""设置访问令牌"""
self.access_token = access_token
def _generate_sign(self, params: Dict) -> str:
"""生成签名"""
# 排序参数
sorted_params = sorted(params.items(), key=lambda x: x[0])
# 拼接签名字符串
sign_str = self.appsecret
for k, v in sorted_params:
sign_str += f"{k}{v}"
sign_str += self.appsecret
# 计算 MD5
return hashlib.md5(sign_str.encode('utf-8')).hexdigest().upper()
def _check_rate_limit(self) -> bool:
"""检查 API 调用频率限制"""
current_time = time.time()
# 每分钟重置计数
if current_time - self.last_minute > 60:
self.api_calls.clear()
self.last_minute = current_time
# 检查是否超过限制
if sum(self.api_calls.values()) >= self.rate_limit:
sleep_time = 60 - (current_time - self.last_minute)
logging.warning(f"API 调用频率超限,等待 {sleep_time:.1f} 秒")
time.sleep(sleep_time + 1)
self.api_calls.clear()
self.last_minute = time.time()
return True
def call_api(self, method: str, params: Dict = None) -> Optional[Dict]:
"""
调用京东 API
:param method: API 方法名,如 jingdong.ware.get
:param params: API 参数
:return: API 返回结果
"""
if not self.access_token:
logging.error("请先设置 access_token")
return None
# 检查频率限制
if not self._check_rate_limit():
return None
# 构建基础参数
base_params = {
"app_key": self.appkey,
"access_token": self.access_token,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"format": "json",
"v": "2.0",
"method": method
}
# 合并参数
if params:
base_params.update(params)
# 生成签名
sign = self._generate_sign(base_params)
base_params["sign"] = sign
try:
# 发送请求
response = self.session.post(self.gateway_url, data=base_params, timeout=15)
response.raise_for_status()
# 解析响应
result = response.json()
# 处理错误
if "error_response" in result:
error = result["error_response"]
logging.error(f"API 调用错误: {error.get('msg')} (错误码: {error.get('code')})")
return None
# 返回结果
return result.get(f"{method.replace('.', '_')}_response")
except requests.exceptions.RequestException as e:
logging.error(f"API 请求异常: {str(e)}")
return None
except json.JSONDecodeError:
logging.error(f"API 响应解析失败: {response.text[:200]}...")
return None
finally:
# 记录 API 调用
self.api_calls[method] += 1
# ------------------------------
# 商品相关自定义操作
# ------------------------------
def search_products(self, keyword: str, page: int = 1, page_size: int = 30,
min_price: float = None, max_price: float = None) -> Dict:
"""
搜索商品
:param keyword: 搜索关键词
:param page: 页码
:param page_size: 每页数量
:param min_price: 最低价格
:param max_price: 最高价格
:return: 商品列表及分页信息
"""
params = {
"keyword": keyword,
"page": page,
"page_size": page_size
}
if min_price is not None:
params["price_from"] = min_price
if max_price is not None:
params["price_to"] = max_price
result = self.call_api("jingdong.ware.search", params)
if not result:
return {"total": 0, "products": []}
total = int(result.get("total_count", 0))
products = result.get("wares", {}).get("ware", [])
return {
"total": total,
"page": page,
"page_size": page_size,
"total_pages": (total + page_size - 1) // page_size,
"products": self._format_products(products)
}
def get_product_details(self, ware_id: str) -> Optional[Dict]:
"""获取商品详情"""
params = {
"ware_id": ware_id,
"fields": "ware_id,ware_name,price,market_price,stock_num,brand_name,"
"cat_id,cat_name,seller_id,seller_name,created,modified"
}
result = self.call_api("jingdong.ware.get", params)
if not result:
return None
return self._format_product(result.get("ware", {}))
def batch_get_products(self, ware_ids: List[str]) -> List[Dict]:
"""批量获取商品信息"""
if not ware_ids:
return []
products = []
# 京东 API 一次最多支持 20 个 ID
batch_size = 20
for i in range(0, len(ware_ids), batch_size):
batch_ids = ware_ids[i:i+batch_size]
params = {
"ware_ids": ",".join(batch_ids),
"fields": "ware_id,ware_name,price,market_price,stock_num,seller_id,seller_name"
}
result = self.call_api("jingdong.ware.getBatch", params)
if result and "wares" in result:
formatted = [self._format_product(ware) for ware in result["wares"]["ware"]]
products.extend(formatted)
# 控制调用频率
time.sleep(1)
logging.info(f"成功获取 {len(products)} 件商品信息")
return products
def analyze_product_prices(self, keyword: str, max_pages: int = 3) -> Dict:
"""分析特定关键词商品的价格分布"""
all_products = []
page = 1
while page <= max_pages:
logging.info(f"搜索商品第 {page} 页: {keyword}")
result = self.search_products(keyword, page)
if not result["products"]:
break
all_products.extend(result["products"])
if page >= result["total_pages"]:
break
page += 1
time.sleep(1)
# 提取价格信息
prices = [float(p["price"]) for p in all_products]
# 价格统计
price_stats = {}
if prices:
price_stats = {
"min": min(prices),
"max": max(prices),
"avg": sum(prices) / len(prices),
"median": self._calculate_median(prices)
}
# 折扣分析
discount_stats = {}
discounted_products = [p for p in all_products if p["market_price"] > p["price"]]
if discounted_products:
discounts = [(1 - float(p["price"]) / float(p["market_price"])) * 100
for p in discounted_products]
discount_stats = {
"count": len(discounted_products),
"ratio": len(discounted_products) / len(all_products) * 100,
"avg_discount": sum(discounts) / len(discounts)
}
# 品牌分析
brand_counts = defaultdict(int)
for product in all_products:
brand = product.get("brand_name", "未知品牌")
brand_counts[brand] += 1
return {
"total_products": len(all_products),
"price_stats": price_stats,
"discount_stats": discount_stats,
"top_brands": sorted(brand_counts.items(), key=lambda x: x[1], reverse=True)[:5],
"products": all_products
}
# ------------------------------
# 订单相关自定义操作
# ------------------------------
def get_recent_orders(self, days: int = 7, order_state: int = 0) -> List[Dict]:
"""
获取最近 N 天的订单
:param days: 天数
:param order_state: 订单状态,0-全部,1-待付款,2-待发货等
:return: 订单列表
"""
end_time = datetime.now()
start_time = end_time - timedelta(days=days)
params = {
"start_date": start_time.strftime("%Y-%m-%d %H:%M:%S"),
"end_date": end_time.strftime("%Y-%m-%d %H:%M:%S"),
"page": 1,
"page_size": 50,
"order_state": order_state,
"fields": "order_id,order_total_price,order_state,payment_time,ship_time,"
"receive_time,sku_id,sku_name,sku_num,product_price,buyer_nick"
}
all_orders = []
while True:
logging.info(f"获取订单第 {params['page']} 页")
result = self.call_api("jingdong.order.search", params)
if not result or "orders" not in result or "order" not in result["orders"]:
break
orders = result["orders"]["order"]
all_orders.extend(orders)
# 检查是否还有更多页
total_count = int(result.get("total_count", 0))
if len(all_orders) >= total_count:
break
params["page"] += 1
time.sleep(1)
logging.info(f"成功获取 {len(all_orders)} 条订单")
return self._format_orders(all_orders)
def analyze_order_data(self, orders: List[Dict]) -> Dict:
"""分析订单数据"""
if not orders:
return {}
# 订单状态分布
state_counts = defaultdict(int)
# 每日订单统计
daily_stats = defaultdict(lambda: {"count": 0, "amount": 0})
# 商品销售统计
product_stats = defaultdict(lambda: {"count": 0, "amount": 0})
for order in orders:
# 订单状态统计
state_counts[order["order_state_desc"]] += 1
# 每日统计
if order["payment_time"]:
date_str = order["payment_time"].split()[0]
daily_stats[date_str]["count"] += 1
daily_stats[date_str]["amount"] += float(order["order_total_price"])
# 商品销售统计
for item in order.get("items", []):
product_id = item["sku_id"]
product_stats[product_id]["count"] += item["sku_num"]
product_stats[product_id]["amount"] += float(item["product_price"]) * item["sku_num"]
# 计算总销售额和订单数
total_orders = len(orders)
total_sales = sum(float(order["order_total_price"]) for order in orders)
# 转换为排序后的列表
sorted_dates = sorted(daily_stats.keys())
daily_sales = [daily_stats[date]["amount"] for date in sorted_dates]
daily_counts = [daily_stats[date]["count"] for date in sorted_dates]
# 热销商品
top_products = sorted(product_stats.items(),
key=lambda x: x[1]["amount"],
reverse=True)[:10]
return {
"total_orders": total_orders,
"total_sales": total_sales,
"avg_order_value": total_sales / total_orders if total_orders else 0,
"state_distribution": dict(state_counts),
"daily_trend": {
"dates": sorted_dates,
"sales": daily_sales,
"orders": daily_counts
},
"top_products": top_products
}
# ------------------------------
# 库存相关自定义操作
# ------------------------------
def get_product_stock(self, ware_id: str) -> Optional[Dict]:
"""获取商品库存"""
params = {
"ware_id": ware_id
}
result = self.call_api("jingdong.stock.get", params)
if not result:
return None
return {
"ware_id": ware_id,
"stock_num": result.get("stock_num", 0),
"freeze_num": result.get("freeze_num", 0), # 冻结库存
"available_num": result.get("stock_num", 0) - result.get("freeze_num", 0), # 可用库存
"update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
def check_low_stock(self, ware_ids: List[str], threshold: int = 10) -> List[Dict]:
"""检查低库存商品"""
low_stock_items = []
for ware_id in ware_ids:
stock = self.get_product_stock(ware_id)
if not stock:
continue
if stock["available_num"] <= threshold:
# 获取商品基本信息
product = self.get_product_details(ware_id)
if product:
stock["product_name"] = product["ware_name"]
stock["price"] = product["price"]
low_stock_items.append(stock)
time.sleep(0.5)
logging.info(f"发现 {len(low_stock_items)} 个低库存商品")
return low_stock_items
# ------------------------------
# 工具方法
# ------------------------------
def _format_products(self, products: List[Dict]) -> List[Dict]:
"""格式化商品列表数据"""
return [self._format_product(p) for p in products]
def _format_product(self, product: Dict) -> Dict:
"""格式化单个商品数据"""
return {
"ware_id": product.get("ware_id"),
"ware_name": product.get("ware_name", ""),
"price": product.get("price", 0),
"market_price": product.get("market_price", 0),
"stock_num": product.get("stock_num", 0),
"brand_name": product.get("brand_name", ""),
"cat_id": product.get("cat_id", ""),
"cat_name": product.get("cat_name", ""),
"seller_id": product.get("seller_id", ""),
"seller_name": product.get("seller_name", ""),
"created": product.get("created", ""),
"modified": product.get("modified", ""),
"detail_url": f"https://item.jd.com/{product.get('ware_id')}.html"
}
def _format_orders(self, orders: List[Dict]) -> List[Dict]:
"""格式化订单数据"""
formatted = []
for order in orders:
# 订单状态映射
state_map = {
0: "全部",
1: "待付款",
2: "待发货",
3: "已发货",
4: "已完成",
5: "已取消"
}
# 处理订单项
items = []
if "sku_id" in order: # 单个商品订单
items.append({
"sku_id": order["sku_id"],
"sku_name": order["sku_name"],
"sku_num": order.get("sku_num", 1),
"product_price": order.get("product_price", 0)
})
elif "skus" in order and "sku" in order["skus"]: # 多个商品订单
for sku in order["skus"]["sku"]:
items.append({
"sku_id": sku["sku_id"],
"sku_name": sku["sku_name"],
"sku_num": sku.get("sku_num", 1),
"product_price": sku.get("product_price", 0)
})
formatted.append({
"order_id": order.get("order_id"),
"order_total_price": order.get("order_total_price", 0),
"order_state": order.get("order_state", 0),
"order_state_desc": state_map.get(order.get("order_state", 0), "未知"),
"payment_time": order.get("payment_time", ""),
"ship_time": order.get("ship_time", ""),
"receive_time": order.get("receive_time", ""),
"buyer_nick": order.get("buyer_nick", ""),
"items": items
})
return formatted
def _calculate_median(self, data: List[float]) -> float:
"""计算中位数"""
sorted_data = sorted(data)
n = len(sorted_data)
if n % 2 == 1:
return round(sorted_data[n//2], 2)
else:
return round((sorted_data[n//2 - 1] + sorted_data[n//2]) / 2, 2)
def visualize_analysis(self, analysis: Dict, title: str, output_file: str) -> None:
"""可视化分析结果"""
plt.rcParams["font.family"] = ["SimHei", "WenQuanYi Micro Hei", "Heiti TC"]
plt.rcParams["axes.unicode_minus"] = False
fig, ax = plt.subplots(figsize=(10, 6))
if "price_stats" in analysis:
# 价格分布直方图
prices = [float(p["price"]) for p in analysis["products"]]
ax.hist(prices, bins=10, alpha=0.7, color='skyblue')
ax.axvline(analysis["price_stats"]["avg"], color='r', linestyle='dashed', linewidth=1,
label=f'平均价格: {analysis["price_stats"]["avg"]:.2f}')
ax.set_title(f"{title} - 商品价格分布")
ax.set_xlabel("价格 (元)")
ax.set_ylabel("商品数量")
ax.legend()
elif "daily_trend" in analysis:
# 销售趋势图
x = np.arange(len(analysis["daily_trend"]["dates"]))
width = 0.35
ax.bar(x - width/2, analysis["daily_trend"]["orders"], width, label='订单数')
ax_twin = ax.twinx()
ax_twin.plot(x, analysis["daily_trend"]["sales"], 'r-', marker='o', label='销售额')
ax.set_title(f"{title} - 销售趋势")
ax.set_xlabel("日期")
ax.set_ylabel("订单数")
ax_twin.set_ylabel("销售额 (元)")
ax.set_xticks(x)
ax.set_xticklabels(analysis["daily_trend"]["dates"], rotation=45)
# 合并图例
lines, labels = ax.get_legend_handles_labels()
lines2, labels2 = ax_twin.get_legend_handles_labels()
ax.legend(lines + lines2, labels + labels2, loc='upper left')
elif "state_distribution" in analysis:
# 订单状态饼图
labels = list(analysis["state_distribution"].keys())
sizes = list(analysis["state_distribution"].values())
ax.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=90)
ax.set_title(f"{title} - 订单状态分布")
ax.axis('equal')
plt.tight_layout()
plt.savefig(output_file)
plt.close()
logging.info(f"分析图表已保存至 {output_file}")
def export_to_excel(self, data: List[Dict], filename: str, sheet_name: str = "数据") -> None:
"""将数据导出到 Excel"""
if not data:
logging.warning("没有可导出的数据")
return
df = pd.DataFrame(data)
df.to_excel(filename, sheet_name=sheet_name, index=False)
logging.info(f"数据已导出至 {filename}")
# 示例调用
if __name__ == "__main__":
# 替换为实际的参数(从京东开放平台获取)
APPKEY = "your_appkey"
APPSECRET = "your_appsecret"
ACCESS_TOKEN = "your_access_token"
# 初始化 API 客户端
api = JDCustomAPI(APPKEY, APPSECRET, ACCESS_TOKEN)
# 示例 1: 商品价格分析
print("=== 商品价格分析 ===")
KEYWORD = "智能手机" # 搜索关键词
price_analysis = api.analyze_product_prices(KEYWORD, max_pages=3)
print(f"搜索关键词: {KEYWORD}")
print(f"获取商品总数: {price_analysis['total_products']}")
if price_analysis["price_stats"]:
print(f"价格范围: {price_analysis['price_stats']['min']}-{price_analysis['price_stats']['max']} 元")
print(f"平均价格: {price_analysis['price_stats']['avg']:.2f} 元")
if price_analysis["discount_stats"]:
print(f"打折商品比例: {price_analysis['discount_stats']['ratio']:.1f}%")
print(f"平均折扣力度: {price_analysis['discount_stats']['avg_discount']:.1f}%")
print("热门品牌 TOP3:")
for i, (brand, count) in enumerate(price_analysis["top_brands"][:3], 1):
print(f" {i}. {brand}: {count} 款商品")
api.visualize_analysis(price_analysis, f"商品价格分析: {KEYWORD}", "product_price_analysis.png")
# 示例 2: 订单数据分析
print("\n=== 订单数据分析 ===")
recent_orders = api.get_recent_orders(days=15) # 获取最近 15 天订单
order_analysis = api.analyze_order_data(recent_orders)
print(f"总订单数: {order_analysis['total_orders']}")
print(f"总销售额: {order_analysis['total_sales']:.2f} 元")
print(f"平均订单金额: {order_analysis['avg_order_value']:.2f} 元")
print("订单状态分布:")
for state, count in order_analysis["state_distribution"].items():
print(f" {state}: {count} 单 ({count/order_analysis['total_orders']*100:.1f}%)")
api.visualize_analysis(order_analysis, "最近15天订单分析", "order_analysis.png")
api.export_to_excel(recent_orders, "recent_orders.xlsx", "订单数据")
# 示例 3: 库存检查
print("\n=== 库存检查 ===")
if price_analysis["products"]:
# 选取前 10 个商品检查库存
product_ids = [p["ware_id"] for p in price_analysis["products"][:10]]
low_stock = api.check_low_stock(product_ids, threshold=20)
if low_stock:
print(f"发现 {len(low_stock)} 个低库存商品:")
for item in low_stock:
print(f" {item['product_name'][:20]}... (ID: {item['ware_id']})")
print(f" 可用库存: {item['available_num']}, 价格: {item['price']} 元")
else:
print("未发现低库存商品")