Lazada 作为东南亚领先的电商平台,商品评论数据是了解当地消费者偏好、产品优缺点及市场反馈的重要途径。item_review接口提供了获取指定商品评论列表的能力,包含评分、评论内容、用户信息、购买属性等关键数据,为跨境电商卖家提供产品优化、市场策略调整的决策依据。
一、item_review 接口核心特性分析
1. 接口定位与核心价值
2. 接口权限与调用限制
3. 核心参数解析
必选参数
可选参数
二、签名生成与返回数据结构
1. 签名生成逻辑
2. 返回数据结构解析
三、Python 实现方案
import requests
import time
import hmac
import hashlib
import base64
import json
import logging
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import re
from datetime import datetime
from collections import defaultdict
from typing import Dict, List, Optional, Tuple
from textblob import TextBlob # 用于情感分析
# 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
# 配置中文显示
plt.rcParams["font.family"] = ["SimHei", "WenQuanYi Micro Hei", "Heiti TC"]
plt.rcParams["axes.unicode_minus"] = False
class LazadaItemReviews:
    """Client for the Lazada item_review API: fetches product reviews and
    provides aggregation, keyword, visualisation and export helpers."""

    def __init__(self, app_key: str, app_secret: str):
        """
        Create an API client.

        :param app_key: application app_key issued by the Lazada open platform
        :param app_secret: matching app_secret, used to sign every request
        """
        self.app_key = app_key
        self.app_secret = app_secret
        self.api_url = "https://api.lazada.com/rest"
        # Throttling state: basic tier allows 60 calls/hour (premium: 300).
        self.rate_limit = 60
        self.call_timestamps = []  # epoch seconds of calls in the last hour
        # Country codes of the Lazada sites this client accepts.
        self.supported_countries = ["SG", "MY", "TH", "ID", "PH", "VN"]
def set_rate_limit(self, limit: int) -> None:
"""设置调用频率限制(次/小时)"""
if 60 <= limit <= 300:
self.rate_limit = limit
logging.info(f"已设置调用频率限制为 {limit} 次/小时")
else:
logging.warning("频率限制必须在60-300之间,未修改")
def _generate_sign(self, params: Dict) -> str:
"""生成签名(HMAC-SHA256算法)"""
# 1. 按参数名ASCII升序排序
sorted_params = sorted(params.items(), key=lambda x: x[0])
# 2. 拼接为"key=value&key=value"格式
param_str = "&".join([f"{k}={v}" for k, v in sorted_params])
# 3. HMAC-SHA256加密
hmac_code = hmac.new(
self.app_secret.encode('utf-8'),
param_str.encode('utf-8'),
hashlib.sha256
).digest()
# 4. Base64编码
return base64.b64encode(hmac_code).decode('utf-8')
def _check_rate_limit(self) -> None:
"""检查并控制调用频率"""
current_time = time.time() # 秒级
# 保留1小时内的调用记录
self.call_timestamps = [t for t in self.call_timestamps if current_time - t < 3600]
# 若超过限制,计算需要等待的时间
if len(self.call_timestamps) >= self.rate_limit:
oldest_time = self.call_timestamps[0]
sleep_time = (3600 - (current_time - oldest_time)) + 1 # 额外加1秒保险
logging.warning(f"调用频率超限,等待 {sleep_time:.1f} 秒")
time.sleep(sleep_time)
# 再次清理过期记录
self.call_timestamps = [t for t in self.call_timestamps if time.time() - t < 3600]
# 记录本次调用时间
self.call_timestamps.append(current_time)
def get_reviews(self, item_id: str, country: str,
                page: int = 1, page_size: int = 20,
                filters: Optional[Dict] = None) -> Optional[Dict]:
    """
    Fetch one page of reviews for a product.

    :param item_id: product ID
    :param country: site country code (must be in self.supported_countries)
    :param page: 1-based page number
    :param page_size: reviews per page
    :param filters: optional extra query filters (e.g. rating, sort);
                    None-valued entries are dropped
    :return: the response's "data" payload dict on success, None on any
             validation, transport, API, or parsing failure
    """
    # Reject unsupported country codes up front.
    if country not in self.supported_countries:
        logging.error(f"不支持的站点代码: {country},支持的站点: {', '.join(self.supported_countries)}")
        return None
    # Base request parameters required by the API.
    base_params = {
        "app_key": self.app_key,
        "method": "review.get",
        "timestamp": str(int(time.time())),  # second-precision timestamp
        "format": "json",
        "v": "2.0",
        "item_id": item_id,
        "country": country,
        "page": str(page),
        "page_size": str(page_size)
    }
    # Merge caller-supplied filters into the request.
    if filters and isinstance(filters, Dict):
        # Drop None-valued entries.
        valid_filters = {k: v for k, v in filters.items() if v is not None}
        # Stringify values; booleans become "true"/"false".
        for k, v in valid_filters.items():
            base_params[k] = str(v).lower() if isinstance(v, bool) else str(v)
    # Sign over all parameters, then attach the signature itself.
    sign = self._generate_sign(base_params)
    base_params["sign"] = sign
    # Throttle before hitting the network.
    self._check_rate_limit()
    try:
        response = requests.get(self.api_url, params=base_params, timeout=15)
        response.raise_for_status()
        result = response.json()
        # NOTE(review): assumes a present, non-zero "code" marks an API
        # error — confirm against the Lazada open-platform error spec.
        if "code" in result and result["code"] != 0:
            logging.error(f"API调用错误: {result.get('message')} (错误码: {result.get('code')})")
            return None
        # An empty/missing "reviews" list is treated as no data.
        reviews_data = result.get("data", {})
        if not reviews_data.get("reviews"):
            logging.warning("未获取到评论数据")
            return None
        logging.info(f"成功获取 {country} 站点商品 {item_id} 的第 {page} 页评论")
        return reviews_data
    except requests.exceptions.RequestException as e:
        logging.error(f"请求异常: {str(e)}")
        return None
    except json.JSONDecodeError:
        # 'response' is bound here because raise_for_status() succeeded.
        logging.error(f"响应解析失败: {response.text[:200]}...")
        return None
def batch_get_reviews(self, item_id: str, country: str,
max_pages: int = 5, page_size: int = 20,
filters: Optional[Dict] = None) -> Tuple[List[Dict], Dict]:
"""
批量获取多页评论数据
:param item_id: 商品ID
:param country: 站点代码
:param max_pages: 最大页数
:param page_size: 每页数量
:param filters: 筛选参数
:return: 评论列表和元信息
"""
all_reviews = []
meta_info = {
"item_id": item_id,
"country": country,
"total_reviews": 0,
"total_pages": 0,
"filters": filters,
"fetch_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
page = 1
while page <= max_pages:
logging.info(f"正在获取第 {page}/{max_pages} 页评论...")
result = self.get_reviews(item_id, country, page, page_size, filters)
if not result:
break
# 提取评论数据
reviews = result.get("reviews", [])
if not reviews:
logging.info("当前页无评论数据,停止获取")
break
all_reviews.extend(reviews)
# 保存元信息(第一页)
if page == 1:
meta_info["total_reviews"] = result.get("total_reviews", 0)
meta_info["total_pages"] = min(max_pages, (meta_info["total_reviews"] + page_size - 1) // page_size)
meta_info["average_rating"] = result.get("average_rating", 0)
page += 1
logging.info(f"批量获取完成,共获取 {len(all_reviews)} 条评论数据")
return all_reviews, meta_info
def analyze_reviews(self, reviews: List[Dict]) -> Dict:
    """
    Aggregate statistics over a list of review dicts: rating and monthly
    time distributions, purchase-attribute counts, TextBlob sentiment,
    helpful-vote buckets, media usage, and seller-reply rate.

    :param reviews: review dicts as returned by the API
    :return: nested dict of distributions, or {"error": ...} when empty
    """
    if not reviews:
        return {"error": "没有评论数据可分析"}

    # -- 1. rating distribution ---------------------------------------
    rating_dist = defaultdict(int)
    valid_ratings = []
    for rv in reviews:
        try:
            score = int(rv.get("rating", 0))
            if 1 <= score <= 5:
                rating_dist[score] += 1
                valid_ratings.append(score)
            else:
                rating_dist["无效"] += 1
        except (ValueError, TypeError):
            rating_dist["无效"] += 1
    avg_rating = round(sum(valid_ratings) / len(valid_ratings), 1) if valid_ratings else 0

    # -- 2. monthly time distribution ---------------------------------
    monthly = defaultdict(int)
    for rv in reviews:
        try:
            stamp = rv.get("review_time", "")
            if stamp:
                # Bucket by "YYYY-MM".
                monthly[datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S").strftime("%Y-%m")] += 1
            else:
                monthly["未知"] += 1
        except (ValueError, TypeError):
            monthly["未知"] += 1

    # -- 3. purchase attributes (colour, size, ...) -------------------
    attr_dist = defaultdict(lambda: defaultdict(int))
    for rv in reviews:
        bought = rv.get("purchase_attr", {})
        if bought:
            for attr_name, attr_value in bought.items():
                attr_dist[attr_name][attr_value] += 1

    # -- 4. sentiment (TextBlob polarity; English-oriented) -----------
    sentiment_dist = defaultdict(int)
    polarities = []
    for rv in reviews:
        text = rv.get("review_content", "")
        if not text:
            sentiment_dist["无内容"] += 1
            continue
        # Non-English reviews may need translation or another tool.
        polarity = TextBlob(text).sentiment.polarity
        polarities.append(polarity)
        if polarity > 0.1:
            sentiment_dist["正面"] += 1
        elif polarity < -0.1:
            sentiment_dist["负面"] += 1
        else:
            sentiment_dist["中性"] += 1
    avg_sentiment = round(sum(polarities) / len(polarities), 3) if polarities else 0

    # -- 5. helpful-vote buckets --------------------------------------
    helpful_dist = defaultdict(int)
    for rv in reviews:
        try:
            votes = int(rv.get("helpful_votes", 0))
            if votes == 0:
                helpful_dist["无投票"] += 1
            elif votes <= 5:
                helpful_dist["1-5票"] += 1
            elif votes <= 20:
                helpful_dist["6-20票"] += 1
            else:
                helpful_dist["20票以上"] += 1
        except (ValueError, TypeError):
            helpful_dist["数据异常"] += 1

    # -- 6. media usage (photos / videos) -----------------------------
    media_dist = defaultdict(int)
    for rv in reviews:
        with_img = len(rv.get("images", [])) > 0
        with_vid = len(rv.get("videos", [])) > 0
        if with_img and with_vid:
            media_dist["图文+视频"] += 1
        elif with_img:
            media_dist["仅图片"] += 1
        elif with_vid:
            media_dist["仅视频"] += 1
        else:
            media_dist["纯文字"] += 1

    # -- 7. seller reply rate -----------------------------------------
    reply_dist = defaultdict(int)
    for rv in reviews:
        reply_dist["已回复" if rv.get("reply") else "未回复"] += 1

    return {
        "total_reviews": len(reviews),
        "rating_analysis": {
            "distribution": dict(rating_dist),
            "average": avg_rating
        },
        "time_analysis": dict(monthly),
        "attribute_analysis": {name: dict(vals) for name, vals in attr_dist.items()},
        "sentiment_analysis": {
            "distribution": dict(sentiment_dist),
            "average_score": avg_sentiment
        },
        "helpful_analysis": dict(helpful_dist),
        "media_analysis": dict(media_dist),
        "reply_analysis": dict(reply_dist)
    }
def extract_keywords(self, reviews: List[Dict], rating_filter: Optional[int] = None, top_n: int = 10) -> List[Tuple]:
    """
    Extract the most frequent words from review texts.

    :param reviews: review dicts (reads the "review_content" field)
    :param rating_filter: keep only reviews whose rating equals this value
                          (1-5); None keeps all reviews
    :param top_n: number of top keywords to return
    :return: list of (word, count) tuples, most frequent first
    """
    if not reviews:
        return []
    # Collect the texts that pass the rating filter.
    texts = []
    for review in reviews:
        if rating_filter is not None:
            # Robustness fix: a missing or malformed rating used to raise
            # an uncaught ValueError/TypeError here and abort the whole
            # extraction; treat it as "does not match the filter" instead.
            try:
                if int(review.get("rating", 0)) != rating_filter:
                    continue
            except (ValueError, TypeError):
                continue
        content = review.get("review_content", "")
        if content:
            # Light normalisation: strip punctuation, lowercase.
            texts.append(re.sub(r'[^\w\s]', ' ', content).lower())
    # Count words, skipping very short tokens and common stop words.
    # (Simple whitespace tokenisation; real use may need proper NLP.)
    word_counts = defaultdict(int)
    stop_words = {"the", "and", "is", "in", "to", "i", "this", "for", "with", "it", "on", "a", "an", "was", "were"}
    for content in texts:
        for word in content.split():
            if len(word) > 2 and word not in stop_words:
                word_counts[word] += 1
    return sorted(word_counts.items(), key=lambda x: x[1], reverse=True)[:top_n]
def visualize_analysis(self, analysis: Dict, item_id: str, country: str, output_dir: str = ".") -> None:
    """
    Render analyze_reviews() output as PNG charts in output_dir.

    Produces rating_distribution.png, review_trend.png,
    sentiment_distribution.png, and one <attr>_distribution.png per
    purchase attribute. Each section is skipped when its data is absent.
    """
    # 1. Rating distribution bar chart.
    if "rating_analysis" in analysis and analysis["rating_analysis"]["distribution"]:
        plt.figure(figsize=(10, 6))
        # Keys may mix ints and the "无效" bucket, so sort on str() to avoid
        # int-vs-str comparison errors.
        ratings = sorted(analysis["rating_analysis"]["distribution"].keys(), key=lambda x: str(x))
        counts = [analysis["rating_analysis"]["distribution"][r] for r in ratings]
        plt.bar(ratings, counts, color='lightblue')
        plt.title(f'商品 {item_id} ({country}) 评分分布')
        plt.xlabel('评分 (星)')
        plt.ylabel('评论数量')
        # Horizontal line marks the mean count per rating bucket.
        plt.axhline(y=sum(counts)/len(counts), color='r', linestyle='--',
                    label=f'平均分布: {sum(counts)/len(counts):.1f}')
        for i, v in enumerate(counts):
            plt.text(i, v + 0.5, str(v), ha='center')
        plt.legend()
        plt.tight_layout()
        plt.savefig(f"{output_dir}/rating_distribution.png")
        plt.close()
        logging.info(f"评分分布图表已保存至 {output_dir}/rating_distribution.png")
    # 2. Review-volume trend line chart.
    if "time_analysis" in analysis and analysis["time_analysis"]:
        # Drop the "未知" bucket and plot months in ascending order.
        time_data = {k: v for k, v in analysis["time_analysis"].items() if k != "未知"}
        if time_data:
            plt.figure(figsize=(12, 6))
            sorted_dates = sorted(time_data.keys())
            counts = [time_data[d] for d in sorted_dates]
            plt.plot(sorted_dates, counts, marker='o', color='orange')
            plt.title(f'商品 {item_id} ({country}) 评论时间趋势')
            plt.xlabel('时间 (年月)')
            plt.ylabel('评论数量')
            plt.xticks(rotation=45)
            for i, v in enumerate(counts):
                plt.text(i, v + 0.5, str(v), ha='center')
            plt.tight_layout()
            plt.savefig(f"{output_dir}/review_trend.png")
            plt.close()
            logging.info(f"评论时间趋势图表已保存至 {output_dir}/review_trend.png")
    # 3. Sentiment pie chart.
    if "sentiment_analysis" in analysis and analysis["sentiment_analysis"]["distribution"]:
        plt.figure(figsize=(10, 8))
        sentiment_data = analysis["sentiment_analysis"]["distribution"]
        labels, sizes = zip(*sentiment_data.items())
        plt.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=90)
        plt.title(f'商品 {item_id} ({country}) 评论情感分布')
        plt.axis('equal')
        plt.tight_layout()
        plt.savefig(f"{output_dir}/sentiment_distribution.png")
        plt.close()
        logging.info(f"情感分布图表已保存至 {output_dir}/sentiment_distribution.png")
    # 4. One bar chart per purchase attribute (colour, size, ...).
    if "attribute_analysis" in analysis and analysis["attribute_analysis"]:
        for attr_name, attr_values in analysis["attribute_analysis"].items():
            plt.figure(figsize=(10, 6))
            values = list(attr_values.keys())
            counts = list(attr_values.values())
            plt.bar(values, counts, color='lightgreen')
            plt.title(f'商品 {item_id} ({country}) {attr_name} 分布')
            plt.xlabel(attr_name)
            plt.ylabel('评论数量')
            for i, v in enumerate(counts):
                plt.text(i, v + 0.5, str(v), ha='center')
            plt.xticks(rotation=45)
            plt.tight_layout()
            # NOTE(review): attr_name is used verbatim in the file name —
            # an attribute containing path separators would break this; verify.
            plt.savefig(f"{output_dir}/{attr_name}_distribution.png")
            plt.close()
            logging.info(f"{attr_name}分布图表已保存至 {output_dir}/{attr_name}_distribution.png")
def export_to_excel(self, reviews: List[Dict], analysis: Dict, meta_info: Dict, filename: str) -> None:
    """
    Export review rows and analysis summaries to a multi-sheet Excel file.

    :param reviews: raw review dicts from the API
    :param analysis: output of analyze_reviews() (skipped when it is an error)
    :param meta_info: metadata from batch_get_reviews()
    :param filename: target .xlsx path
    """
    if not reviews and not analysis:
        logging.warning("没有数据可导出")
        return
    try:
        with pd.ExcelWriter(filename) as writer:
            # Sheet 1: fetch metadata.
            pd.DataFrame([meta_info]).to_excel(writer, sheet_name='评论信息', index=False)
            # Sheet 2: flattened per-review rows.
            if reviews:
                filtered_reviews = []
                for review in reviews:
                    # Keep only the columns useful in a spreadsheet.
                    filtered = {
                        "评论ID": review.get("review_id"),
                        "用户ID": review.get("user_id"),
                        "用户名": review.get("user_name"),
                        "评分": review.get("rating"),
                        "评论内容": review.get("review_content"),
                        "评论时间": review.get("review_time"),
                        "有用投票数": review.get("helpful_votes"),
                        "购买属性": json.dumps(review.get("purchase_attr", {}), ensure_ascii=False),
                        "带图数量": len(review.get("images", [])),
                        "带视频数量": len(review.get("videos", [])),
                        "商家回复": "有" if review.get("reply") else "无",
                        "回复内容": review.get("reply", ""),
                        "回复时间": review.get("reply_time", ""),
                        "是否验证购买": review.get("verified_purchase", False)
                    }
                    filtered_reviews.append(filtered)
                df_reviews = pd.DataFrame(filtered_reviews)
                df_reviews.to_excel(writer, sheet_name='评论详情', index=False)
            # Sheets 3-4: analysis summaries.
            if analysis and "error" not in analysis:
                if "rating_analysis" in analysis:
                    df_rating = pd.DataFrame(list(analysis["rating_analysis"]["distribution"].items()),
                                             columns=['评分', '数量'])
                    df_rating.to_excel(writer, sheet_name='评分分析', index=False)
                if "sentiment_analysis" in analysis:
                    df_sentiment = pd.DataFrame(list(analysis["sentiment_analysis"]["distribution"].items()),
                                                columns=['情感类型', '数量'])
                    df_sentiment.to_excel(writer, sheet_name='情感分析', index=False)
        # Bug fix: the original logged the literal text "(unknown)" instead
        # of the output path — interpolate the actual filename.
        logging.info(f"数据已导出至 {filename}")
    except Exception as e:
        logging.error(f"导出Excel失败: {e}")
# Example usage: fetch, analyse, visualise and export reviews for one item.
if __name__ == "__main__":
    # Replace with real credentials obtained from the Lazada open platform.
    APP_KEY = "your_app_key"
    APP_SECRET = "your_app_secret"
    ITEM_ID = "1234567890"  # sample product ID
    COUNTRY = "SG"  # Singapore site
    # Build the API client.
    lazada_reviews = LazadaItemReviews(APP_KEY, APP_SECRET)
    # With premium access, raise the hourly quota:
    # lazada_reviews.set_rate_limit(300)
    # 1. Optional server-side filters.
    filters = {
        # "rating": 5,        # only 5-star reviews
        # "with_media": True, # only reviews with images/videos
        "sort": "newest"  # newest first
    }
    # 2. Fetch reviews in batches.
    print("=== 获取商品评论数据 ===")
    reviews, meta_info = lazada_reviews.batch_get_reviews(
        item_id=ITEM_ID,
        country=COUNTRY,
        max_pages=3,  # first 3 pages only
        page_size=20,
        filters=filters
    )
    if reviews:
        print(f"商品ID: {ITEM_ID}")
        print(f"站点: {COUNTRY}")
        print(f"获取评论数量: {len(reviews)}")
        print(f"总评论数量: {meta_info['total_reviews']}")
        print(f"平均评分: {meta_info['average_rating']}")
    # 3. Analyse the fetched reviews.
    print("\n=== 评论数据分析 ===")
    if reviews:
        analysis = lazada_reviews.analyze_reviews(reviews)
        print("评分分布:")
        for rating, count in sorted(analysis["rating_analysis"]["distribution"].items(), key=lambda x: str(x)):
            print(f" {rating}星: {count}条评论")
        print(f"\n平均评分: {analysis['rating_analysis']['average']}")
        print("\n情感分析:")
        for sentiment, count in analysis["sentiment_analysis"]["distribution"].items():
            print(f" {sentiment}: {count}条评论")
        print(f"\n平均情感得分: {analysis['sentiment_analysis']['average_score']}")
        print("\n评论类型分布:")
        for media_type, count in analysis["media_analysis"].items():
            print(f" {media_type}: {count}条评论")
        # 4. Keyword extraction, overall and for 1-star reviews.
        print("\n=== 评论关键词分析 ===")
        top_keywords = lazada_reviews.extract_keywords(reviews, top_n=10)
        print("评论中出现频率最高的10个关键词:")
        for word, count in top_keywords:
            print(f" {word}: {count}次")
        negative_keywords = lazada_reviews.extract_keywords(reviews, rating_filter=1, top_n=5)
        print("\n1星评论中出现频率最高的5个关键词:")
        for word, count in negative_keywords:
            print(f" {word}: {count}次")
        # 5. Save charts to the current directory.
        lazada_reviews.visualize_analysis(analysis, ITEM_ID, COUNTRY)
        # 6. Export everything to an Excel workbook.
        lazada_reviews.export_to_excel(reviews, analysis, meta_info, f"Lazada商品{ITEM_ID}_评论分析.xlsx")
    else:
        print("未获取到评论数据,无法进行分析")