阿里巴巴开放平台的 item_review 接口用于获取商品的用户评论数据,是分析商品口碑、用户反馈和市场需求的重要工具。通过该接口可以获取评论内容、评分、买家信息、追加评论等关键数据,为产品优化、市场策略制定提供数据支持。
一、item_review 接口核心特性分析
1. 接口功能与定位
2. 接口权限与调用限制
3. 核心参数解析
必选参数
可选参数
二、返回数据结构分析
三、Python 实现方案
import requests
import time
import hashlib
import json
import logging
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import re
from datetime import datetime, timedelta
from collections import defaultdict
from typing import Dict, List, Optional, Tuple
import jieba
from jieba.analyse import extract_tags
from snownlp import SnowNLP # 用于情感分析
# Configure root logging: INFO level with timestamp/level/message layout.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)
# Configure matplotlib for Chinese text: CJK-capable font fallbacks, and
# keep the minus sign rendering correctly when a CJK font is active.
plt.rcParams["font.family"] = ["SimHei", "WenQuanYi Micro Hei", "Heiti TC"]
plt.rcParams["axes.unicode_minus"] = False
class AlibabaItemReview:
    """Wrapper around the Alibaba (1688) open-platform item_review API.

    Fetches product reviews page by page, then provides analysis helpers:
    rating distribution, SnowNLP sentiment scoring, jieba keyword
    extraction, negative-review drill-down, chart rendering and Excel
    export.
    """

    def __init__(self, app_key: str, app_secret: str, access_token: str = ""):
        """
        Initialize the API client.

        :param app_key: the application's app_key
        :param app_secret: the application's app_secret (used for signing)
        :param access_token: OAuth access token; must be set before calling
        """
        self.app_key = app_key
        self.app_secret = app_secret
        self.access_token = access_token
        self.gateway_url = "https://gw.open.1688.com/openapi/param2/2.0/"
        self.session = requests.Session()
        self.session.headers.update({
            "Content-Type": "application/x-www-form-urlencoded;charset=utf-8",
            "User-Agent": "AlibabaItemReview/1.0.0 (Python)"
        })
        # Client-side rate-limit state (rolling one-minute window).
        self.api_calls = 0
        self.rate_limit = 60  # max calls per minute
        self.last_reset_time = time.time()
        # Chinese stopword set (see note in _load_stopwords).
        self.stopwords = self._load_stopwords()

    def _load_stopwords(self) -> set:
        """Return a small built-in Chinese stopword set.

        NOTE(review): this set is stored on the instance but is not wired
        into _extract_keywords (jieba's extract_tags uses its own default
        stopwords) -- confirm before relying on it.
        """
        try:
            # Minimal inline list; replace with a full stopword table in
            # production use.
            return {
                '的', '了', '在', '是', '我', '有', '和', '就', '不', '人', '都', '一',
                '一个', '上', '也', '很', '到', '说', '要', '去', '你', '会', '着', '没有',
                '看', '好', '自己', '这', '个', '也', '还', '可以', '买', '用', '后', '觉得'
            }
        except Exception as e:
            logging.warning(f"加载停用词失败: {e},使用默认空集合")
            return set()

    def _generate_sign(self, params: Dict) -> str:
        """Build the request signature.

        Algorithm: MD5(secret + k1v1k2v2... + secret) over the parameters
        sorted by key, returned as an upper-case hex digest.
        """
        pieces = [self.app_secret]
        for key, value in sorted(params.items(), key=lambda item: item[0]):
            pieces.append(f"{key}{value}")
        pieces.append(self.app_secret)
        return hashlib.md5("".join(pieces).encode('utf-8')).hexdigest().upper()

    def _check_rate_limit(self) -> bool:
        """Throttle to at most self.rate_limit calls per minute.

        Blocks (sleeps) until the window resets when the budget is spent.
        Always returns True once the call may proceed.
        """
        current_time = time.time()
        # Start a fresh one-minute window when the old one has elapsed.
        if current_time - self.last_reset_time > 60:
            self.api_calls = 0
            self.last_reset_time = current_time
        if self.api_calls >= self.rate_limit:
            sleep_time = 60 - (current_time - self.last_reset_time)
            logging.warning(f"API调用频率超限,等待 {sleep_time:.1f} 秒")
            # max(..., 0) guards against a negative sleep if the clock moved.
            time.sleep(max(sleep_time, 0) + 1)
            self.api_calls = 0
            self.last_reset_time = time.time()
        self.api_calls += 1
        return True

    def call_review_api(self, offer_id: str, page: int = 1, page_size: int = 40,
                        filters: Optional[Dict] = None) -> Optional[Dict]:
        """
        Call the item_review endpoint for one page of reviews.

        :param offer_id: product (offer) id
        :param page: page number, 1-based
        :param page_size: reviews per page
        :param filters: extra query parameters merged into the request
        :return: the response payload dict, or None on any failure
        """
        if not self.access_token:
            logging.error("请先设置access_token")
            return None
        if not self._check_rate_limit():
            return None
        # Base parameters required by the gateway.
        base_params = {
            "app_key": self.app_key,
            "access_token": self.access_token,
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "format": "json",
            "v": "2.0",
            "sign_method": "md5",
            "offerId": offer_id,
            "page": page,
            "pageSize": page_size
        }
        if filters:
            base_params.update(filters)
        # Sign only after every parameter (including filters) is in place.
        base_params["sign"] = self._generate_sign(base_params)
        try:
            url = f"{self.gateway_url}alibaba.item.review.get"
            response = self.session.post(url, data=base_params, timeout=15)
            response.raise_for_status()
            result = response.json()
            if "error_response" in result:
                error = result["error_response"]
                logging.error(f"API调用错误: {error.get('msg')} (错误码: {error.get('code')})")
                return None
            return result.get("alibaba_item_review_get_response", {})
        except requests.exceptions.RequestException as e:
            logging.error(f"API请求异常: {str(e)}")
            return None
        except json.JSONDecodeError:
            logging.error(f"API响应解析失败: {response.text[:200]}...")
            return None

    def get_all_reviews(self, offer_id: str, max_pages: int = 10, page_size: int = 40,
                        filters: Optional[Dict] = None) -> Tuple[List[Dict], Dict]:
        """
        Fetch up to max_pages pages of reviews.

        :param offer_id: product (offer) id
        :param max_pages: hard cap on the number of pages fetched
        :param page_size: reviews per page
        :param filters: extra query parameters forwarded to the API
        :return: (formatted review list, stats dict captured from page 1)
        """
        all_reviews: List[Dict] = []
        stats: Dict = {}
        page = 1
        while page <= max_pages:
            logging.info(f"获取第 {page} 页评论")
            result = self.call_review_api(offer_id, page, page_size, filters)
            if not result or "result" not in result:
                break
            review_result = result["result"]
            # Totals only need to be captured once, from the first page.
            if page == 1:
                stats = {
                    "total_count": review_result.get("totalCount", 0),
                    "total_page": review_result.get("totalPage", 0),
                    "grade_distribution": review_result.get("gradeDistribution", {})
                }
            reviews = review_result.get("reviews", {}).get("review", [])
            if not reviews:
                break
            all_reviews.extend(self._format_review(review) for review in reviews)
            # Stop when the server reports no further pages.
            if page >= review_result.get("totalPage", max_pages):
                break
            page += 1
            time.sleep(1)  # be polite between page requests
        logging.info(f"共获取 {len(all_reviews)} 条评论")
        return all_reviews, stats

    def _format_review(self, review: Dict) -> Dict:
        """Normalize one raw API review record into a flat dict."""
        return {
            "review_id": review.get("reviewId"),
            "buyer_id": review.get("buyerId"),
            "buyer_nick": review.get("buyerNick"),
            "grade": int(review.get("grade", 0)),
            "content": review.get("content", ""),
            "create_time": review.get("gmtCreate"),
            "reply": review.get("replyContent", ""),
            "reply_time": review.get("replyGmtCreate"),
            "images": review.get("images", {}).get("image", []),
            "attribute": review.get("attribute", ""),
            "append_content": review.get("appendContent", ""),
            "append_time": review.get("appendGmtCreate"),
            "useful": int(review.get("useful", 0)),
            "is_vip": review.get("isVip", False)
        }

    def analyze_reviews(self, reviews: List[Dict]) -> Dict:
        """Run the full analysis suite over formatted reviews.

        Returns rating distribution, daily time distribution, sentiment
        stats, keywords, content-length stats and image/append ratios.
        """
        if not reviews:
            return {"error": "没有评论数据可分析"}
        # 1. Rating distribution.
        grade_counts = defaultdict(int)
        for review in reviews:
            grade_counts[review["grade"]] += 1
        # 2. Reviews per day ("YYYY-MM-DD HH:MM:SS" -> date part only).
        time_distribution = defaultdict(int)
        for review in reviews:
            if review["create_time"]:
                time_distribution[review["create_time"].split()[0]] += 1
        # 3. Sentiment.
        sentiment_results = self._analyze_sentiment(reviews)
        # 4. Keywords.
        keywords = self._extract_keywords(reviews)
        # 5. Content-length statistics (non-empty contents only).
        content_lengths = [len(review["content"]) for review in reviews if review["content"]]
        length_stats = {}
        if content_lengths:
            length_stats = {
                "avg": round(sum(content_lengths) / len(content_lengths), 1),
                "min": min(content_lengths),
                "max": max(content_lengths),
                "median": np.median(content_lengths)
            }
        # 6./7. Share of reviews carrying images / follow-up comments.
        # (reviews is guaranteed non-empty by the early return above.)
        image_ratio = sum(1 for review in reviews if review["images"]) / len(reviews)
        append_ratio = sum(1 for review in reviews if review["append_content"]) / len(reviews)
        return {
            "total_reviews": len(reviews),
            "grade_analysis": {
                "distribution": dict(grade_counts),
                "avg_grade": sum(k * v for k, v in grade_counts.items()) / len(reviews)
            },
            "time_distribution": dict(time_distribution),
            "sentiment_analysis": sentiment_results,
            "keywords": keywords,
            "content_length": length_stats,
            "image_ratio": image_ratio,
            "append_ratio": append_ratio
        }

    def _analyze_sentiment(self, reviews: List[Dict]) -> Dict:
        """Score review sentiment with SnowNLP (0..1; higher = more positive).

        FIX vs. original: each review is scored exactly once and the score
        is reused for both the positive/negative/neutral split and the
        per-grade averages (the original ran SnowNLP twice per review).
        """
        sentiment_scores = []
        positive_count = 0
        negative_count = 0
        neutral_count = 0
        grade_sentiment = defaultdict(list)
        for review in reviews:
            content = review["content"]
            if not content:
                continue
            score = SnowNLP(content).sentiments
            sentiment_scores.append(score)
            grade_sentiment[review["grade"]].append(score)
            # Classification thresholds: >0.6 positive, <0.4 negative,
            # anything in between neutral.
            if score > 0.6:
                positive_count += 1
            elif score < 0.4:
                negative_count += 1
            else:
                neutral_count += 1
        grade_sentiment_avg = {
            grade: sum(scores) / len(scores) if scores else 0
            for grade, scores in grade_sentiment.items()
        }
        total = len(sentiment_scores)
        return {
            "total_analyzed": total,
            "positive_ratio": positive_count / total if total else 0,
            "negative_ratio": negative_count / total if total else 0,
            "neutral_ratio": neutral_count / total if total else 0,
            "avg_score": sum(sentiment_scores) / total if total else 0,
            "grade_sentiment_avg": grade_sentiment_avg
        }

    @staticmethod
    def _clean_text(text: str) -> str:
        """Drop punctuation/symbols and collapse whitespace runs.

        Shared by keyword extraction paths (was duplicated inline).
        """
        text = re.sub(r"[^\w\s]", " ", text)
        return re.sub(r"\s+", " ", text).strip()

    def _extract_keywords(self, reviews: List[Dict], top_n: int = 20) -> Dict:
        """Extract TF-IDF keywords, overall and per star rating.

        :param top_n: number of overall keywords to return (10 per grade)
        """
        all_content = self._clean_text(
            " ".join(review["content"] for review in reviews if review["content"])
        )
        # Restrict to nouns, verbs and adjectives for meaningful terms.
        keywords = extract_tags(
            all_content,
            topK=top_n,
            withWeight=True,
            allowPOS=('n', 'v', 'a')
        )
        grade_keywords = {}
        for grade in (1, 2, 3, 4, 5):
            grade_content = " ".join(
                review["content"] for review in reviews
                if review["content"] and review["grade"] == grade
            )
            if grade_content:
                grade_keywords[grade] = extract_tags(
                    self._clean_text(grade_content),
                    topK=10,
                    withWeight=True,
                    allowPOS=('n', 'v', 'a')
                )
        return {
            "overall": keywords,
            "by_grade": grade_keywords
        }

    def get_negative_reasons(self, reviews: List[Dict], top_n: int = 10) -> List[Dict]:
        """Surface the dominant complaint keywords in negative reviews.

        A review is "negative" when its grade is <= 3 stars and it has text
        content. For each keyword, up to three example reviews (searched
        within the first 50 negatives) are attached.
        """
        negative_reviews = [
            review for review in reviews
            if review["grade"] <= 3 and review["content"]
        ]
        if not negative_reviews:
            return []
        negative_content = self._clean_text(
            " ".join(review["content"] for review in negative_reviews)
        )
        keywords = extract_tags(
            negative_content,
            topK=top_n,
            withWeight=True,
            allowPOS=('n', 'v', 'a')
        )
        result = []
        for keyword, weight in keywords:
            # Collect a few example reviews containing this keyword.
            examples = []
            for review in negative_reviews[:50]:
                if keyword in review["content"] and len(examples) < 3:
                    examples.append({
                        "review_id": review["review_id"],
                        "content": review["content"],
                        "grade": review["grade"]
                    })
            result.append({
                "keyword": keyword,
                "weight": weight,
                "count": sum(1 for r in negative_reviews if keyword in r["content"]),
                "examples": examples
            })
        return result

    def visualize_analysis(self, analysis: Dict, output_dir: str = ".") -> None:
        """Render the analysis as PNG charts under output_dir."""
        # 1. Rating distribution pie chart.
        if "grade_analysis" in analysis and analysis["grade_analysis"]["distribution"]:
            grades = analysis["grade_analysis"]["distribution"]
            labels = [f"{k}星" for k in grades.keys()]
            sizes = list(grades.values())
            plt.figure(figsize=(8, 6))
            plt.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=90)
            plt.title(f'评论评分分布 (平均评分: {analysis["grade_analysis"]["avg_grade"]:.1f})')
            plt.axis('equal')
            plt.tight_layout()
            plt.savefig(f"{output_dir}/grade_distribution.png")
            plt.close()
            logging.info(f"评分分布图表已保存至 {output_dir}/grade_distribution.png")
        # 2. Sentiment bar chart.
        if "sentiment_analysis" in analysis:
            sentiment = analysis["sentiment_analysis"]
            labels = ['正面', '负面', '中性']
            sizes = [
                sentiment["positive_ratio"],
                sentiment["negative_ratio"],
                sentiment["neutral_ratio"]
            ]
            plt.figure(figsize=(8, 6))
            plt.bar(labels, sizes, color=['green', 'red', 'gray'])
            plt.title(f'评论情感分布 (平均情感值: {sentiment["avg_score"]:.2f})')
            plt.ylim(0, 1)
            plt.ylabel('比例')
            for i, v in enumerate(sizes):
                plt.text(i, v + 0.01, f'{v:.1%}', ha='center')
            plt.tight_layout()
            plt.savefig(f"{output_dir}/sentiment_distribution.png")
            plt.close()
            logging.info(f"情感分布图表已保存至 {output_dir}/sentiment_distribution.png")
        # 3. Keyword cloud (optional; requires the wordcloud package).
        if "keywords" in analysis and analysis["keywords"]["overall"]:
            try:
                from wordcloud import WordCloud
                keyword_text = " ".join(kw for kw, _ in analysis["keywords"]["overall"])
                wc = WordCloud(
                    font_path="simhei.ttf",  # point this at a CJK-capable font file
                    background_color="white",
                    width=800,
                    height=600
                ).generate(keyword_text)
                plt.figure(figsize=(10, 7))
                plt.imshow(wc, interpolation="bilinear")
                plt.axis("off")
                plt.title("评论关键词云图")
                plt.tight_layout(pad=0)
                plt.savefig(f"{output_dir}/keyword_cloud.png")
                plt.close()
                logging.info(f"关键词云图已保存至 {output_dir}/keyword_cloud.png")
            except ImportError:
                logging.warning("wordcloud库未安装,无法生成关键词云图")
            except Exception as e:
                logging.warning(f"生成关键词云图失败: {e}")
        # 4. Daily review-count trend line.
        if "time_distribution" in analysis and analysis["time_distribution"]:
            time_data = sorted(analysis["time_distribution"].items(), key=lambda x: x[0])
            dates = [item[0] for item in time_data]
            counts = [item[1] for item in time_data]
            # Keep the chart readable: last 30 days only.
            if len(dates) > 30:
                dates = dates[-30:]
                counts = counts[-30:]
            plt.figure(figsize=(12, 6))
            plt.plot(dates, counts, marker='o', linestyle='-')
            plt.title('评论数量时间趋势')
            plt.xlabel('日期')
            plt.ylabel('评论数量')
            plt.xticks(rotation=45)
            plt.tight_layout()
            plt.savefig(f"{output_dir}/review_time_trend.png")
            plt.close()
            logging.info(f"时间趋势图表已保存至 {output_dir}/review_time_trend.png")

    def export_to_excel(self, reviews: List[Dict], analysis: Dict, filename: str) -> None:
        """Write review data and analysis summaries to an .xlsx workbook.

        :param reviews: formatted review records (sheet '评论数据')
        :param analysis: output of analyze_reviews (summary sheets)
        :param filename: output path for the Excel file
        """
        if not reviews and not analysis:
            logging.warning("没有数据可导出")
            return
        try:
            with pd.ExcelWriter(filename) as writer:
                if reviews:
                    pd.DataFrame(reviews).to_excel(writer, sheet_name='评论数据', index=False)
                if analysis:
                    # Rating distribution sheet.
                    if "grade_analysis" in analysis:
                        df_grade = pd.DataFrame([
                            {"评分": k, "数量": v, "比例": v / analysis["total_reviews"]}
                            for k, v in analysis["grade_analysis"]["distribution"].items()
                        ])
                        df_grade.to_excel(writer, sheet_name='评分分析', index=False)
                    # Sentiment summary sheet.
                    if "sentiment_analysis" in analysis:
                        df_sentiment = pd.DataFrame([
                            {"类型": "正面", "比例": analysis["sentiment_analysis"]["positive_ratio"]},
                            {"类型": "负面", "比例": analysis["sentiment_analysis"]["negative_ratio"]},
                            {"类型": "中性", "比例": analysis["sentiment_analysis"]["neutral_ratio"]}
                        ])
                        df_sentiment.to_excel(writer, sheet_name='情感分析', index=False)
                    # Keyword sheet.
                    if "keywords" in analysis and analysis["keywords"]["overall"]:
                        df_keywords = pd.DataFrame([
                            {"关键词": kw, "权重": weight}
                            for kw, weight in analysis["keywords"]["overall"]
                        ])
                        df_keywords.to_excel(writer, sheet_name='关键词分析', index=False)
            # BUGFIX: the original logged the literal text "(unknown)" instead
            # of the actual output path.
            logging.info(f"数据已导出至 {filename}")
        except Exception as e:
            logging.error(f"导出Excel失败: {e}")
# Example usage
if __name__ == "__main__":
    # Fill in real credentials obtained from the 1688 open platform.
    APP_KEY = "your_app_key"
    APP_SECRET = "your_app_secret"
    ACCESS_TOKEN = "your_access_token"
    OFFER_ID = "61234567890"  # sample product id

    analyzer = AlibabaItemReview(APP_KEY, APP_SECRET, ACCESS_TOKEN)

    # 1. Fetch review data
    print("=== 获取评论数据 ===")
    # Restrict the query window to the last 30 days.
    today = datetime.now()
    month_ago = today - timedelta(days=30)
    query_filters = {
        "startDate": month_ago.strftime("%Y-%m-%d"),
        "endDate": today.strftime("%Y-%m-%d"),
        # "grade": 3,        # restrict to a single star rating
        # "hasImage": True,  # only reviews carrying images
        "sort": "newest"     # newest first
    }
    # Pull at most five pages.
    review_list, summary = analyzer.get_all_reviews(
        offer_id=OFFER_ID,
        max_pages=5,
        page_size=40,
        filters=query_filters
    )
    print(f"商品ID: {OFFER_ID}")
    print(f"获取评论总数: {len(review_list)}")
    print(f"平台总评论数: {summary.get('total_count', 0)}")
    print(f"评分分布: {summary.get('grade_distribution', {})}")

    # 2. Analyze the fetched reviews
    print("\n=== 评论数据分析 ===")
    if not review_list:
        print("未获取到评论数据,无法进行分析")
    else:
        report = analyzer.analyze_reviews(review_list)
        print(f"平均评分: {report['grade_analysis']['avg_grade']:.1f}")
        print(f"情感分析: 正面{report['sentiment_analysis']['positive_ratio']:.1%}, "
              f"负面{report['sentiment_analysis']['negative_ratio']:.1%}, "
              f"中性{report['sentiment_analysis']['neutral_ratio']:.1%}")
        print(f"有图评论比例: {report['image_ratio']:.1%}")
        print(f"追加评论比例: {report['append_ratio']:.1%}")
        print("\n热门关键词:")
        for word, score in report["keywords"]["overall"][:10]:
            print(f" {word}: {score:.4f}")

        # 3. Top reasons behind negative reviews
        print("\n=== 负面评论主要原因 ===")
        top_reasons = analyzer.get_negative_reasons(review_list)
        for idx, reason in enumerate(top_reasons[:5], 1):
            print(f"{idx}. 关键词: {reason['keyword']} (出现{reason['count']}次)")
            if reason["examples"]:
                print(f" 示例: {reason['examples'][0]['content'][:50]}...")

        # 4. Render charts
        analyzer.visualize_analysis(report)
        # 5. Export everything to Excel
        analyzer.export_to_excel(review_list, report, "商品评论分析.xlsx")