Lazada 商品详情数据是了解产品信息、价格策略、库存状态和销售表现的核心来源。item_get接口提供了获取指定商品完整详情的能力,包含基本信息、价格、规格、库存、促销活动、卖家信息等多维度数据,为跨境电商卖家提供产品研究、价格监控和市场策略制定的关键依据。
一、item_get 接口核心特性分析
1. 接口定位与核心价值
2. 接口权限与调用限制
3. 核心参数解析
必选参数
可选参数
二、签名生成与返回数据结构
1. 签名生成逻辑
2. 返回数据结构解析
三、Python 实现方案
import requests
import time
import hmac
import hashlib
import base64
import json
import logging
import pandas as pd
import matplotlib.pyplot as plt
import re
from datetime import datetime
from typing import Dict, Optional, List, Tuple
# 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
# 配置中文显示
plt.rcParams["font.family"] = ["SimHei", "WenQuanYi Micro Hei", "Heiti TC"]
plt.rcParams["axes.unicode_minus"] = False
class LazadaItemDetail:
"""Lazada item_get接口封装类,用于获取和分析商品详情数据"""
def __init__(self, app_key: str, app_secret: str):
"""
初始化Lazada API客户端
:param app_key: 应用的app_key
:param app_secret: 应用的app_secret
"""
self.app_key = app_key
self.app_secret = app_secret
self.api_url = "https://api.lazada.com/rest"
# 频率控制
self.rate_limit = 120 # 默认基础权限,高级权限可修改为600
self.call_timestamps = [] # 存储调用时间戳(秒级)
# 支持的站点代码
self.supported_countries = ["SG", "MY", "TH", "ID", "PH", "VN"]
# 货币代码映射
self.currency_codes = {
"SG": "SGD",
"MY": "MYR",
"TH": "THB",
"ID": "IDR",
"PH": "PHP",
"VN": "VND"
}
def set_rate_limit(self, limit: int) -> None:
"""设置调用频率限制(次/小时)"""
if 120 <= limit <= 600:
self.rate_limit = limit
logging.info(f"已设置调用频率限制为 {limit} 次/小时")
else:
logging.warning("频率限制必须在120-600之间,未修改")
def _generate_sign(self, params: Dict) -> str:
"""生成签名(HMAC-SHA256算法)"""
# 1. 按参数名ASCII升序排序
sorted_params = sorted(params.items(), key=lambda x: x[0])
# 2. 拼接为"key=value&key=value"格式
param_str = "&".join([f"{k}={v}" for k, v in sorted_params])
# 3. HMAC-SHA256加密
hmac_code = hmac.new(
self.app_secret.encode('utf-8'),
param_str.encode('utf-8'),
hashlib.sha256
).digest()
# 4. Base64编码
return base64.b64encode(hmac_code).decode('utf-8')
def _check_rate_limit(self) -> None:
"""检查并控制调用频率"""
current_time = time.time() # 秒级
# 保留1小时内的调用记录
self.call_timestamps = [t for t in self.call_timestamps if current_time - t < 3600]
# 若超过限制,计算需要等待的时间
if len(self.call_timestamps) >= self.rate_limit:
oldest_time = self.call_timestamps[0]
sleep_time = (3600 - (current_time - oldest_time)) + 1 # 额外加1秒保险
logging.warning(f"调用频率超限,等待 {sleep_time:.1f} 秒")
time.sleep(sleep_time)
# 再次清理过期记录
self.call_timestamps = [t for t in self.call_timestamps if time.time() - t < 3600]
# 记录本次调用时间
self.call_timestamps.append(current_time)
def get_item_detail(self, item_id: str, country: str,
include_seller: bool = True,
include_sku: bool = True,
include_promotion: bool = True,
fields: Optional[str] = None) -> Optional[Dict]:
"""
获取商品详情数据
:param item_id: 商品ID
:param country: 站点代码
:param include_seller: 是否包含卖家信息
:param include_sku: 是否包含SKU详情
:param include_promotion: 是否包含促销信息
:param fields: 指定返回字段,逗号分隔
:return: 商品详情数据
"""
# 验证站点代码
if country not in self.supported_countries:
logging.error(f"不支持的站点代码: {country},支持的站点: {', '.join(self.supported_countries)}")
return None
# 构建基础参数
base_params = {
"app_key": self.app_key,
"method": "item.get",
"timestamp": str(int(time.time())), # 秒级时间戳
"format": "json",
"v": "2.0",
"item_id": item_id,
"country": country,
"include_seller": "true" if include_seller else "false",
"include_sku": "true" if include_sku else "false",
"include_promotion": "true" if include_promotion else "false"
}
# 添加指定字段参数
if fields:
base_params["fields"] = fields
# 生成签名
sign = self._generate_sign(base_params)
base_params["sign"] = sign
# 检查频率限制
self._check_rate_limit()
try:
# 发送请求
response = requests.get(self.api_url, params=base_params, timeout=15)
response.raise_for_status()
# 解析响应
result = response.json()
# 处理错误
if "code" in result and result["code"] != 0:
logging.error(f"API调用错误: {result.get('message')} (错误码: {result.get('code')})")
return None
# 提取结果
item_data = result.get("data", {})
if not item_data:
logging.warning("未获取到商品详情数据")
return None
logging.info(f"成功获取 {country} 站点商品 {item_id} 的详情数据")
return item_data
except requests.exceptions.RequestException as e:
logging.error(f"请求异常: {str(e)}")
return None
except json.JSONDecodeError:
logging.error(f"响应解析失败: {response.text[:200]}...")
return None
def batch_get_item_details(self, item_ids: List[str], country: str,
include_seller: bool = True,
include_sku: bool = True,
include_promotion: bool = True) -> Tuple[List[Dict], int]:
"""
批量获取多个商品的详情数据
:param item_ids: 商品ID列表
:param country: 站点代码
:param include_seller: 是否包含卖家信息
:param include_sku: 是否包含SKU详情
:param include_promotion: 是否包含促销信息
:return: 商品详情列表和成功获取的数量
"""
all_items = []
success_count = 0
for item_id in item_ids:
logging.info(f"正在获取商品 {item_id} 的详情...")
item_data = self.get_item_detail(
item_id=item_id,
country=country,
include_seller=include_seller,
include_sku=include_sku,
include_promotion=include_promotion
)
if item_data:
all_items.append(item_data)
success_count += 1
# 避免连续调用过于频繁
time.sleep(0.5)
logging.info(f"批量获取完成,共请求 {len(item_ids)} 个商品,成功获取 {success_count} 个")
return all_items, success_count
def analyze_item(self, item_data: Dict) -> Dict:
"""分析单个商品数据"""
if not item_data:
return {"error": "没有商品数据可分析"}
# 1. 价格分析
price_info = {
"current_price": float(item_data.get("price", 0)),
"original_price": float(item_data.get("original_price", 0)),
"currency": self.currency_codes.get(item_data.get("country", ""), "Unknown"),
"discount": 0
}
# 计算折扣
if price_info["original_price"] > 0:
price_info["discount"] = round(1 - price_info["current_price"] / price_info["original_price"], 2) * 100
# 2. 库存分析
stock_info = {
"total_stock": int(item_data.get("stock_total", 0)),
"available_stock": int(item_data.get("stock_available", 0)),
"stock_status": "有货" if int(item_data.get("stock_available", 0)) > 0 else "无货",
"sku_count": len(item_data.get("skus", []))
}
# 3. 销售表现分析
sales_info = {
"sales_count": int(item_data.get("sales_count", 0)),
"rating_score": float(item_data.get("rating_score", 0)),
"review_count": int(item_data.get("review_count", 0)),
"rating_distribution": item_data.get("rating_distribution", {})
}
# 4. SKU分析(价格范围等)
sku_info = {
"min_sku_price": None,
"max_sku_price": None,
"price_range": None,
"has_variation": len(item_data.get("skus", [])) > 1
}
if item_data.get("skus"):
sku_prices = []
for sku in item_data["skus"]:
try:
sku_prices.append(float(sku.get("price", 0)))
except (ValueError, TypeError):
continue
if sku_prices:
sku_info["min_sku_price"] = min(sku_prices)
sku_info["max_sku_price"] = max(sku_prices)
sku_info["price_range"] = round(sku_info["max_sku_price"] - sku_info["min_sku_price"], 2)
# 5. 媒体资源分析
media_info = {
"image_count": len(item_data.get("images", [])),
"has_video": len(item_data.get("videos", [])) > 0,
"video_count": len(item_data.get("videos", []))
}
# 6. 促销分析
promotion_info = {
"has_promotion": len(item_data.get("promotions", [])) > 0,
"promotion_count": len(item_data.get("promotions", []))
}
# 7. 卖家分析
seller_info = {}
if "seller_info" in item_data:
seller = item_data["seller_info"]
seller_info = {
"seller_id": seller.get("seller_id"),
"seller_name": seller.get("seller_name"),
"rating": float(seller.get("rating", 0)),
"feedback_count": int(seller.get("feedback_count", 0)),
"joined_date": seller.get("joined_date"),
"is_official": seller.get("is_official", False)
}
return {
"item_id": item_data.get("item_id"),
"title": item_data.get("title"),
"country": item_data.get("country"),
"brand": item_data.get("brand_name"),
"category_id": item_data.get("category_id"),
"price_analysis": price_info,
"stock_analysis": stock_info,
"sales_analysis": sales_info,
"sku_analysis": sku_info,
"media_analysis": media_info,
"promotion_analysis": promotion_info,
"seller_analysis": seller_info,
"update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
def compare_items(self, items_data: List[Dict]) -> Dict:
"""对比多个商品数据"""
if len(items_data) < 2:
return {"error": "至少需要2个商品进行对比"}
comparison = {
"total_items": len(items_data),
"price_comparison": [],
"sales_comparison": [],
"rating_comparison": [],
"stock_comparison": [],
"sku_comparison": []
}
# 提取各商品关键指标用于对比
for item in items_data:
item_id = item.get("item_id")
title = item.get("title", "")[:30] # 截断长标题
# 价格对比
comparison["price_comparison"].append({
"item_id": item_id,
"title": title,
"current_price": float(item.get("price", 0)),
"original_price": float(item.get("original_price", 0)),
"discount": round((1 - float(item.get("price", 0))/float(item.get("original_price", 1))) * 100, 1)
if float(item.get("original_price", 0)) > 0 else 0
})
# 销量对比
comparison["sales_comparison"].append({
"item_id": item_id,
"title": title,
"sales_count": int(item.get("sales_count", 0))
})
# 评分对比
comparison["rating_comparison"].append({
"item_id": item_id,
"title": title,
"rating_score": float(item.get("rating_score", 0)),
"review_count": int(item.get("review_count", 0))
})
# 库存对比
comparison["stock_comparison"].append({
"item_id": item_id,
"title": title,
"available_stock": int(item.get("stock_available", 0))
})
# SKU对比
sku_count = len(item.get("skus", []))
comparison["sku_comparison"].append({
"item_id": item_id,
"title": title,
"sku_count": sku_count
})
# 排序各对比项以便分析
comparison["price_comparison"].sort(key=lambda x: x["current_price"])
comparison["sales_comparison"].sort(key=lambda x: x["sales_count"], reverse=True)
comparison["rating_comparison"].sort(key=lambda x: x["rating_score"], reverse=True)
return comparison
def visualize_comparison(self, comparison: Dict, output_dir: str = ".") -> None:
"""可视化多个商品的对比结果"""
if "error" in comparison:
logging.warning(comparison["error"])
return
# 1. 价格对比条形图
if comparison["price_comparison"]:
plt.figure(figsize=(12, 6))
items = [item["title"] for item in comparison["price_comparison"]]
current_prices = [item["current_price"] for item in comparison["price_comparison"]]
original_prices = [item["original_price"] for item in comparison["price_comparison"]]
x = np.arange(len(items))
width = 0.35
plt.bar(x - width/2, current_prices, width, label='当前价格')
plt.bar(x + width/2, original_prices, width, label='原价')
plt.xlabel('商品')
plt.ylabel('价格')
plt.title('商品价格对比')
plt.xticks(x, items, rotation=45)
plt.legend()
plt.tight_layout()
plt.savefig(f"{output_dir}/price_comparison.png")
plt.close()
logging.info(f"价格对比图表已保存至 {output_dir}/price_comparison.png")
# 2. 销量对比条形图
if comparison["sales_comparison"]:
plt.figure(figsize=(12, 6))
items = [item["title"] for item in comparison["sales_comparison"]]
sales = [item["sales_count"] for item in comparison["sales_comparison"]]
plt.bar(items, sales, color='orange')
plt.xlabel('商品')
plt.ylabel('销量')
plt.title('商品销量对比')
plt.xticks(rotation=45)
for i, v in enumerate(sales):
plt.text(i, v + 5, str(v), ha='center')
plt.tight_layout()
plt.savefig(f"{output_dir}/sales_comparison.png")
plt.close()
logging.info(f"销量对比图表已保存至 {output_dir}/sales_comparison.png")
# 3. 评分对比条形图
if comparison["rating_comparison"]:
plt.figure(figsize=(12, 6))
items = [item["title"] for item in comparison["rating_comparison"]]
ratings = [item["rating_score"] for item in comparison["rating_comparison"]]
plt.bar(items, ratings, color='green')
plt.axhline(y=4.0, color='r', linestyle='--', label='4.0分基准线')
plt.xlabel('商品')
plt.ylabel('评分 (满分5分)')
plt.title('商品评分对比')
plt.xticks(rotation=45)
plt.ylim(0, 5.5)
plt.legend()
for i, v in enumerate(ratings):
plt.text(i, v + 0.1, str(v), ha='center')
plt.tight_layout()
plt.savefig(f"{output_dir}/rating_comparison.png")
plt.close()
logging.info(f"评分对比图表已保存至 {output_dir}/rating_comparison.png")
def export_to_excel(self, items_data: List[Dict], analyses: List[Dict], comparison: Optional[Dict],
filename: str) -> None:
"""导出商品数据到Excel"""
if not items_data and not analyses:
logging.warning("没有数据可导出")
return
try:
with pd.ExcelWriter(filename) as writer:
# 商品基本信息
if items_data:
basic_info = []
for item in items_data:
info = {
"商品ID": item.get("item_id"),
"标题": item.get("title"),
"国家站点": item.get("country"),
"品牌": item.get("brand_name"),
"类目ID": item.get("category_id"),
"当前价格": item.get("price"),
"原价": item.get("original_price"),
"折扣(%)": round((1 - float(item.get("price", 0))/float(item.get("original_price", 1))) * 100, 1)
if float(item.get("original_price", 0)) > 0 else 0,
"销量": item.get("sales_count"),
"评分": item.get("rating_score"),
"评论数": item.get("review_count"),
"可用库存": item.get("stock_available"),
"SKU数量": len(item.get("skus", [])),
"图片数量": len(item.get("images", [])),
"是否有视频": "是" if len(item.get("videos", [])) > 0 else "否"
}
basic_info.append(info)
df_basic = pd.DataFrame(basic_info)
df_basic.to_excel(writer, sheet_name='商品基本信息', index=False)
# SKU详情
for i, item in enumerate(items_data):
if item.get("skus"):
sku_details = []
for sku in item.get("skus", []):
details = {
"SKU ID": sku.get("sku_id"),
"属性组合": ", ".join([f"{k}:{v}" for k, v in sku.get("attributes", {}).items()]),
"价格": sku.get("price"),
"库存": sku.get("stock"),
"图片": sku.get("image")
}
sku_details.append(details)
df_sku = pd.DataFrame(sku_details)
df_sku.to_excel(writer, sheet_name=f'SKU详情_{item.get("item_id")[:6]}', index=False)
# 分析结果
if analyses:
analysis_summary = []
for analysis in analyses:
summary = {
"商品ID": analysis.get("item_id"),
"标题": analysis.get("title"),
"当前价格": analysis["price_analysis"]["current_price"],
"原价": analysis["price_analysis"]["original_price"],
"折扣(%)": analysis["price_analysis"]["discount"],
"可用库存": analysis["stock_analysis"]["available_stock"],
"销量": analysis["sales_analysis"]["sales_count"],
"评分": analysis["sales_analysis"]["rating_score"],
"评论数": analysis["sales_analysis"]["review_count"],
"SKU数量": analysis["sku_analysis"]["sku_count"],
"图片数量": analysis["media_analysis"]["image_count"],
"是否有视频": "是" if analysis["media_analysis"]["has_video"] else "否",
"是否有促销": "是" if analysis["promotion_analysis"]["has_promotion"] else "否"
}
analysis_summary.append(summary)
df_analysis = pd.DataFrame(analysis_summary)
df_analysis.to_excel(writer, sheet_name='分析摘要', index=False)
# 对比结果
if comparison and "error" not in comparison:
df_price = pd.DataFrame(comparison["price_comparison"])
df_price.to_excel(writer, sheet_name='价格对比', index=False)
df_sales = pd.DataFrame(comparison["sales_comparison"])
df_sales.to_excel(writer, sheet_name='销量对比', index=False)
logging.info(f"数据已导出至 {filename}")
except Exception as e:
logging.error(f"导出Excel失败: {e}")
# 示例调用
if __name__ == "__main__":
# 替换为实际的参数(从Lazada开放平台获取)
APP_KEY = "your_app_key"
APP_SECRET = "your_app_secret"
ITEM_ID = "123456789" # 商品ID示例
COUNTRY = "SG" # 新加坡站点
# 多个商品ID用于批量获取和对比
MULTIPLE_ITEM_IDS = ["123456789", "987654321", "112233445"]
# 初始化API客户端
lazada_item = LazadaItemDetail(APP_KEY, APP_SECRET)
# 若为高级权限,设置更高的频率限制
# lazada_item.set_rate_limit(600)
# 1. 获取单个商品详情
print("=== 获取单个商品详情 ===")
item_detail = lazada_item.get_item_detail(
item_id=ITEM_ID,
country=COUNTRY,
include_seller=True,
include_sku=True,
include_promotion=True
)
if item_detail:
print(f"商品ID: {item_detail.get('item_id')}")
print(f"标题: {item_detail.get('title')[:50]}...")
print(f"当前价格: {item_detail.get('price')} {lazada_item.currency_codes[COUNTRY]}")
print(f"原价: {item_detail.get('original_price')} {lazada_item.currency_codes[COUNTRY]}")
print(f"销量: {item_detail.get('sales_count')}")
print(f"评分: {item_detail.get('rating_score')} ({item_detail.get('review_count')}条评论)")
print(f"可用库存: {item_detail.get('stock_available')}")
print(f"SKU数量: {len(item_detail.get('skus', []))}")
# 2. 分析商品详情
print("\n=== 商品详情分析 ===")
if item_detail:
analysis = lazada_item.analyze_item(item_detail)
print("价格分析:")
print(f" 当前价格: {analysis['price_analysis']['current_price']} {analysis['price_analysis']['currency']}")
print(f" 原价: {analysis['price_analysis']['original_price']} {analysis['price_analysis']['currency']}")
print(f" 折扣: {analysis['price_analysis']['discount']}%")
print("\n库存分析:")
print(f" 可用库存: {analysis['stock_analysis']['available_stock']}")
print(f" 库存状态: {analysis['stock_analysis']['stock_status']}")
print(f" SKU数量: {analysis['stock_analysis']['sku_count']}")
print("\n销售分析:")
print(f" 销量: {analysis['sales_analysis']['sales_count']}")
print(f" 评分: {analysis['sales_analysis']['rating_score']}")
print(f" 评论数: {analysis['sales_analysis']['review_count']}")
if analysis["seller_analysis"]:
print("\n卖家分析:")
print(f" 卖家名称: {analysis['seller_analysis']['seller_name']}")
print(f" 卖家评分: {analysis['seller_analysis']['rating']}")
# 3. 批量获取多个商品详情并对比
print("\n=== 批量获取与商品对比 ===")
items, success_count = lazada_item.batch_get_item_details(
item_ids=MULTIPLE_ITEM_IDS,
country=COUNTRY
)
if items and len(items) >= 2:
# 分析每个商品
analyses = [lazada_item.analyze_item(item) for item in items]
# 对比商品
comparison = lazada_item.compare_items(items)
print("\n价格对比 (从低到高):")
for i, item in enumerate(comparison["price_comparison"][:3], 1):
print(f"{i}. {item['title']} - {item['current_price']} {lazada_item.currency_codes[COUNTRY]}")
print("\n销量对比 (从高到低):")
for i, item in enumerate(comparison["sales_comparison"][:3], 1):
print(f"{i}. {item['title']} - 销量: {item['sales_count']}")
print("\n评分对比 (从高到低):")
for i, item in enumerate(comparison["rating_comparison"][:3], 1):
print(f"{i}. {item['title']} - 评分: {item['rating_score']}")
# 4. 可视化对比结果
lazada_item.visualize_comparison(comparison)
# 5. 导出数据到Excel
lazada_item.export_to_excel(items, analyses, comparison, "Lazada商品详情分析.xlsx")
elif len(items) == 1:
print("获取的商品数量不足,无法进行对比分析")
else:
print("未获取到足够的商品数据")