×

苏宁item_get - 获得商品详情接口深度分析及 Python 实现

万邦科技Lex 万邦科技Lex 发表于2025-09-27 10:19:14 浏览221 评论0

抢沙发发表评论

            注册账号免费测试苏宁API数据接口

苏宁易购(Suning.com)作为国内领先的综合零售平台,其item_get接口提供了通过商品 ID 获取商品详细信息的能力。该接口对于电商数据分析、价格监控、竞品研究等场景具有重要价值,能够获取商品价格、规格、库存、促销信息等关键数据。

一、苏宁# 苏宁 item_get 接口核心特性分析

1. 接口定位与核心价值

苏宁item_get接口专注于获取平台商品的全方位信息,其核心价值体现在:
  • 提供商品的实时价格和促销活动信息

  • 包含多规格商品的详细参数及库存状态

  • 提供商品图片、视频及详情描述

  • 包含品牌信息和售后服务说明

  • 支持获取评价统计和精选评价

  • 提供配送和安装服务相关信息

2. 接口权限与调用限制

使用苏宁开放平台接口需遵守以下规范:
限制类型具体规则说明
权限要求需注册苏宁开发者账号,创建应用并通过审核个人与企业开发者权限不同,企业开发者可获取更完整数据
调用频率基础权限:10 次 / 分钟;高级权限:60 次 / 分钟按 appkey 和 IP 双重限制
数据返回单次返回一个商品的完整详情,无分页商品信息实时更新
字段限制基础字段免费,部分高级字段需申请开通敏感信息受平台保护
应用限制禁止用于爬虫或商业竞争用途违规使用将导致账号封禁

3. 核心参数解析

必选参数

参数名类型说明示例
appKeyString应用唯一标识"suning_appkey_12345"
signString签名,按平台算法生成见下文签名逻辑
timestampString时间戳(yyyyMMddHHmmss)"20230701120000"
productCodeString商品 ID"1000123456"

可选参数

参数名类型说明示例
areaString地区编码,影响库存和价格"110100"(北京)
cargoNoString规格 ID,指定具体规格"12345678"
needCommentString是否需要评价数据"1"(是)、"0"(否)
needStoreString是否需要门店信息"1"(是)、"0"(否)
needPromotionString是否需要促销信息"1"(是)、"0"(否)

二、签名生成与返回数据结构

1. 签名生成逻辑

苏宁开放平台采用 MD5 签名算法,步骤如下:
  1. 收集所有请求参数(不包含sign),按参数名 ASCII 码升序排序

  2. 拼接为key=value格式的字符串,并用&连接(如"appKey=test&productCode=123"

  3. 在拼接字符串末尾加上&appSecret=你的appSecret

  4. 对整个字符串进行 MD5 加密,得到 32 位大写签名值

示例:参数:appKey=test&productCode=123×tamp=20230701120000拼接后:appKey=test&productCode=123×tamp=20230701120000&appSecret=abc123MD5 加密后得到签名:A1B2C3D4E5F6A7B8C9D0E1F2A3B4C5D6

2. 返回数据结构解析

接口返回 JSON 格式数据,核心结构包括:
  1. 基本信息:商品 ID、标题、品牌、类目、上架时间等

  2. 价格信息:当前价格、原价、会员价、限时价等

  3. 规格信息:颜色、尺寸、型号等属性及对应库存

  4. 媒体信息:主图、详情图、视频等 URL 列表

  5. 库存信息:总库存、各规格库存、地区库存差异

  6. 促销信息:优惠券、满减活动、赠品等

  7. 服务信息:售后政策、配送方式、安装服务等

  8. 评价信息:评分、评价数、好评率、精选评价等

关键数据字段详解:
  • productCode:商品唯一标识

  • productName:商品名称

  • brandName:品牌名称

  • price:当前售价

  • marketPrice:市场价(原价)

  • stock:库存数量

  • salesCount:销售数量

  • commentCount:评价数量

  • averageScore:平均评分

  • picList:商品图片 URL 列表

  • specList:规格参数列表

  • promotionList:促销活动列表

  • serviceList:服务列表

  • areaLimit:地区限购信息

三、Python 实现方案

以下是苏宁item_get接口的完整 Python 实现,包含接口调用、数据处理及分析功能:
import requests
import time
import hashlib
import json
import logging
import pandas as pd
import matplotlib.pyplot as plt
import re
from datetime import datetime
from typing import Dict, Optional, List, Tuple
from collections import defaultdict

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)

# 配置中文显示
plt.rcParams["font.family"] = ["SimHei", "WenQuanYi Micro Hei", "Heiti TC"]
plt.rcParams["axes.unicode_minus"] = False

class SuningItemDetail:
    """苏宁item_get接口封装类,用于根据ID获取和分析商品详情数据"""
    
    def __init__(self, app_key: str, app_secret: str):
        """
        初始化苏宁API客户端
        :param app_key: 应用的appKey
        :param app_secret: 应用的appSecret
        """
        self.app_key = app_key
        self.app_secret = app_secret
        self.api_url = "https://open.suning.com/api/http/sopRequest"
        
        # 频率控制
        self.rate_limit = 10  # 默认基础权限,高级权限可修改为60次/分钟
        self.call_timestamps = []  # 存储调用时间戳(秒级)
    
    def set_rate_limit(self, limit: int) -> None:
        """设置调用频率限制(次/分钟)"""
        if 10 <= limit <= 60:
            self.rate_limit = limit
            logging.info(f"已设置调用频率限制为 {limit} 次/分钟")
        else:
            logging.warning("频率限制必须在10-60之间,未修改")
    
    def _generate_sign(self, params: Dict) -> str:
        """生成签名(MD5算法)"""
        # 1. 按参数名ASCII升序排序
        sorted_params = sorted(params.items(), key=lambda x: x[0])
        
        # 2. 拼接为"key=value&key=value"格式
        param_str = "&".join([f"{k}={v}" for k, v in sorted_params])
        
        # 3. 拼接appSecret并进行MD5加密
        sign_str = f"{param_str}&appSecret={self.app_secret}"
        md5 = hashlib.md5()
        md5.update(sign_str.encode('utf-8'))
        sign = md5.hexdigest().upper()  # 转为大写
        
        return sign
    
    def _check_rate_limit(self) -> None:
        """检查并控制调用频率"""
        current_time = time.time()  # 秒级
        # 保留1分钟内的调用记录
        self.call_timestamps = [t for t in self.call_timestamps if current_time - t < 60]
        
        # 若超过限制,计算需要等待的时间
        if len(self.call_timestamps) >= self.rate_limit:
            oldest_time = self.call_timestamps[0]
            sleep_time = 60 - (current_time - oldest_time) + 0.1  # 额外加0.1秒保险
            logging.warning(f"调用频率超限,等待 {sleep_time:.1f} 秒")
            time.sleep(sleep_time)
            # 再次清理过期记录
            self.call_timestamps = [t for t in self.call_timestamps if time.time() - t < 60]
        
        # 记录本次调用时间
        self.call_timestamps.append(current_time)
    
    def get_item_detail(self, product_code: str, 
                       area: str = "110100",  # 默认北京地区
                       cargo_no: Optional[str] = None,
                       need_comment: bool = True,
                       need_store: bool = False,
                       need_promotion: bool = True) -> Optional[Dict]:
        """
        根据商品ID获取商品详情数据
        :param product_code: 商品ID
        :param area: 地区编码
        :param cargo_no: 规格ID,指定具体规格
        :param need_comment: 是否需要评价数据
        :param need_store: 是否需要门店信息
        :param need_promotion: 是否需要促销信息
        :return: 商品详情数据
        """
        # 构建基础参数
        base_params = {
            "appKey": self.app_key,
            "timestamp": datetime.now().strftime("%Y%m%d%H%M%S"),  # 时间戳格式:yyyyMMddHHmmss
            "method": "suning.sngoods.item.get",  # 苏宁商品详情接口方法名
            "version": "v1.2",  # 接口版本
            "productCode": product_code,
            "area": area,
            "needComment": "1" if need_comment else "0",
            "needStore": "1" if need_store else "0",
            "needPromotion": "1" if need_promotion else "0"
        }
        
        # 添加可选参数
        if cargo_no:
            base_params["cargoNo"] = cargo_no
        
        # 生成签名
        sign = self._generate_sign(base_params)
        base_params["sign"] = sign
        
        # 检查频率限制
        self._check_rate_limit()
        
        try:
            # 发送请求
            response = requests.get(self.api_url, params=base_params, timeout=10)
            response.raise_for_status()
            
            # 解析响应
            result = response.json()
            
            # 处理错误
            if "sn_responseContent" not in result:
                logging.error(f"API响应格式错误: {result}")
                return None
                
            response_content = result["sn_responseContent"]
            if "sn_error" in response_content:
                error = response_content["sn_error"]
                logging.error(f"API调用错误: {error.get('error_msg')} (错误码: {error.get('error_code')})")
                return None
                
            # 提取结果
            item_data = response_content.get("sn_body", {}).get("item", {})
            if not item_data:
                logging.warning("未获取到商品详情数据")
                return None
                
            logging.info(f"成功获取商品 {product_code} 的详情数据")
            return item_data
            
        except requests.exceptions.RequestException as e:
            logging.error(f"请求异常: {str(e)}")
            return None
        except json.JSONDecodeError:
            logging.error(f"响应解析失败: {response.text[:200]}...")
            return None
    
    def batch_get_item_details(self, product_codes: List[str], 
                             area: str = "110100",
                             need_comment: bool = False,  # 批量获取默认不获取评价,减少数据量
                             need_promotion: bool = True) -> Tuple[List[Dict], int]:
        """
        批量获取多个商品的详情数据
        :param product_codes: 商品ID列表
        :param area: 地区编码
        :param need_comment: 是否需要评价数据
        :param need_promotion: 是否需要促销信息
        :return: 商品详情列表和成功获取的数量
        """
        all_items = []
        success_count = 0
        
        for product_code in product_codes:
            logging.info(f"正在获取商品 {product_code} 的详情...")
            item_data = self.get_item_detail(
                product_code=product_code,
                area=area,
                need_comment=need_comment,
                need_store=False,
                need_promotion=need_promotion
            )
            
            if item_data:
                all_items.append(item_data)
                success_count += 1
        
        logging.info(f"批量获取完成,共请求 {len(product_codes)} 个商品,成功获取 {success_count} 个")
        return all_items, success_count
    
    def analyze_item(self, item_data: Dict) -> Dict:
        """分析单个商品数据"""
        if not item_data:
            return {"error": "没有商品数据可分析"}
        
        # 1. 价格分析
        price_info = {
            "current_price": 0,
            "market_price": 0,
            "discount": 0,  # 折扣率
            "has_member_price": False,
            "member_price": 0,
            "price_drop": 0  # 降价幅度
        }
        
        try:
            price_info["current_price"] = float(item_data.get("price", 0))
            price_info["market_price"] = float(item_data.get("marketPrice", price_info["current_price"]))
            
            # 计算折扣率
            if price_info["market_price"] > 0:
                price_info["discount"] = round(price_info["current_price"] / price_info["market_price"], 2)
                price_info["price_drop"] = round(price_info["market_price"] - price_info["current_price"], 2)
            
            # 会员价分析
            if "memberPrice" in item_data and float(item_data["memberPrice"]) > 0:
                price_info["has_member_price"] = True
                price_info["member_price"] = float(item_data["memberPrice"])
        except (ValueError, TypeError):
            pass
        
        # 2. 库存与规格分析
        spec_info = {
            "total_stock": 0,
            "spec_count": 0,
            "has_stock_variation": False,  # 不同规格库存是否有差异
            "stock_status": "无库存",  # 有库存/无库存/部分规格有库存
            "main_specs": {}  # 主要规格类型统计
        }
        
        try:
            # 总库存
            spec_info["total_stock"] = int(item_data.get("stock", 0))
            
            # 规格分析
            spec_list = item_data.get("specList", [])
            if isinstance(spec_list, list) and len(spec_list) > 0:
                spec_info["spec_count"] = len(spec_list)
                
                # 统计各规格库存
                stocks = []
                spec_types = defaultdict(set)
                
                for spec in spec_list:
                    # 库存统计
                    try:
                        stock = int(spec.get("stock", 0))
                        stocks.append(stock)
                    except (ValueError, TypeError):
                        pass
                    
                    # 规格属性统计
                    specAttr = spec.get("specAttr", {})
                    if isinstance(specAttr, dict):
                        for key, value in specAttr.items():
                            spec_types[key].add(value)
                
                # 转换为规格类型:数量的形式
                for key, values in spec_types.items():
                    spec_info["main_specs"][key] = len(values)
                
                # 检查库存是否有差异
                if len(stocks) > 1 and len(set(stocks)) > 1:
                    spec_info["has_stock_variation"] = True
                
                # 判断库存状态
                has_stock = any(s > 0 for s in stocks)
                all_stock = all(s > 0 for s in stocks)
                
                if has_stock:
                    spec_info["stock_status"] = "全部有库存" if all_stock else "部分规格有库存"
        except Exception as e:
            logging.warning(f"规格分析出错: {str(e)}")
        
        # 3. 促销分析
        promotion_info = {
            "has_promotion": False,
            "promotion_count": 0,
            "promotion_types": [],
            "has_coupon": False,
            "coupon_count": 0,
            "has_gift": False,
            "gift_count": 0
        }
        
        try:
            promotion_list = item_data.get("promotionList", [])
            if isinstance(promotion_list, list) and len(promotion_list) > 0:
                promotion_info["has_promotion"] = True
                promotion_info["promotion_count"] = len(promotion_list)
                
                # 分类统计促销类型
                types = set()
                coupon_count = 0
                gift_count = 0
                
                for promo in promotion_list:
                    promo_type = promo.get("promotionType", "unknown")
                    types.add(promo_type)
                    
                    if "优惠券" in promo_type or "满减" in promo_type:
                        coupon_count += 1
                    elif "赠品" in promo_type:
                        gift_count += 1
                
                promotion_info["promotion_types"] = list(types)
                promotion_info["has_coupon"] = coupon_count > 0
                promotion_info["coupon_count"] = coupon_count
                promotion_info["has_gift"] = gift_count > 0
                promotion_info["gift_count"] = gift_count
        except Exception as e:
            logging.warning(f"促销分析出错: {str(e)}")
        
        # 4. 评价分析
        comment_info = {
            "comment_count": 0,
            "average_score": 0,
            "positive_rate": 0,  # 好评率
            "score_distribution": {},  # 评分分布
            "has_image_comment": False,
            "image_comment_count": 0
        }
        
        try:
            comment_summary = item_data.get("commentSummary", {})
            if comment_summary:
                comment_info["comment_count"] = int(comment_summary.get("commentCount", 0))
                comment_info["average_score"] = float(comment_summary.get("averageScore", 0))
                
                # 好评率计算
                good_count = int(comment_summary.get("goodCount", 0))
                general_count = int(comment_summary.get("generalCount", 0))
                poor_count = int(comment_summary.get("poorCount", 0))
                total = good_count + general_count + poor_count
                
                if total > 0:
                    comment_info["positive_rate"] = round(good_count / total, 2)
                
                # 评分分布
                score_distribution = comment_summary.get("scoreDistribution", [])
                if isinstance(score_distribution, list):
                    for score in score_distribution:
                        comment_info["score_distribution"][f"{score.get('score')}分"] = score.get("count", 0)
                
                # 带图评价
                comment_info["image_comment_count"] = int(comment_summary.get("imageCommentCount", 0))
                comment_info["has_image_comment"] = comment_info["image_comment_count"] > 0
        except Exception as e:
            logging.warning(f"评价分析出错: {str(e)}")
        
        # 5. 服务分析
        service_info = {
            "service_types": [],
            "has_free_shipping": False,
            "has_installation": False,
            "return_policy": "",
            "after_sales_service": []
        }
        
        try:
            service_list = item_data.get("serviceList", [])
            if isinstance(service_list, list):
                service_types = []
                for service in service_list:
                    service_name = service.get("serviceName", "")
                    service_types.append(service_name)
                    
                    if "免运费" in service_name:
                        service_info["has_free_shipping"] = True
                    if "安装" in service_name:
                        service_info["has_installation"] = True
                
                service_info["service_types"] = service_types
            
            # 售后政策
            if "afterSaleDesc" in item_data:
                service_info["return_policy"] = item_data["afterSaleDesc"]
            
            # 售后服务
            if "afterSalesService" in item_data:
                service_info["after_sales_service"] = item_data["afterSalesService"].split(";")
        except Exception as e:
            logging.warning(f"服务分析出错: {str(e)}")
        
        return {
            "product_code": item_data.get("productCode"),
            "product_name": item_data.get("productName"),
            "brand_name": item_data.get("brandName"),
            "category": item_data.get("categoryName"),
            "price_analysis": price_info,
            "spec_analysis": spec_info,
            "promotion_analysis": promotion_info,
            "comment_analysis": comment_info,
            "service_analysis": service_info,
            "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
    
    def compare_items(self, items_data: List[Dict]) -> Dict:
        """对比多个商品数据"""
        if len(items_data) < 2:
            return {"error": "至少需要2个商品进行对比"}
        
        comparison = {
            "total_items": len(items_data),
            "price_comparison": [],
            "discount_comparison": [],
            "sales_comparison": [],
            "rating_comparison": [],
            "spec_comparison": []
        }
        
        # 提取各商品关键指标用于对比
        for item in items_data:
            product_code = item.get("productCode")
            name = item.get("productName", "")[:30]  # 截断长标题
            
            # 价格对比
            try:
                current_price = float(item.get("price", 0))
                market_price = float(item.get("marketPrice", current_price))
                discount = round(current_price / market_price, 2) if market_price > 0 else 0
            except (ValueError, TypeError):
                current_price = 0
                market_price = 0
                discount = 0
                
            comparison["price_comparison"].append({
                "product_code": product_code,
                "name": name,
                "current_price": current_price,
                "market_price": market_price,
                "discount": discount
            })
            
            # 折扣对比
            comparison["discount_comparison"].append({
                "product_code": product_code,
                "name": name,
                "discount": discount,
                "has_promotion": "promotionList" in item and len(item.get("promotionList", [])) > 0
            })
            
            # 销量对比
            try:
                sales_count = int(item.get("salesCount", 0))
                comment_count = int(item.get("commentSummary", {}).get("commentCount", 0))
            except (ValueError, TypeError):
                sales_count = 0
                comment_count = 0
                
            comparison["sales_comparison"].append({
                "product_code": product_code,
                "name": name,
                "sales_count": sales_count,
                "comment_count": comment_count
            })
            
            # 评分对比
            try:
                avg_rating = float(item.get("commentSummary", {}).get("averageScore", 0))
                good_count = int(item.get("commentSummary", {}).get("goodCount", 0))
                general_count = int(item.get("commentSummary", {}).get("generalCount", 0))
                poor_count = int(item.get("commentSummary", {}).get("poorCount", 0))
                total = good_count + general_count + poor_count
                positive_rate = round(good_count / total, 2) if total > 0 else 0
            except (ValueError, TypeError):
                avg_rating = 0
                positive_rate = 0
                
            comparison["rating_comparison"].append({
                "product_code": product_code,
                "name": name,
                "avg_rating": avg_rating,
                "positive_rate": positive_rate
            })
            
            # 规格对比
            try:
                spec_count = len(item.get("specList", []))
                total_stock = int(item.get("stock", 0))
            except (ValueError, TypeError):
                spec_count = 0
                total_stock = 0
                
            comparison["spec_comparison"].append({
                "product_code": product_code,
                "name": name,
                "spec_count": spec_count,
                "total_stock": total_stock
            })
        
        # 排序各对比项以便分析
        comparison["price_comparison"].sort(key=lambda x: x["current_price"])
        comparison["discount_comparison"].sort(key=lambda x: x["discount"])
        comparison["sales_comparison"].sort(key=lambda x: x["sales_count"], reverse=True)
        comparison["rating_comparison"].sort(key=lambda x: x["avg_rating"], reverse=True)
        
        return comparison
    
    def visualize_comparison(self, comparison: Dict, output_dir: str = ".") -> None:
        """可视化多个商品的对比结果"""
        if "error" in comparison:
            logging.warning(comparison["error"])
            return
        
        # 1. 价格与原价对比条形图
        if comparison["price_comparison"]:
            plt.figure(figsize=(12, 6))
            items = [item["name"] for item in comparison["price_comparison"]]
            current_prices = [item["current_price"] for item in comparison["price_comparison"]]
            market_prices = [item["market_price"] for item in comparison["price_comparison"]]
            
            x = range(len(items))
            width = 0.35
            
            plt.bar([i - width/2 for i in x], current_prices, width, label='当前价')
            plt.bar([i + width/2 for i in x], market_prices, width, label='市场价')
            
            plt.xlabel('商品')
            plt.ylabel('价格 (元)')
            plt.title('商品价格对比')
            plt.xticks(x, items, rotation=45)
            plt.legend()
            plt.tight_layout()
            plt.savefig(f"{output_dir}/suning_price_comparison.png")
            plt.close()
            logging.info(f"价格对比图表已保存至 {output_dir}/suning_price_comparison.png")
        
        # 2. 折扣对比条形图
        if comparison["discount_comparison"]:
            plt.figure(figsize=(12, 6))
            items = [item["name"] for item in comparison["discount_comparison"]]
            discounts = [item["discount"] * 10 for item in comparison["discount_comparison"]]  # 转换为十分制
            
            bars = plt.bar(items, discounts, color=['green' if item["has_promotion"] else 'blue' 
                                                  for item in comparison["discount_comparison"]])
            
            # 添加促销标记
            for i, item in enumerate(comparison["discount_comparison"]):
                if item["has_promotion"]:
                    bars[i].set_label('带促销' if i == 0 else "")
            
            plt.xlabel('商品')
            plt.ylabel('折扣 (10=原价, 越低越优惠)')
            plt.title('商品折扣对比')
            plt.xticks(rotation=45)
            plt.legend()
            
            for i, v in enumerate(discounts):
                plt.text(i, v + 0.2, f"{v/10:.1f}折", ha='center')
            
            plt.tight_layout()
            plt.savefig(f"{output_dir}/suning_discount_comparison.png")
            plt.close()
            logging.info(f"折扣对比图表已保存至 {output_dir}/suning_discount_comparison.png")
        
        # 3. 销量与评价对比条形图
        if comparison["sales_comparison"]:
            plt.figure(figsize=(12, 6))
            items = [item["name"] for item in comparison["sales_comparison"]]
            sales = [item["sales_count"] for item in comparison["sales_comparison"]]
            comments = [item["comment_count"] for item in comparison["sales_comparison"]]
            
            x = range(len(items))
            width = 0.35
            
            plt.bar([i - width/2 for i in x], sales, width, label='销量')
            plt.bar([i + width/2 for i in x], comments, width, label='评价数')
            
            plt.xlabel('商品')
            plt.ylabel('数量')
            plt.title('商品销量与评价对比')
            plt.xticks(x, items, rotation=45)
            plt.legend()
            plt.tight_layout()
            plt.savefig(f"{output_dir}/suning_sales_comparison.png")
            plt.close()
            logging.info(f"销量对比图表已保存至 {output_dir}/suning_sales_comparison.png")
        
        # 4. 评分对比条形图
        if comparison["rating_comparison"]:
            plt.figure(figsize=(12, 6))
            items = [item["name"] for item in comparison["rating_comparison"]]
            ratings = [item["avg_rating"] for item in comparison["rating_comparison"]]
            positive_rates = [item["positive_rate"] * 100 for item in comparison["rating_comparison"]]
            
            x = range(len(items))
            width = 0.35
            
            plt.bar([i - width/2 for i in x], ratings, width, label='平均评分')
            plt.bar([i + width/2 for i in x], positive_rates, width, label='好评率(%)')
            
            plt.xlabel('商品')
            plt.ylabel('分数/百分比')
            plt.title('商品评分对比')
            plt.xticks(x, items, rotation=45)
            plt.legend()
            plt.tight_layout()
            plt.savefig(f"{output_dir}/suning_rating_comparison.png")
            plt.close()
            logging.info(f"评分对比图表已保存至 {output_dir}/suning_rating_comparison.png")
    
    def export_to_excel(self, items_data: List[Dict], analyses: List[Dict], comparison: Optional[Dict], 
                       filename: str) -> None:
        """导出商品数据到Excel"""
        if not items_data and not analyses:
            logging.warning("没有数据可导出")
            return
            
        try:
            with pd.ExcelWriter(filename) as writer:
                # 商品基本信息
                if items_data:
                    basic_info = []
                    for item in items_data:
                        # 计算折扣
                        try:
                            price = float(item.get("price", 0))
                            market_price = float(item.get("marketPrice", price))
                            discount = round(price / market_price, 2) if market_price > 0 else 0
                        except (ValueError, TypeError):
                            discount = 0
                            
                        info = {
                            "商品ID": item.get("productCode"),
                            "标题": item.get("productName"),
                            "品牌": item.get("brandName"),
                            "类目": item.get("categoryName"),
                            "当前价(元)": item.get("price"),
                            "市场价(元)": item.get("marketPrice"),
                            "折扣": discount,
                            "销量": item.get("salesCount"),
                            "评价数": item.get("commentSummary", {}).get("commentCount"),
                            "评分": item.get("commentSummary", {}).get("averageScore"),
                            "库存": item.get("stock"),
                            "规格数量": len(item.get("specList", []))
                        }
                        basic_info.append(info)
                    
                    df_basic = pd.DataFrame(basic_info)
                    df_basic.to_excel(writer, sheet_name='商品基本信息', index=False)
                
                # 规格详情
                for item in items_data:
                    if item.get("specList"):
                        spec_details = []
                        for spec in item.get("specList", []):
                            spec_attr = spec.get("specAttr", {})
                            spec_attr_str = ", ".join([f"{k}:{v}" for k, v in spec_attr.items()])
                            
                            details = {
                                "规格ID": spec.get("cargoNo"),
                                "属性组合": spec_attr_str,
                                "价格(元)": spec.get("price"),
                                "库存": spec.get("stock"),
                                "是否有货": "是" if int(spec.get("stock", 0)) > 0 else "否"
                            }
                            spec_details.append(details)
                        
                        df_sku = pd.DataFrame(spec_details)
                        df_sku.to_excel(writer, sheet_name=f'规格_{item.get("productCode")[:6]}', index=False)
                
                # 促销信息
                for item in items_data:
                    if item.get("promotionList"):
                        promotion_details = []
                        for promo in item.get("promotionList", []):
                            details = {
                                "促销ID": promo.get("promotionId"),
                                "促销类型": promo.get("promotionType"),
                                "促销名称": promo.get("promotionName"),
                                "开始时间": promo.get("startTime"),
                                "结束时间": promo.get("endTime"),
                                "描述": promo.get("promotionDesc")
                            }
                            promotion_details.append(details)
                        
                        df_promo = pd.DataFrame(promotion_details)
                        df_promo.to_excel(writer, sheet_name=f'促销_{item.get("productCode")[:6]}', index=False)
                
                # 分析结果
                if analyses:
                    analysis_summary = []
                    for analysis in analyses:
                        summary = {
                            "商品ID": analysis.get("product_code"),
                            "标题": analysis.get("product_name"),
                            "品牌": analysis.get("brand_name"),
                            "当前价(元)": analysis["price_analysis"]["current_price"],
                            "市场价(元)": analysis["price_analysis"]["market_price"],
                            "折扣": analysis["price_analysis"]["discount"],
                            "降价幅度(元)": analysis["price_analysis"]["price_drop"],
                            "销量": analysis["comment_analysis"]["comment_count"],  # 用评价数近似销量
                            "评分": analysis["comment_analysis"]["average_score"],
                            "好评率": analysis["comment_analysis"]["positive_rate"],
                            "规格数量": analysis["spec_analysis"]["spec_count"],
                            "总库存": analysis["spec_analysis"]["total_stock"],
                            "是否有促销": "是" if analysis["promotion_analysis"]["has_promotion"] else "否"
                        }
                        analysis_summary.append(summary)
                    
                    df_analysis = pd.DataFrame(analysis_summary)
                    df_analysis.to_excel(writer, sheet_name='分析摘要', index=False)
                
                # 对比结果
                if comparison and "error" not in comparison:
                    df_price = pd.DataFrame(comparison["price_comparison"])
                    df_price.to_excel(writer, sheet_name='价格对比', index=False)
                    
                    df_discount = pd.DataFrame(comparison["discount_comparison"])
                    df_discount.to_excel(writer, sheet_name='折扣对比', index=False)
                    
                    df_sales = pd.DataFrame(comparison["sales_comparison"])
                    df_sales.to_excel(writer, sheet_name='销量对比', index=False)
            
            logging.info(f"数据已导出至 {filename}")
        except Exception as e:
            logging.error(f"导出Excel失败: {e}")


# 示例调用
if __name__ == "__main__":
    # 替换为实际的参数(从苏宁开放平台获取)
    APP_KEY = "your_app_key"
    APP_SECRET = "your_app_secret"
    PRODUCT_CODE = "1000123456"  # 商品ID示例
    # 多个商品ID用于批量获取和对比
    MULTIPLE_PRODUCT_CODES = ["1000123456", "1000654321", "1000112233"]
    
    # 初始化API客户端
    suning_item = SuningItemDetail(APP_KEY, APP_SECRET)
    # 若为高级权限,设置更高的频率限制
    # suning_item.set_rate_limit(60)
    
    # 1. 获取单个商品详情
    print("=== 获取单个商品详情 ===")
    item_detail = suning_item.get_item_detail(
        product_code=PRODUCT_CODE,
        area="110100",  # 北京地区
        need_comment=True,
        need_promotion=True
    )
    
    if item_detail:
        print(f"商品ID: {item_detail.get('productCode')}")
        print(f"标题: {item_detail.get('productName')[:50]}...")
        print(f"品牌: {item_detail.get('brandName')}")
        print(f"类目: {item_detail.get('categoryName')}")
        print(f"当前价格: ¥{item_detail.get('price')}")
        print(f"市场价: ¥{item_detail.get('marketPrice')}")
        try:
            price = float(item_detail.get('price', 0))
            market_price = float(item_detail.get('marketPrice', price))
            print(f"折扣: {round(price/market_price, 2)}")
        except:
            pass
        print(f"库存: {item_detail.get('stock')}")
        print(f"销量: {item_detail.get('salesCount')}")
        print(f"评价数: {item_detail.get('commentSummary', {}).get('commentCount')}")
        print(f"评分: {item_detail.get('commentSummary', {}).get('averageScore')}")
        print(f"规格数量: {len(item_detail.get('specList', []))}")
        print(f"是否有促销: {'是' if len(item_detail.get('promotionList', [])) > 0 else '否'}")
    
    # 2. 分析商品详情
    print("\n=== 商品详情分析 ===")
    if item_detail:
        analysis = suning_item.analyze_item(item_detail)
        
        print("价格分析:")
        print(f"  当前价格: ¥{analysis['price_analysis']['current_price']}")
        print(f"  市场价: ¥{analysis['price_analysis']['market_price']}")
        print(f"  折扣: {analysis['price_analysis']['discount']} (省¥{analysis['price_analysis']['price_drop']})")
        print(f"  会员价: {'有' if analysis['price_analysis']['has_member_price'] else '无'}")
        
        print("\n规格分析:")
        print(f"  总库存: {analysis['spec_analysis']['total_stock']}")
        print(f"  规格数量: {analysis['spec_analysis']['spec_count']}")
        print(f"  库存状态: {analysis['spec_analysis']['stock_status']}")
        print(f"  主要规格: {', '.join([f'{k}:{v}种' for k, v in analysis['spec_analysis']['main_specs'].items()])}")
        
        print("\n促销分析:")
        print(f"  是否有促销: {'是' if analysis['promotion_analysis']['has_promotion'] else '否'}")
        if analysis['promotion_analysis']['has_promotion']:
            print(f"  促销类型: {', '.join(analysis['promotion_analysis']['promotion_types'])}")
            print(f"  优惠券数量: {analysis['promotion_analysis']['coupon_count']}")
            print(f"  赠品数量: {analysis['promotion_analysis']['gift_count']}")
        
        print("\n评价分析:")
        print(f"  评价数: {analysis['comment_analysis']['comment_count']}")
        print(f"  平均评分: {analysis['comment_analysis']['average_score']}")
        print(f"  好评率: {analysis['comment_analysis']['positive_rate']:.2%}")
        print(f"  带图评价数: {analysis['comment_analysis']['image_comment_count']}")
        
        print("\n服务分析:")
        print(f"  服务类型: {', '.join(analysis['service_analysis']['service_types'][:5])}")
        print(f"  是否免运费: {'是' if analysis['service_analysis']['has_free_shipping'] else '否'}")
        print(f"  是否提供安装: {'是' if analysis['service_analysis']['has_installation'] else '否'}")
    
    # 3. 批量获取多个商品详情并对比
    print("\n=== 批量获取与商品对比 ===")
    items, success_count = suning_item.batch_get_item_details(
        product_codes=MULTIPLE_PRODUCT_CODES,
        area="110100"
    )
    
    if items and len(items) >= 2:
        # 分析每个商品
        analyses = [suning_item.analyze_item(item) for item in items]
        
        # 对比商品
        comparison = suning_item.compare_items(items)
        
        print("\n价格对比 (从低到高):")
        for i, item in enumerate(comparison["price_comparison"][:3], 1):
            print(f"{i}. {item['name']} - ¥{item['current_price']} (市场价: ¥{item['market_price']})")
        
        print("\n折扣对比 (从低到高):")
        for i, item in enumerate(comparison["discount_comparison"][:3], 1):
            print(f"{i}. {item['name']} - {item['discount']}折 (促销: {'有' if item['has_promotion'] else '无'})")
        
        print("\n销量对比 (从高到低):")
        for i, item in enumerate(comparison["sales_comparison"][:3], 1):
            print(f"{i}. {item['name']} - 销量: {item['sales_count']}")
        
        # 4. 可视化对比结果
        suning_item.visualize_comparison(comparison)
        
        # 5. 导出数据到Excel
        suning_item.export_to_excel(items, analyses, comparison, "苏宁商品详情分析.xlsx")
    elif len(items) == 1:
        print("获取的商品数量不足,无法进行对比分析")
    else:
        print("未获取到足够的商品数据")

四、接口调用注意事项

1. 常见错误及解决方案

错误码说明解决方案
1001无效的 appKey检查 appKey 是否正确,应用是否已通过审核
1002签名错误检查签名算法实现,确保使用 MD5 和正确的 appSecret
1003权限不足申请更高权限或检查是否已开通 item_get 接口权限
1004调用频率超限降低调用频率,或申请提高权限等级
2001商品 ID 无效检查 productCode 是否正确,格式是否符合要求
2002商品不存在或已下架验证商品是否存在,是否已被删除或下架
2003参数错误检查各参数格式是否正确,特别是地区编码和布尔参数
2004地区编码无效检查 area 参数是否为有效的地区编码
3001服务暂时不可用稍后重试,可能是平台维护或临时故障
4001账号被限制检查账号状态,是否有违规行为导致限制

2. 性能优化建议

  • 按需获取数据:根据业务需求设置 needComment、needStore 等参数,减少不必要数据

  • 合理设置缓存:普通商品信息可设置 10-15 分钟缓存,促销信息建议 5 分钟

  • 批量处理控制:批量获取时控制并发数,建议不超过 2 个并发

  • 地区选择:根据业务覆盖区域选择合适的 area 参数,避免无效数据

  • 异常重试机制:实现失败自动重试逻辑,设置最大重试次数(如 3 次)

  • 增量更新:定期更新时只获取有变化的商品数据

  • 规格过滤:如需特定规格数据,使用 cargoNo 参数直接获取,减少数据传输

五、应用场景与扩展建议

典型应用场景

  • 价格监控:实时监控商品价格变化,特别是促销活动期间

  • 竞品分析:对比同类商品的价格、促销和销售情况

  • 库存管理:监控商品库存变化,及时调整采购策略

  • 服务质量评估:分析商品的售后服务和用户评价

  • 促销效果分析:评估不同促销方式对商品销售的影响

  • 品牌分析:研究不同品牌在平台上的表现和用户反馈

扩展建议

  • 价格预警:当商品价格低于或高于设定阈值时发送提醒

  • 库存预警:当商品库存低于安全值时自动提醒补货

  • 多平台比价:与其他电商平台数据对比,寻找最优价格

  • 评价情感分析:对用户评价进行情感分析,提取关键反馈

  • 智能推荐:基于商品特征和用户偏好推荐相似商品

  • 促销策略优化:分析促销效果,优化促销策略和选品

  • 销售预测:基于历史销售数据预测未来销售趋势

通过item_get接口获取的苏宁商品详情数据,能够帮助电商从业者深入了解平台商品的价格策略、促销模式和销售表现。在实际应用中,需严格遵守苏宁平台的数据使用规范,结合平台的零售特性和用户消费习惯,制定针对性的营销策略和采购计划,同时注重数据的时效性和准确性,以提升决策质量。


群贤毕至

访客