京东作为国内领先的电商平台,其畅销榜数据反映了市场真实消费趋势和用户偏好。item_search_best接口作为获取京东畅销商品数据的核心工具,能够提供各品类下的热销商品排名、销量、价格波动等关键信息,为商家选品、市场策略制定、竞品分析提供重要数据支持。
一、item_search_best 接口核心特性分析
1. 接口定位与核心价值
2. 接口权限与调用限制
3. 核心参数解析
必选参数
可选参数
二、签名生成与返回数据结构
1. 签名生成逻辑
2. 返回数据结构解析
三、Python 实现方案
import requests
import time
import hashlib
import json
import logging
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from datetime import datetime, timedelta
from collections import defaultdict
from typing import Dict, List, Optional, Tuple
import re
# 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
# 配置中文显示
plt.rcParams["font.family"] = ["SimHei", "WenQuanYi Micro Hei", "Heiti TC"]
plt.rcParams["axes.unicode_minus"] = False
class JDBestSellers:
"""京东畅销榜接口封装类,用于获取和分析畅销商品数据"""
def __init__(self, app_key: str, app_secret: str):
"""
初始化京东API客户端
:param app_key: 应用的app_key
:param app_secret: 应用的app_secret
"""
self.app_key = app_key
self.app_secret = app_secret
self.api_url = "https://api.jd.com/routerjson"
# 频率控制
self.rate_limit = 30 # 默认个人开发者限制,企业开发者可修改为100
self.call_timestamps = [] # 存储调用时间戳(毫秒级)
def set_rate_limit(self, limit: int) -> None:
"""设置调用频率限制(次/分钟)"""
if 30 <= limit <= 100:
self.rate_limit = limit
logging.info(f"已设置调用频率限制为 {limit} 次/分钟")
else:
logging.warning("频率限制必须在30-100之间,未修改")
def _generate_sign(self, params: Dict) -> str:
"""生成签名(MD5算法)"""
# 1. 按参数名ASCII升序排序
sorted_params = sorted(params.items(), key=lambda x: x[0])
# 2. 拼接为"key=value&key=value"格式
param_str = "&".join([f"{k}={v}" for k, v in sorted_params])
# 3. 加上app_secret
sign_str = f"{param_str}&app_secret={self.app_secret}"
# 4. MD5加密并转为大写
return hashlib.md5(sign_str.encode('utf-8')).hexdigest().upper()
def _check_rate_limit(self) -> None:
"""检查并控制调用频率"""
current_time = time.time() * 1000 # 转为毫秒
# 保留1分钟内的调用记录
self.call_timestamps = [t for t in self.call_timestamps if current_time - t < 60000]
# 若超过限制,计算需要等待的时间
if len(self.call_timestamps) >= self.rate_limit:
oldest_time = self.call_timestamps[0]
sleep_time = (60000 - (current_time - oldest_time)) / 1000 + 0.1 # 额外加0.1秒保险
logging.warning(f"调用频率超限,等待 {sleep_time:.1f} 秒")
time.sleep(sleep_time)
# 再次清理过期记录
self.call_timestamps = [t for t in self.call_timestamps if time.time()*1000 - t < 60000]
# 记录本次调用时间
self.call_timestamps.append(current_time)
def get_best_sellers(self, cat: str, rank_type: str = "realtime",
page: int = 1, page_size: int = 50,
filters: Optional[Dict] = None) -> Optional[Dict]:
"""
获取京东畅销榜数据
:param cat: 类目ID
:param rank_type: 榜单类型:realtime, week, month
:param page: 页码
:param page_size: 每页数量
:param filters: 筛选参数
:return: 畅销榜数据
"""
# 构建基础参数
base_params = {
"app_key": self.app_key,
"method": "jd.item.search.best",
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"format": "json",
"v": "1.0",
"cat": cat,
"type": rank_type,
"page": page,
"page_size": page_size
}
# 合并筛选参数
if filters and isinstance(filters, Dict):
# 过滤空值参数
valid_filters = {k: v for k, v in filters.items() if v is not None}
base_params.update(valid_filters)
# 生成签名
sign = self._generate_sign(base_params)
base_params["sign"] = sign
# 检查频率限制
self._check_rate_limit()
try:
# 发送请求
response = requests.post(self.api_url, data=base_params, timeout=15)
response.raise_for_status()
# 解析响应
result = response.json()
# 处理错误
if "error_response" in result:
error = result["error_response"]
logging.error(f"API调用错误: {error.get('msg')} (错误码: {error.get('code')})")
return None
# 提取结果
best_sellers = result.get("jd_item_search_best_response", {}).get("result", {})
if not best_sellers:
logging.warning("未获取到畅销榜数据")
return None
logging.info(f"成功获取 {rank_type} 榜第 {page} 页数据,类目ID: {cat}")
return best_sellers
except requests.exceptions.RequestException as e:
logging.error(f"请求异常: {str(e)}")
return None
except json.JSONDecodeError:
logging.error(f"响应解析失败: {response.text[:200]}...")
return None
def batch_get_best_sellers(self, cat: str, rank_type: str = "realtime",
max_pages: int = 3, page_size: int = 50,
filters: Optional[Dict] = None) -> Tuple[List[Dict], Dict]:
"""
批量获取多页畅销榜数据
:param cat: 类目ID
:param rank_type: 榜单类型
:param max_pages: 最大页数
:param page_size: 每页数量
:param filters: 筛选参数
:return: 商品列表和元信息
"""
all_products = []
meta_info = {
"cat_id": cat,
"rank_type": rank_type,
"total_products": 0,
"total_pages": 0,
"filters": filters,
"fetch_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"update_time": ""
}
page = 1
while page <= max_pages:
logging.info(f"正在获取第 {page}/{max_pages} 页{rank_type}榜数据...")
result = self.get_best_sellers(cat, rank_type, page, page_size, filters)
if not result:
break
# 提取商品数据
products = result.get("products", {}).get("product", [])
if not products:
logging.info("当前页无商品数据,停止获取")
break
all_products.extend(products)
# 保存元信息(第一页)
if page == 1:
meta_info["total_products"] = result.get("total", 0)
meta_info["total_pages"] = min(max_pages, (meta_info["total_products"] + page_size - 1) // page_size)
meta_info["update_time"] = result.get("update_time", "")
meta_info["category_name"] = result.get("category_name", f"类目ID:{cat}")
page += 1
logging.info(f"批量获取完成,共获取 {len(all_products)} 件畅销商品数据")
return all_products, meta_info
def analyze_best_sellers(self, products: List[Dict]) -> Dict:
"""分析畅销商品数据"""
if not products:
return {"error": "没有商品数据可分析"}
# 1. 价格分析
prices = []
for product in products:
try:
price = float(product.get("price", 0))
prices.append(price)
except (ValueError, TypeError):
continue
price_stats = {}
if prices:
price_stats = {
"min": round(min(prices), 2),
"max": round(max(prices), 2),
"avg": round(sum(prices) / len(prices), 2),
"median": round(np.median(prices), 2)
}
# 2. 品牌分析
brand_counts = defaultdict(int)
for product in products:
brand = product.get("brand", "未知品牌")
brand_counts[brand] += 1
# 3. 销量分析
sales_data = []
for product in products:
try:
sales = int(product.get("sales", 0))
rank = int(product.get("rank", 0))
sales_data.append((rank, sales))
except (ValueError, TypeError):
continue
sales_by_rank = {}
if sales_data:
# 按排名区间统计平均销量
rank_groups = {
"1-10名": [], "11-30名": [], "31-50名": [],
"51-100名": [], "101-200名": [], "201名以上": []
}
for rank, sales in sales_data:
if rank <= 10:
rank_groups["1-10名"].append(sales)
elif rank <= 30:
rank_groups["11-30名"].append(sales)
elif rank <= 50:
rank_groups["31-50名"].append(sales)
elif rank <= 100:
rank_groups["51-100名"].append(sales)
elif rank <= 200:
rank_groups["101-200名"].append(sales)
else:
rank_groups["201名以上"].append(sales)
sales_by_rank = {
group: {
"count": len(sales_list),
"avg_sales": round(sum(sales_list)/len(sales_list), 1) if sales_list else 0
}
for group, sales_list in rank_groups.items() if sales_list
}
# 4. 好评率分析
rating_stats = defaultdict(int)
ratings = []
for product in products:
try:
rate = float(product.get("good_rate", 0)) * 100 # 转为百分比
ratings.append(rate)
if rate >= 98:
rating_stats["98%以上"] += 1
elif rate >= 95:
rating_stats["95-97%"] += 1
elif rate >= 90:
rating_stats["90-94%"] += 1
else:
rating_stats["90%以下"] += 1
except (ValueError, TypeError):
rating_stats["未知"] += 1
rating_avg = round(sum(ratings)/len(ratings), 1) if ratings else 0
# 5. 排名变化分析
rank_change = defaultdict(int)
for product in products:
try:
current_rank = int(product.get("rank", 0))
prev_rank = int(product.get("previous_rank", 0))
if prev_rank == 0: # 新上榜
rank_change["新上榜"] += 1
else:
diff = prev_rank - current_rank
if diff > 10:
rank_change["大幅上升(>10)"] += 1
elif diff > 0:
rank_change["小幅上升(1-10)"] += 1
elif diff == 0:
rank_change["持平"] += 1
elif diff > -10:
rank_change["小幅下降(1-10)"] += 1
else:
rank_change["大幅下降(>10)"] += 1
except (ValueError, TypeError):
rank_change["数据异常"] += 1
# 6. 店铺类型分析
shop_type = defaultdict(int)
for product in products:
shop_name = product.get("shop_name", "")
tags = product.get("tags", [])
if "自营" in tags or "京东自营" in shop_name:
shop_type["京东自营"] += 1
else:
shop_type["第三方店铺"] += 1
return {
"total_products": len(products),
"price_analysis": price_stats,
"brand_analysis": {
"distribution": dict(brand_counts),
"top_brands": sorted(brand_counts.items(), key=lambda x: x[1], reverse=True)[:5]
},
"sales_analysis": {
"by_rank": sales_by_rank,
"total_sales": sum(sales for _, sales in sales_data) if sales_data else 0
},
"rating_analysis": {
"distribution": dict(rating_stats),
"average_rating": rating_avg
},
"rank_change_analysis": dict(rank_change),
"shop_type_analysis": dict(shop_type)
}
def get_top_products(self, products: List[Dict], by: str = "sales", top_n: int = 10) -> List[Dict]:
"""
获取特定维度排名靠前的商品
:param products: 商品列表
:param by: 排序维度:sales(销量), growth(增长率), rating(好评率)
:param top_n: 取前N名
:return: 排序后的商品列表
"""
if not products:
return []
# 准备排序数据
ranked_products = []
for product in products:
try:
if by == "sales":
value = int(product.get("sales", 0))
elif by == "growth":
value = float(product.get("sales_growth", 0))
elif by == "rating":
value = float(product.get("good_rate", 0))
else:
value = 0
ranked_products.append((product, value))
except (ValueError, TypeError):
continue
# 排序并返回前N名
ranked_products.sort(key=lambda x: x[1], reverse=True)
return [p[0] for p in ranked_products[:top_n]]
def visualize_analysis(self, analysis: Dict, category_name: str, output_dir: str = ".") -> None:
"""可视化分析结果"""
# 1. 价格分布直方图
if "price_analysis" in analysis and analysis["price_analysis"]:
plt.figure(figsize=(10, 6))
# 提取所有价格
prices = []
for product in analysis.get("raw_products", []):
try:
price = float(product.get("price", 0))
prices.append(price)
except (ValueError, TypeError):
continue
plt.hist(prices, bins=10, alpha=0.7, color='lightblue')
plt.axvline(analysis["price_analysis"]["avg"], color='red', linestyle='--',
label=f'平均价格: {analysis["price_analysis"]["avg"]}元')
plt.title(f'{category_name} 畅销商品价格分布')
plt.xlabel('价格(元)')
plt.ylabel('商品数量')
plt.legend()
plt.tight_layout()
plt.savefig(f"{output_dir}/price_distribution.png")
plt.close()
logging.info(f"价格分布图表已保存至 {output_dir}/price_distribution.png")
# 2. 品牌分布饼图(前5名+其他)
if "brand_analysis" in analysis and analysis["brand_analysis"]["distribution"]:
plt.figure(figsize=(10, 8))
brand_data = sorted(analysis["brand_analysis"]["distribution"].items(),
key=lambda x: x[1], reverse=True)
if len(brand_data) > 5:
top5 = brand_data[:5]
others = sum(count for _, count in brand_data[5:])
top5.append(("其他品牌", others))
brand_data = top5
labels, sizes = zip(*brand_data)
plt.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=90)
plt.title(f'{category_name} 畅销商品品牌分布')
plt.axis('equal')
plt.tight_layout()
plt.savefig(f"{output_dir}/brand_distribution.png")
plt.close()
logging.info(f"品牌分布图表已保存至 {output_dir}/brand_distribution.png")
# 3. 销量与排名关系图
if "sales_analysis" in analysis and analysis["sales_analysis"]["by_rank"]:
plt.figure(figsize=(10, 6))
rank_groups = sorted(analysis["sales_analysis"]["by_rank"].keys(),
key=lambda x: int(re.findall(r'\d+', x)[0]))
avg_sales = [analysis["sales_analysis"]["by_rank"][g]["avg_sales"] for g in rank_groups]
plt.bar(rank_groups, avg_sales, color='orange')
plt.title(f'{category_name} 不同排名区间的平均销量')
plt.xlabel('排名区间')
plt.ylabel('平均销量')
for i, v in enumerate(avg_sales):
plt.text(i, v + 5, f'{v}', ha='center')
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig(f"{output_dir}/sales_by_rank.png")
plt.close()
logging.info(f"销量与排名关系图表已保存至 {output_dir}/sales_by_rank.png")
# 4. 好评率分布条形图
if "rating_analysis" in analysis and analysis["rating_analysis"]["distribution"]:
plt.figure(figsize=(10, 6))
ratings = list(analysis["rating_analysis"]["distribution"].keys())
counts = list(analysis["rating_analysis"]["distribution"].values())
plt.bar(ratings, counts, color='lightgreen')
plt.title(f'{category_name} 畅销商品好评率分布')
plt.xlabel('好评率区间')
plt.ylabel('商品数量')
plt.axhline(y=sum(counts)/len(counts), color='r', linestyle='--',
label=f'平均分布: {sum(counts)/len(counts):.1f}')
for i, v in enumerate(counts):
plt.text(i, v + 0.5, str(v), ha='center')
plt.legend()
plt.tight_layout()
plt.savefig(f"{output_dir}/rating_distribution.png")
plt.close()
logging.info(f"好评率分布图表已保存至 {output_dir}/rating_distribution.png")
def export_to_excel(self, products: List[Dict], analysis: Dict, meta_info: Dict, filename: str) -> None:
"""导出畅销榜数据和分析结果到Excel"""
if not products and not analysis:
logging.warning("没有数据可导出")
return
try:
with pd.ExcelWriter(filename) as writer:
# 元信息
pd.DataFrame([meta_info]).to_excel(writer, sheet_name='榜单信息', index=False)
# 商品数据
if products:
filtered_products = []
for product in products:
# 提取关键信息
filtered = {
"排名": product.get("rank"),
"上期排名": product.get("previous_rank"),
"排名变化": "新上榜" if product.get("previous_rank", 0) == 0
else (product.get("previous_rank", 0) - product.get("rank", 0)),
"商品ID": product.get("item_id"),
"商品名称": product.get("title"),
"品牌": product.get("brand"),
"价格(元)": product.get("price"),
"原价(元)": product.get("original_price"),
"销量": product.get("sales"),
"销量增长率(%)": product.get("sales_growth"),
"好评率(%)": round(float(product.get("good_rate", 0)) * 100, 1)
if product.get("good_rate") else None,
"评论数": product.get("comment_count"),
"店铺名称": product.get("shop_name"),
"店铺类型": "京东自营" if ("自营" in product.get("tags", []) or
"京东自营" in product.get("shop_name", ""))
else "第三方店铺",
"标签": ",".join(product.get("tags", []))
}
filtered_products.append(filtered)
df_products = pd.DataFrame(filtered_products)
df_products.to_excel(writer, sheet_name='畅销商品数据', index=False)
# 分析结果
if analysis and "error" not in analysis:
# 品牌分析
if "brand_analysis" in analysis:
df_brand = pd.DataFrame(list(analysis["brand_analysis"]["distribution"].items()),
columns=['品牌', '数量'])
df_brand.to_excel(writer, sheet_name='品牌分析', index=False)
# 价格分析
if "price_analysis" in analysis and analysis["price_analysis"]:
df_price = pd.DataFrame([analysis["price_analysis"]], index=["数值"])
df_price.to_excel(writer, sheet_name='价格分析')
logging.info(f"数据已导出至 {filename}")
except Exception as e:
logging.error(f"导出Excel失败: {e}")
# 示例调用
if __name__ == "__main__":
# 替换为实际的参数(从京东开放平台获取)
APP_KEY = "your_app_key"
APP_SECRET = "your_app_secret"
CATEGORY_ID = "9987" # 手机类目示例,可根据需要替换
# 初始化API客户端
jd_best = JDBestSellers(APP_KEY, APP_SECRET)
# 若为企业开发者,设置更高的频率限制
# jd_best.set_rate_limit(100)
# 1. 设置筛选条件
filters = {
"brand": "苹果,华为", # 筛选苹果和华为品牌
"price_from": 3000, # 最低价格3000元
"price_to": 8000, # 最高价格8000元
"sort": "sale" # 按销量排序
}
# 2. 批量获取畅销榜数据
print("=== 获取畅销榜数据 ===")
products, meta_info = jd_best.batch_get_best_sellers(
cat=CATEGORY_ID,
rank_type="week", # 获取周榜数据
max_pages=2, # 获取前2页
page_size=50,
filters=filters
)
if products:
print(f"类目名称: {meta_info['category_name']}")
print(f"榜单类型: {meta_info['rank_type']}榜")
print(f"获取商品数量: {len(products)}")
print(f"总商品数量: {meta_info['total_products']}")
print(f"数据最后更新时间: {meta_info['update_time']}")
# 3. 分析畅销榜数据
print("\n=== 畅销榜数据分析 ===")
if products:
analysis = jd_best.analyze_best_sellers(products)
# 保存原始数据用于可视化
analysis["raw_products"] = products
print(f"价格范围: {analysis['price_analysis']['min']}-{analysis['price_analysis']['max']}元")
print(f"平均价格: {analysis['price_analysis']['avg']}元")
print(f"平均好评率: {analysis['rating_analysis']['average_rating']}%")
print("\nTOP5品牌:")
for brand, count in analysis["brand_analysis"]["top_brands"]:
print(f" {brand}: {count}款商品")
print("\n店铺类型分布:")
for type_name, count in analysis["shop_type_analysis"].items():
print(f" {type_name}: {count}款商品")
print("\n排名变化分析:")
for change, count in analysis["rank_change_analysis"].items():
print(f" {change}: {count}款商品")
# 4. 获取各维度TOP商品
print("\n=== 各维度TOP3商品 ===")
top_sales = jd_best.get_top_products(products, by="sales", top_n=3)
print("销量TOP3:")
for i, product in enumerate(top_sales, 1):
print(f"{i}. {product['title'][:20]}... 销量: {product['sales']}")
top_growth = jd_best.get_top_products(products, by="growth", top_n=3)
print("\n增长最快TOP3:")
for i, product in enumerate(top_growth, 1):
print(f"{i}. {product['title'][:20]}... 增长率: {product['sales_growth']}%")
# 5. 可视化分析结果
jd_best.visualize_analysis(analysis, meta_info["category_name"])
# 6. 导出数据到Excel
jd_best.export_to_excel(products, analysis, meta_info, "京东畅销榜分析.xlsx")
else:
print("未获取到畅销榜数据,无法进行分析")