×

商品销量详情接口(item_get_sales)深度分析及 Python 实现

万邦科技Lex 万邦科技Lex 发表于2025-09-05 09:32:49 浏览268 评论0

抢沙发发表评论

             注册账号免费测试淘系接口

商品销量数据是电商分析中的核心指标,item_get_sales 接口专门用于获取指定商品的销量详情,包括历史销量趋势、时段销量分布、规格销量占比等精细化数据。该接口为销售策略优化、库存管理、竞品分析提供数据支持,广泛应用于电商运营和市场分析场景。

一、接口核心特性分析

1. 接口功能与定位

  • 核心功能:获取商品的销量明细数据,包括累计销量、日 / 周 / 月销量趋势、各规格销量占比等

  • 数据维度

    • 时间维度:实时销量、今日销量、近 7 天 / 30 天销量、历史销量曲线

    • 规格维度:各 SKU 销量占比、热销规格分析

    • 渠道维度:不同销售渠道的销量分布(部分平台支持)

  • 应用场景

    • 销售趋势预测与库存规划

    • 促销活动效果评估(对比活动前后销量)

    • 热销规格分析与生产计划调整

    • 竞品销量监控与市场份额分析

2. 认证机制

主流电商平台的销量接口均采用与商品详情接口一致的认证体系,通常为:


  • appkey + access_token 认证(如淘宝、京东、唯品会等)

  • 部分平台需要额外申请销量数据权限(敏感数据)

3. 核心参数与响应结构

请求参数

参数名类型是否必填说明
item_idString商品 ID
access_tokenString访问令牌
time_rangeString时间范围,如 "7d"(7 天)、"30d"(30 天)、"90d"(90 天)
granularityString时间粒度,如 "day"(按天)、"week"(按周)、"month"(按月)
include_skuBoolean是否包含 SKU 销量,默认 false

响应核心字段

  • 基础销量:累计销量、总销售额、平均客单价

  • 时间趋势:按指定粒度的销量 / 销售额数组(含日期标签)

  • 规格销量:各 SKU 的销量、销售额、占比

  • 环比数据:与上一周期的对比增长率

二、Python 脚本实现

以下是通用的 item_get_sales 接口调用实现,适配主流电商平台的销量数据获取逻辑:
import requests
import time
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, Optional, List
from requests.exceptions import RequestException

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)

class ItemSalesAPI:
    def __init__(self, appkey: str, appsecret: str, platform: str = "general"):
        """
        初始化商品销量API客户端
        :param appkey: 开放平台appkey
        :param appsecret: 开放平台appsecret
        :param platform: 平台标识,如"taobao"、"jd"、"vip"等
        """
        self.appkey = appkey
        self.appsecret = appsecret
        self.platform = platform
        self.base_urls = {
            "taobao": "https://eco.taobao.com",
            "jd": "https://api.jd.com",
            "vip": "https://api.vip.com",
            "mogujie": "https://api.mogujie.com",
            "general": "https://api.ecommerce.com"  # 通用平台地址
        }
        self.base_url = self.base_urls.get(platform, self.base_urls["general"])
        self.access_token = None
        self.token_expires_at = 0  # token过期时间戳
        self.session = requests.Session()
        self.session.headers.update({
            "Content-Type": "application/json",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36"
        })

    def _get_access_token(self) -> Optional[str]:
        """获取访问令牌(适配多平台)"""
        # 检查token是否有效
        if self.access_token and self.token_expires_at > time.time() + 60:
            return self.access_token
            
        logging.info(f"获取{self.platform}平台新的access_token")
        
        # 不同平台的token获取地址和参数可能不同
        if self.platform == "taobao":
            url = f"{self.base_url}/oauth/token"
            params = {
                "grant_type": "client_credentials",
                "client_id": self.appkey,
                "client_secret": self.appsecret
            }
        elif self.platform == "jd":
            url = f"{self.base_url}/oauth2/token"
            params = {
                "grant_type": "client_credentials",
                "appkey": self.appkey,
                "appsecret": self.appsecret
            }
        else:  # 通用平台
            url = f"{self.base_url}/oauth/token"
            params = {
                "grant_type": "client_credentials",
                "client_id": self.appkey,
                "client_secret": self.appsecret
            }
        
        try:
            # 淘宝等平台用post,京东部分接口用get
            method = "post" if self.platform in ["taobao", "mogujie"] else "get"
            if method == "post":
                response = self.session.post(url, data=params, timeout=10)
            else:
                response = self.session.get(url, params=params, timeout=10)
                
            response.raise_for_status()
            result = response.json()
            
            if "access_token" in result:
                self.access_token = result["access_token"]
                # 设置过期时间,默认2小时
                self.token_expires_at = time.time() + result.get("expires_in", 7200)
                return self.access_token
            else:
                logging.error(f"获取access_token失败: {result.get('error_description', '未知错误')}")
                return None
                
        except RequestException as e:
            logging.error(f"获取access_token请求异常: {str(e)}")
            return None

    def get_item_sales(self, 
                      item_id: str, 
                      time_range: str = "30d", 
                      granularity: str = "day",
                      include_sku: bool = False) -> Optional[Dict]:
        """
        获取商品销量详情
        :param item_id: 商品ID
        :param time_range: 时间范围,如"7d"、"30d"、"90d"
        :param granularity: 时间粒度,"day"、"week"、"month"
        :param include_sku: 是否包含SKU销量数据
        :return: 销量详情数据
        """
        # 验证参数
        valid_ranges = ["7d", "30d", "90d", "180d", "365d"]
        if time_range not in valid_ranges:
            logging.error(f"无效的时间范围: {time_range},支持: {valid_ranges}")
            return None
            
        valid_granularities = ["day", "week", "month"]
        if granularity not in valid_granularities:
            logging.error(f"无效的时间粒度: {granularity},支持: {valid_granularities}")
            return None
            
        # 获取有效的access_token
        if not self._get_access_token():
            return None
            
        # 不同平台的接口路径可能不同
        if self.platform == "taobao":
            url = f"{self.base_url}/router/rest"
            params = {
                "method": "taobao.item.sales.get",
                "format": "json",
                "v": "2.0",
                "item_id": item_id,
                "time_range": time_range,
                "granularity": granularity,
                "include_sku": "true" if include_sku else "false",
                "access_token": self.access_token
            }
        else:  # 通用接口参数
            url = f"{self.base_url}/item/get_sales"
            params = {
                "item_id": item_id,
                "time_range": time_range,
                "granularity": granularity,
                "include_sku": include_sku,
                "access_token": self.access_token
            }
            
        try:
            response = self.session.get(url, params=params, timeout=15)
            response.raise_for_status()
            result = response.json()
            
            # 处理不同平台的响应格式
            if self.platform == "taobao" and "error_response" in result:
                logging.error(f"获取销量失败: {result['error_response'].get('msg')} (错误码: {result['error_response'].get('code')})")
                return None
            elif self.platform != "taobao" and result.get("code") != 0:
                logging.error(f"获取销量失败: {result.get('message')} (错误码: {result.get('code')})")
                return None
                
            # 提取核心数据(不同平台数据路径可能不同)
            raw_data = result.get("item_sales_get_response", {}).get("result", {}) \
                      if self.platform == "taobao" else result.get("data", {})
                      
            if not raw_data:
                logging.warning("未获取到销量数据")
                return None
                
            # 格式化销量数据
            return self._format_sales_data(raw_data, time_range, granularity)
            
        except RequestException as e:
            logging.error(f"获取销量请求异常: {str(e)}")
            return None
        except json.JSONDecodeError:
            logging.error(f"销量响应解析失败: {response.text[:200]}...")
            return None

    def _format_sales_data(self, sales_data: Dict, time_range: str, granularity: str) -> Dict:
        """格式化销量数据"""
        # 基础销量信息
        base_info = {
            "item_id": sales_data.get("item_id"),
            "total_sales": int(sales_data.get("total_sales", 0)),  # 累计销量
            "total_revenue": float(sales_data.get("total_revenue", 0)),  # 累计销售额
            "avg_price": round(
                float(sales_data.get("total_revenue", 0)) / max(int(sales_data.get("total_sales", 0)), 1), 
                2
            ),  # 平均客单价
            "time_range": time_range,
            "update_time": sales_data.get("update_time", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        }
        
        # 时间趋势数据
        trend_data = {
            "granularity": granularity,
            "dates": sales_data.get("dates", []),  # 日期标签
            "sales_trend": [int(num) for num in sales_data.get("sales_trend", [])],  # 销量趋势
            "revenue_trend": [float(num) for num in sales_data.get("revenue_trend", [])],  # 销售额趋势
            "growth_rate": round(float(sales_data.get("growth_rate", 0)), 4)  # 环比增长率
        }
        
        # 计算周期内的关键指标
        if trend_data["sales_trend"]:
            trend_data["max_sales"] = max(trend_data["sales_trend"])  # 周期内最大销量
            trend_data["min_sales"] = min(trend_data["sales_trend"])  # 周期内最小销量
            trend_data["avg_daily_sales"] = round(
                sum(trend_data["sales_trend"]) / len(trend_data["sales_trend"]), 
                1
            )  # 日均销量
        
        # SKU销量分布(如包含)
        sku_sales = []
        if sales_data.get("sku_sales"):
            total_sku_sales = sum(int(sku.get("sales", 0)) for sku in sales_data["sku_sales"])
            for sku in sales_data["sku_sales"]:
                sales = int(sku.get("sales", 0))
                sku_sales.append({
                    "sku_id": sku.get("sku_id"),
                    "sku_name": sku.get("sku_name"),
                    "sales": sales,
                    "revenue": float(sku.get("revenue", 0)),
                    "proportion": round(sales / max(total_sku_sales, 1), 4) if total_sku_sales > 0 else 0,  # 销量占比
                    "price": float(sku.get("price", 0))
                })
            # 按销量排序
            sku_sales.sort(key=lambda x: x["sales"], reverse=True)
        
        return {
            "base_info": base_info,
            "trend_data": trend_data,
            "sku_sales": sku_sales,
            "raw_data": sales_data  # 保留原始数据
        }

    def get_sales_comparison(self, item_id: str, periods: List[str] = ["7d", "30d"]) -> Optional[Dict]:
        """
        获取多个周期的销量对比数据
        :param item_id: 商品ID
        :param periods: 周期列表
        :return: 对比数据
        """
        comparison = {}
        for period in periods:
            logging.info(f"获取{period}销量数据用于对比")
            sales_data = self.get_item_sales(item_id, time_range=period)
            if sales_data:
                comparison[period] = {
                    "total_sales": sales_data["base_info"]["total_sales"],
                    "total_revenue": sales_data["base_info"]["total_revenue"],
                    "avg_daily_sales": sales_data["trend_data"].get("avg_daily_sales", 0)
                }
            # 控制请求频率
            time.sleep(1)
        return comparison if comparison else None


# 示例调用
if __name__ == "__main__":
    # 替换为实际的appkey和appsecret
    APPKEY = "your_appkey"
    APPSECRET = "your_appsecret"
    # 替换为目标商品ID
    ITEM_ID = "12345678"
    # 平台标识,如"taobao"、"jd"、"vip"等
    PLATFORM = "taobao"
    
    # 初始化API客户端
    api = ItemSalesAPI(APPKEY, APPSECRET, platform=PLATFORM)
    
    # 获取单个周期的销量数据
    sales_details = api.get_item_sales(
        item_id=ITEM_ID,
        time_range="30d",
        granularity="day",
        include_sku=True
    )
    
    if sales_details:
        print(f"=== 商品销量详情 ({item_id}) ===")
        print(f"累计销量: {sales_details['base_info']['total_sales']}件")
        print(f"累计销售额: {sales_details['base_info']['total_revenue']}元")
        print(f"平均客单价: {sales_details['base_info']['avg_price']}元")
        print(f"时间范围: {sales_details['base_info']['time_range']}")
        print(f"环比增长率: {sales_details['trend_data']['growth_rate']*100}%")
        print(f"日均销量: {sales_details['trend_data'].get('avg_daily_sales', 0)}件")
        
        # 打印最近5天的销量
        if sales_details['trend_data']['dates'] and sales_details['trend_data']['sales_trend']:
            print("\n最近5天销量趋势:")
            recent_dates = sales_details['trend_data']['dates'][-5:]
            recent_sales = sales_details['trend_data']['sales_trend'][-5:]
            for date, sale in zip(recent_dates, recent_sales):
                print(f"  {date}: {sale}件")
        
        # 打印热销规格
        if sales_details['sku_sales']:
            print("\n热销规格TOP3:")
            for i, sku in enumerate(sales_details['sku_sales'][:3], 1):
                print(f"  {i}. {sku['sku_name']}: {sku['sales']}件 ({sku['proportion']*100:.1f}%)")
    
    # 获取多周期销量对比
    # sales_compare = api.get_sales_comparison(ITEM_ID, periods=["7d", "30d", "90d"])
    # if sales_compare:
    #     print("\n=== 多周期销量对比 ===")
    #     for period, data in sales_compare.items():
    #         print(f"{period}销量: {data['total_sales']}件, 日均: {data['avg_daily_sales']}件")

三、接口调用关键技术与注意事项

1. 数据获取策略

  • 时间粒度选择

    • 短期分析(1-7 天):使用 "day" 粒度,监控日常波动

    • 中期分析(1-3 个月):使用 "week" 粒度,观察周度趋势

    • 长期分析(半年以上):使用 "month" 粒度,分析季节性变化

  • 批量获取控制:对多个商品批量获取时,建议设置 2-3 秒间隔,避免触发频率限制

  • 缓存策略:销量数据非实时变动(多数平台 1-2 小时更新一次),建议缓存 30-60 分钟

2. 常见错误及解决方案

错误码说明解决方案
403权限不足申请销量数据访问权限(部分平台需单独申请)
429调用频率超限降低调用频率,实现令牌桶限流机制
500服务器错误重试机制(建议最多 3 次,间隔指数退避)
10002数据未生成新商品可能无历史数据,可先获取基础信息
10003时间范围超限缩短时间范围或降低数据粒度

3. 数据解析要点

  • 销量与销售额计算:注意区分 "销量(件数)" 和 "销售额(金额)" 字段

  • 环比增长率处理:负值表示销量下降,需特殊标记

  • SKU 占比计算:当总销量为 0 时避免除零错误

  • 时间格式统一:不同平台可能返回不同格式的日期字符串,需统一转换

四、应用场景与扩展建议

典型应用场景

  • 销售监控仪表板:实时监控商品销量变化,设置异常预警

  • 促销效果分析:对比活动前后销量数据,计算 ROI

  • 库存预警系统:结合销量趋势预测未来库存需求

  • 竞品对标分析:监控竞品销量变化,调整自身定价和促销策略

扩展建议

  • 实现销量预测模型:基于历史数据使用 ARIMA 或机器学习模型预测未来销量

  • 构建销量异常检测:通过标准差或 IQR 方法识别异常波动(如刷单、活动爆发)

  • 开发可视化报表:生成销量趋势图、SKU 占比饼图、环比对比柱状图

  • 结合价格数据:分析价格调整对销量的影响,优化定价策略

  • 多维度交叉分析:结合用户评价、流量数据,综合分析销量驱动因素


通过合理使用 item_get_sales 接口,开发者可以构建全面的商品销售分析系统,为电商运营决策提供数据支持。使用时需注意不同平台的接口差异和数据权限限制,确保合规获取和使用销量数据。

群贤毕至

访客