item_search_shop 接口是获取特定店铺所有商品数据的核心接口,能够批量获取店铺内的商品列表、基础信息、价格、销量等关键数据。对于店铺分析、竞品监控、商品归类研究等场景具有重要价值,尤其适合需要全面了解某店铺经营状况的业务需求。
一、接口核心特性分析
1. 接口功能与定位
2. 认证机制
3. 核心参数与响应结构
请求参数
响应核心字段
二、Python 脚本实现
import requests
import time
import json
import logging
import hashlib
import re
from typing import Dict, Optional, List, Tuple
from requests.exceptions import RequestException
import matplotlib.pyplot as plt
import pandas as pd
from collections import defaultdict
# 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
class ShopItemSearchAPI:
def __init__(self, appkey: str, appsecret: str, platform: str = "general"):
"""
初始化店铺商品搜索API客户端
:param appkey: 开放平台appkey
:param appsecret: 开放平台appsecret
:param platform: 平台标识,用于适配不同平台的接口差异
"""
self.appkey = appkey
self.appsecret = appsecret
self.platform = platform
self.base_url = self._get_base_url()
self.session = requests.Session()
self.session.headers.update({
"Content-Type": "application/json",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36"
})
def _get_base_url(self) -> str:
"""根据平台获取基础URL"""
# 可根据实际平台扩展
platform_urls = {
"general": "https://apiplatform.com/api",
"taobao": "https://eco.taobao.com/router/rest",
"jd": "https://api.jd.com/routerjson",
"1688": "https://gw.open.1688.com/openapi/gateway.do"
}
return platform_urls.get(self.platform, platform_urls["general"])
def _generate_sign(self, params: Dict) -> str:
"""生成请求签名,不同平台可能有差异"""
if self.platform == "taobao":
# 淘宝签名方式
sorted_params = sorted(params.items(), key=lambda x: x[0])
sign_str = ""
for k, v in sorted_params:
sign_str += f"{k}{v}"
sign_str += self.appsecret
return hashlib.md5(sign_str.encode()).hexdigest().upper()
elif self.platform == "jd":
# 京东签名方式
sorted_params = sorted(params.items(), key=lambda x: x[0])
sign_str = self.appsecret
for k, v in sorted_params:
sign_str += f"{k}{v}"
sign_str += self.appsecret
return hashlib.md5(sign_str.encode()).hexdigest().upper()
else:
# 通用签名方式
sorted_params = sorted(params.items(), key=lambda x: x[0])
sign_str = f"{self.appsecret}"
for k, v in sorted_params:
sign_str += f"{k}={v}&"
sign_str = sign_str.rstrip('&') + f"{self.appsecret}"
return hashlib.md5(sign_str.encode()).hexdigest().upper()
def search_shop_items(self,
shop_id: str,
page: int = 1,
page_size: int = 20,
sort: str = "default",
category_id: Optional[str] = None,
min_price: Optional[float] = None,
max_price: Optional[float] = None,
is_promotion: Optional[bool] = None) -> Optional[Dict]:
"""
搜索店铺商品
:param shop_id: 店铺ID
:param page: 页码
:param page_size: 每页条数
:param sort: 排序方式
:param category_id: 店铺内分类ID
:param min_price: 最低价格
:param max_price: 最高价格
:param is_promotion: 是否只看促销商品
:return: 商品列表数据
"""
# 验证排序方式
valid_sorts = ["default", "price_asc", "price_desc", "sales_desc", "new_desc"]
if sort not in valid_sorts:
logging.error(f"无效的排序方式: {sort},支持: {valid_sorts}")
return None
# 验证分页大小
if page_size < 1 or page_size > 100:
logging.error(f"每页条数必须在1-100之间,当前为: {page_size}")
return page_size > 100 and 100 or 1
# 构建基础参数
timestamp = int(time.time())
params = self._build_platform_params(shop_id, page, page_size, sort, timestamp)
# 添加可选参数
if category_id:
params["category_id"] = category_id
if min_price is not None:
params["min_price"] = min_price
if max_price is not None:
params["max_price"] = max_price
if is_promotion is not None:
params["is_promotion"] = "true" if is_promotion else "false"
# 生成签名
params["sign"] = self._generate_sign(params)
try:
response = self.session.get(self.base_url, params=params, timeout=15)
response.raise_for_status()
# 不同平台可能有不同的响应格式
if self.platform in ["jd", "1688"]:
result = response.json()
else:
# 淘宝等平台可能返回JSONP格式
json_str = re.findall(r'(\{.*\})', response.text)[0]
result = json.loads(json_str)
# 处理不同平台的响应结构
if self.platform == "taobao":
# 淘宝API响应结构
if "error_response" in result:
logging.error(f"获取商品失败: {result['error_response']['msg']} (错误码: {result['error_response']['code']})")
return None
items_data = result.get("tbk_shop_item_get_response", {}).get("results", {}).get("n_tbk_item", [])
total_count = result.get("tbk_shop_item_get_response", {}).get("total_results", 0)
elif self.platform == "jd":
# 京东API响应结构
items_data = result.get("jingdong_shop_item_search_response", {}).get("result", {}).get("items", [])
total_count = result.get("jingdong_shop_item_search_response", {}).get("result", {}).get("total_count", 0)
else:
# 通用响应结构
if result.get("code") != 0:
logging.error(f"获取商品失败: {result.get('msg', '未知错误')} (错误码: {result.get('code')})")
return None
items_data = result.get("data", {}).get("items", [])
total_count = result.get("data", {}).get("total_count", 0)
# 格式化商品数据
return self._format_items_data(items_data, total_count, page, page_size)
except RequestException as e:
logging.error(f"请求异常: {str(e)}")
return None
except (json.JSONDecodeError, IndexError) as e:
logging.error(f"解析响应失败: {str(e)}, 响应内容: {response.text[:200]}...")
return None
def _build_platform_params(self, shop_id: str, page: int, page_size: int, sort: str, timestamp: int) -> Dict:
"""构建不同平台的特有参数"""
if self.platform == "taobao":
return {
"method": "taobao.tbk.shop.item.get",
"app_key": self.appkey,
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"format": "json",
"v": "2.0",
"fields": "num_iid,title,pict_url,price,orginal_price,sales,comment_count,shop_title,category_name",
"shop_id": shop_id,
"page_no": page,
"page_size": page_size,
"sort": sort
}
elif self.platform == "jd":
return {
"method": "jingdong.shop.item.search",
"app_key": self.appkey,
"timestamp": timestamp,
"360buy_param_json": json.dumps({
"shop_id": shop_id,
"page": page,
"page_size": page_size,
"sort": sort
})
}
else:
return {
"method": "item.search.shop",
"appkey": self.appkey,
"timestamp": timestamp,
"shop_id": shop_id,
"page": page,
"page_size": page_size,
"sort": sort
}
def _format_items_data(self, items_data: List[Dict], total_count: int, page: int, page_size: int) -> Dict:
"""格式化商品数据"""
# 分页信息
pagination = {
"total_items": total_count,
"total_pages": (total_count + page_size - 1) // page_size,
"current_page": page,
"page_size": page_size
}
# 格式化商品列表
items = []
for item in items_data:
# 适配不同平台的字段差异
if self.platform == "taobao":
item_id = item.get("num_iid")
price = self._safe_float(item.get("price"))
original_price = self._safe_float(item.get("orginal_price"))
sales = self._safe_int(item.get("sales"))
comment_count = self._safe_int(item.get("comment_count"))
category_name = item.get("category_name")
image_url = item.get("pict_url")
title = item.get("title")
elif self.platform == "jd":
item_id = item.get("sku_id")
price = self._safe_float(item.get("jd_price"))
original_price = self._safe_float(item.get("market_price"))
sales = self._safe_int(item.get("sales_count"))
comment_count = self._safe_int(item.get("comment_count"))
category_name = item.get("category_name")
image_url = item.get("image_url")
title = item.get("name")
else:
item_id = item.get("item_id")
price = self._safe_float(item.get("price"))
original_price = self._safe_float(item.get("original_price"))
sales = self._safe_int(item.get("sales_count"))
comment_count = self._safe_int(item.get("comment_count"))
category_name = item.get("category_name")
image_url = item.get("image_url")
title = item.get("title")
# 计算折扣
discount = 0
if original_price > 0 and price > 0:
discount = round((price / original_price) * 10, 1)
# 判断是否促销
is_promotion = original_price > price and original_price - price > 0.01
items.append({
"item_id": item_id,
"title": title,
"image_url": image_url,
"price": price,
"original_price": original_price,
"discount": discount,
"is_promotion": is_promotion,
"sales_count": sales,
"comment_count": comment_count,
"category_name": category_name,
"category_id": item.get("category_id"),
"url": item.get("url"),
"tags": item.get("tags", "").split(",") if item.get("tags") else []
})
return {
"pagination": pagination,
"items": items,
"raw_data": items_data # 保留原始数据
}
def get_shop_all_items(self, shop_id: str, max_pages: int = 20, **kwargs) -> List[Dict]:
"""
获取店铺所有商品
:param shop_id: 店铺ID
:param max_pages: 最大页数限制
:param**kwargs: 其他搜索参数
:return: 所有商品列表
"""
all_items = []
page = 1
while page <= max_pages:
logging.info(f"获取店铺第 {page} 页商品")
result = self.search_shop_items(
shop_id=shop_id,
page=page,
page_size=100, # 使用最大页大小减少请求次数
**kwargs
)
if not result or not result["items"]:
break
all_items.extend(result["items"])
# 检查是否已到最后一页
if page >= result["pagination"]["total_pages"]:
break
page += 1
# 控制请求频率,遵守平台API的QPS限制
time.sleep(2)
logging.info(f"共获取到 {len(all_items)} 件商品")
return all_items
def analyze_shop_items(self, items: List[Dict]) -> Dict:
"""分析店铺商品数据,生成店铺经营分析报告"""
if not items:
return {}
total = len(items)
# 价格分析
price_analysis = self._analyze_prices(items)
# 分类分析
category_analysis = self._analyze_categories(items)
# 销售分析
sales_analysis = self._analyze_sales(items)
# 促销分析
promotion_analysis = self._analyze_promotions(items)
# 提取热销商品
top_sales = sorted(items, key=lambda x: x["sales_count"], reverse=True)[:10]
# 提取高折扣商品
top_discounts = sorted([i for i in items if i["is_promotion"]], key=lambda x: x["discount"], reverse=False)[:10]
return {
"total_items": total,
"price_analysis": price_analysis,
"category_analysis": category_analysis,
"sales_analysis": sales_analysis,
"promotion_analysis": promotion_analysis,
"top_sales": top_sales,
"top_discounts": top_discounts
}
def _analyze_prices(self, items: List[Dict]) -> Dict:
"""分析价格分布"""
prices = [item["price"] for item in items if item["price"] > 0]
if not prices:
return {}
min_price = min(prices)
max_price = max(prices)
avg_price = round(sum(prices) / len(prices), 2)
# 价格区间分布
price_ranges = self._get_price_ranges(min_price, max_price)
range_counts = defaultdict(int)
for price in prices:
for r in price_ranges:
if r[0] <= price < r[1]:
range_counts[f"{r[0]}-{r[1]}"] += 1
break
else:
range_counts[f"{price_ranges[-1][1]}+"] += 1
return {
"min_price": min_price,
"max_price": max_price,
"avg_price": avg_price,
"median_price": self._calculate_median(prices),
"range_distribution": dict(range_counts)
}
def _analyze_categories(self, items: List[Dict]) -> Dict:
"""分析分类分布"""
category_counts = defaultdict(int)
category_sales = defaultdict(int)
category_prices = defaultdict(list)
for item in items:
cat_name = item["category_name"] or "未分类"
category_counts[cat_name] += 1
category_sales[cat_name] += item["sales_count"]
if item["price"] > 0:
category_prices[cat_name].append(item["price"])
# 计算每个分类的平均价格
category_avg_prices = {
cat: round(sum(prices)/len(prices), 2)
for cat, prices in category_prices.items() if prices
}
# 按商品数量排序
sorted_categories = sorted(category_counts.items(), key=lambda x: x[1], reverse=True)
return {
"total_categories": len(category_counts),
"distribution": dict(category_counts),
"sales_by_category": dict(category_sales),
"avg_price_by_category": category_avg_prices,
"top_categories": sorted_categories[:5]
}
def _analyze_sales(self, items: List[Dict]) -> Dict:
"""分析销售情况"""
total_sales = sum(item["sales_count"] for item in items)
avg_sales = round(total_sales / len(items), 1) if items else 0
# 销量分布
sales_ranges = [(0, 1), (1, 10), (10, 100), (100, 1000), (1000, float('inf'))]
sales_distribution = defaultdict(int)
for item in items:
sales = item["sales_count"]
for r in sales_ranges:
if r[0] <= sales < r[1]:
sales_distribution[f"{r[0]}-{r[1] if r[1] != float('inf') else '+'}"] += 1
break
# 销量与价格相关性
price_sales_correlation = self._calculate_correlation(
[item["price"] for item in items],
[item["sales_count"] for item in items]
)
return {
"total_sales": total_sales,
"avg_sales_per_item": avg_sales,
"distribution": dict(sales_distribution),
"price_sales_correlation": round(price_sales_correlation, 4)
}
def _analyze_promotions(self, items: List[Dict]) -> Dict:
"""分析促销情况"""
promotion_items = [item for item in items if item["is_promotion"]]
promotion_count = len(promotion_items)
promotion_ratio = round(promotion_count / len(items) * 100, 1) if items else 0
# 平均折扣力度
avg_discount = 0
if promotion_items:
avg_discount = round(sum(item["discount"] for item in promotion_items) / len(promotion_items), 1)
# 促销商品销量占比
promotion_sales = sum(item["sales_count"] for item in promotion_items)
total_sales = sum(item["sales_count"] for item in items)
promotion_sales_ratio = round(promotion_sales / total_sales * 100, 1) if total_sales > 0 else 0
return {
"promotion_count": promotion_count,
"promotion_ratio": promotion_ratio,
"avg_discount": avg_discount,
"sales_ratio": promotion_sales_ratio
}
# 工具方法
def _safe_float(self, value) -> float:
"""安全转换为float"""
try:
return float(value) if value is not None else 0.0
except (ValueError, TypeError):
return 0.0
def _safe_int(self, value) -> int:
"""安全转换为int"""
try:
return int(value) if value is not None else 0
except (ValueError, TypeError):
return 0
def _get_price_ranges(self, min_price: float, max_price: float) -> List[Tuple[float, float]]:
"""生成合理的价格区间"""
if min_price >= max_price:
return [(min_price - 1, max_price + 1)]
# 计算区间数量,最多10个区间
range_count = min(10, int(max_price - min_price) // 5 + 1)
step = (max_price - min_price) / range_count
ranges = []
for i in range(range_count):
start = min_price + i * step
end = min_price + (i + 1) * step
ranges.append((round(start, 1), round(end, 1)))
return ranges
def _calculate_median(self, data: List[float]) -> float:
"""计算中位数"""
sorted_data = sorted(data)
n = len(sorted_data)
if n % 2 == 1:
return round(sorted_data[n//2], 2)
else:
return round((sorted_data[n//2 - 1] + sorted_data[n//2]) / 2, 2)
def _calculate_correlation(self, x: List[float], y: List[float]) -> float:
"""计算皮尔逊相关系数"""
if len(x) != len(y) or len(x) < 2:
return 0.0
n = len(x)
sum_x = sum(x)
sum_y = sum(y)
sum_xy = sum(xi * yi for xi, yi in zip(x, y))
sum_x2 = sum(xi **2 for xi in x)
sum_y2 = sum(yi** 2 for yi in y)
numerator = n * sum_xy - sum_x * sum_y
denominator = ((n * sum_x2 - sum_x **2) * (n * sum_y2 - sum_y** 2)) **0.5
return numerator / denominator if denominator != 0 else 0.0
def visualize_analysis(self, analysis: Dict, output_dir: str = ".") -> None:
"""可视化分析结果"""
# 设置中文显示
plt.rcParams["font.family"] = ["SimHei", "WenQuanYi Micro Hei", "Heiti TC"]
plt.rcParams["axes.unicode_minus"] = False
# 1. 价格分布直方图
plt.figure(figsize=(10, 6))
ranges = list(analysis["price_analysis"]["range_distribution"].keys())
counts = list(analysis["price_analysis"]["range_distribution"].values())
plt.bar(ranges, counts, color='skyblue')
plt.title("商品价格区间分布")
plt.xlabel("价格区间")
plt.ylabel("商品数量")
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig(f"{output_dir}/price_distribution.png")
plt.close()
# 2. 分类分布饼图
plt.figure(figsize=(10, 6))
top_cats = analysis["category_analysis"]["top_categories"]
if top_cats:
labels = [cat[0] for cat in top_cats]
sizes = [cat[1] for cat in top_cats]
# 其他分类合并
if len(analysis["category_analysis"]["distribution"]) > 5:
other_count = sum(analysis["category_analysis"]["distribution"].values()) - sum(sizes)
labels.append("其他")
sizes.append(other_count)
plt.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=90)
plt.title("商品分类分布")
plt.axis('equal')
plt.tight_layout()
plt.savefig(f"{output_dir}/category_distribution.png")
plt.close()
# 3. 销量分布直方图
plt.figure(figsize=(10, 6))
sales_ranges = list(analysis["sales_analysis"]["distribution"].keys())
sales_counts = list(analysis["sales_analysis"]["distribution"].values())
plt.bar(sales_ranges, sales_counts, color='lightgreen')
plt.title("商品销量区间分布")
plt.xlabel("销量区间")
plt.ylabel("商品数量")
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig(f"{output_dir}/sales_distribution.png")
plt.close()
# 4. 促销商品占比饼图
plt.figure(figsize=(8, 8))
promotion_counts = [
analysis["promotion_analysis"]["promotion_count"],
analysis["total_items"] - analysis["promotion_analysis"]["promotion_count"]
]
plt.pie(promotion_counts, labels=['促销商品', '非促销商品'],
autopct='%1.1f%%', startangle=90, colors=['orange', 'lightgray'])
plt.title("促销商品占比")
plt.axis('equal')
plt.tight_layout()
plt.savefig(f"{output_dir}/promotion_ratio.png")
plt.close()
logging.info(f"分析图表已保存至 {output_dir} 目录")
# 示例调用
if __name__ == "__main__":
# 替换为实际的appkey和appsecret(从对应平台开放平台获取)
APPKEY = "your_appkey"
APPSECRET = "your_appsecret"
# 替换为目标店铺ID
SHOP_ID = "123456"
# 平台选择:taobao, jd, 1688 或 general
PLATFORM = "general"
# 初始化API客户端
api = ShopItemSearchAPI(APPKEY, APPSECRET, platform=PLATFORM)
# 获取店铺所有商品
all_items = api.get_shop_all_items(
shop_id=SHOP_ID,
max_pages=5, # 最多获取5页
sort="sales_desc" # 按销量排序
)
if all_items:
# 分析店铺商品
analysis = api.analyze_shop_items(all_items)
print(f"=== 店铺商品分析报告 (店铺ID: {SHOP_ID}) ===")
print(f"店铺总商品数: {analysis['total_items']}件")
# 价格分析
print("\n价格分析:")
print(f" 价格范围: {analysis['price_analysis']['min_price']}-{analysis['price_analysis']['max_price']}元")
print(f" 平均价格: {analysis['price_analysis']['avg_price']}元")
print(f" 中位数价格: {analysis['price_analysis']['median_price']}元")
print(" 价格区间分布:")
for range_str, count in analysis['price_analysis']['range_distribution'].items():
print(f" {range_str}元: {count}件")
# 分类分析
print("\n分类分析:")
print(f" 总分类数: {analysis['category_analysis']['total_categories']}个")
print(" 商品数量最多的5个分类:")
for cat, count in analysis['category_analysis']['top_categories']:
print(f" {cat}: {count}件 ({round(count/analysis['total_items']*100, 1)}%)")
# 销售分析
print("\n销售分析:")
print(f" 总销量: {analysis['sales_analysis']['total_sales']}件")
print(f" 平均单品销量: {analysis['sales_analysis']['avg_sales_per_item']}件")
print(f" 价格与销量相关性: {analysis['sales_analysis']['price_sales_correlation']}")
# 促销分析
print("\n促销分析:")
print(f" 促销商品数: {analysis['promotion_analysis']['promotion_count']}件 ({analysis['promotion_analysis']['promotion_ratio']}%)")
print(f" 平均折扣: {analysis['promotion_analysis']['avg_discount']}折")
print(f" 促销商品销量占比: {analysis['promotion_analysis']['sales_ratio']}%")
# 热销商品
print("\n热销商品TOP5:")
for i, item in enumerate(analysis['top_sales'][:5], 1):
print(f" {i}. {item['title'][:30]}...")
print(f" 价格: {item['price']}元, 销量: {item['sales_count']}件")
# 生成可视化图表
api.visualize_analysis(analysis)