Lazada 的 item_search 接口是按关键字搜索商品的核心工具,能够帮助开发者和商家快速获取平台上与特定关键词相关的商品信息。通过该接口,可实现类似 Lazada 网站搜索框的功能,获取符合搜索条件的商品列表,包含价格、销量、评分等关键数据,为市场调研、竞品分析和商品开发提供重要参考。
一、item_search 接口核心特性分析
1. 接口定位与核心价值
2. 接口权限与调用限制
3. 核心参数解析
必选参数
可选参数
二、签名生成与返回数据结构
1. 签名生成逻辑
2. 返回数据结构解析
三、Python 实现方案
import requests
import time
import hmac
import hashlib
import base64
import json
import logging
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import re
from datetime import datetime
from collections import defaultdict
from typing import Dict, Optional, List, Tuple
# Logging setup: INFO and above, each record prefixed with time and level.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# Chart text setup: try fonts that contain Chinese glyphs (the chart titles
# and labels in this file are Chinese), and turn off the Unicode minus sign.
plt.rcParams.update({
    "font.family": ["SimHei", "WenQuanYi Micro Hei", "Heiti TC"],
    "axes.unicode_minus": False,
})
class LazadaItemSearch:
    """Client wrapper around the Lazada ``item_search`` keyword-search API.

    The class signs and rate-limits search requests, and offers helpers to
    aggregate the returned items (price, brand, sales, rating, promotion and
    seller statistics), rank them, chart the aggregates with matplotlib and
    export everything to an Excel workbook with pandas.
    """

    # Bucket labels in display order. Each ``_*_BOUNDS`` tuple holds the
    # exclusive upper bound of every bucket except the last, open-ended one.
    _PRICE_BINS = ("<50", "50-100", "100-200", "200-500", ">500")
    _PRICE_BOUNDS = (50, 100, 200, 500)
    _SALES_BINS = ("0", "1-9", "10-99", "100-499", ">=500")
    _SALES_BOUNDS = (10, 100, 500)  # applied after the explicit "0" bucket

    def __init__(self, app_key: str, app_secret: str):
        """
        Initialise the API client.

        :param app_key: application key sent with every request
        :param app_secret: application secret used as the HMAC signing key
        """
        self.app_key = app_key
        self.app_secret = app_secret
        self.api_url = "https://api.lazada.com/rest"
        # Rate limiting: at most ``rate_limit`` calls per rolling hour.
        # 60 is the default tier; ``set_rate_limit`` accepts up to 300.
        self.rate_limit = 60
        self.call_timestamps = []  # epoch seconds of recent calls
        # Site (country) codes accepted by ``search_items``.
        self.supported_countries = ["SG", "MY", "TH", "ID", "PH", "VN"]
        # Site code -> currency code used when reporting prices.
        self.currency_codes = {
            "SG": "SGD",
            "MY": "MYR",
            "TH": "THB",
            "ID": "IDR",
            "PH": "PHP",
            "VN": "VND",
        }
        # Values accepted for the ``sort`` filter.
        self.supported_sorts = [
            "popularity", "price_asc", "price_desc",
            "rating", "newest", "sales",
        ]

    def set_rate_limit(self, limit: int) -> None:
        """
        Set the call quota (calls per hour).

        Values outside 60-300 are rejected with a warning and the current
        limit is kept.

        :param limit: new hourly quota
        """
        if 60 <= limit <= 300:
            self.rate_limit = limit
            logging.info(f"已设置调用频率限制为 {limit} 次/小时")
        else:
            logging.warning("频率限制必须在60-300之间,未修改")

    def _generate_sign(self, params: Dict) -> str:
        """
        Build the request signature.

        Parameters are sorted by name, joined as ``key=value&key=value``,
        signed with HMAC-SHA256 keyed by ``app_secret`` and Base64-encoded.
        NOTE(review): confirm this scheme against the provider's signing
        specification; it is reproduced here unchanged.

        :param params: request parameters (without ``sign``)
        :return: Base64 text of the HMAC digest
        """
        ordered = sorted(params.items(), key=lambda pair: pair[0])
        payload = "&".join(f"{name}={value}" for name, value in ordered)
        digest = hmac.new(
            self.app_secret.encode('utf-8'),
            payload.encode('utf-8'),
            hashlib.sha256,
        ).digest()
        return base64.b64encode(digest).decode('utf-8')

    def _check_rate_limit(self) -> None:
        """
        Block until another call fits in the rolling one-hour quota.

        Timestamps older than an hour are discarded. If the quota is still
        exhausted, the method sleeps until the oldest recorded call leaves
        the window (plus one second of slack) and then records this call.
        """
        window = 3600
        now = time.time()
        self.call_timestamps = [t for t in self.call_timestamps if now - t < window]
        if len(self.call_timestamps) >= self.rate_limit:
            oldest_time = self.call_timestamps[0]
            sleep_time = (window - (now - oldest_time)) + 1
            logging.warning(f"调用频率超限,等待 {sleep_time:.1f} 秒")
            time.sleep(sleep_time)
            # Re-read the clock: the call happens after the sleep, so the
            # pre-sleep timestamp would under-count the current window.
            now = time.time()
            self.call_timestamps = [t for t in self.call_timestamps if now - t < window]
        self.call_timestamps.append(now)

    def search_items(self, keyword: str, country: str,
                     page: int = 1, page_size: int = 40,
                     filters: Optional[Dict] = None) -> Optional[Dict]:
        """
        Search items by keyword.

        :param keyword: search keyword
        :param country: site code, one of ``supported_countries``
        :param page: page number
        :param page_size: items per page, 10-40
        :param filters: extra request parameters; ``None`` values are
            dropped, booleans are sent as ``true``/``false``
        :return: the ``data`` object of the response, or ``None`` when
            validation fails, the request fails, the body is not a JSON
            object, the API reports a non-zero ``code`` or no items are
            returned
        """
        if country not in self.supported_countries:
            logging.error(f"不支持的站点代码: {country},支持的站点: {', '.join(self.supported_countries)}")
            return None
        if filters and "sort" in filters and filters["sort"] not in self.supported_sorts:
            logging.error(f"不支持的排序方式: {filters['sort']},支持的方式: {', '.join(self.supported_sorts)}")
            return None
        if not (10 <= page_size <= 40):
            logging.error(f"page_size必须在10-40之间,当前值: {page_size}")
            return None

        base_params = {
            "app_key": self.app_key,
            "method": "item.search",
            "timestamp": str(int(time.time())),  # epoch seconds
            "format": "json",
            "v": "2.0",
            "q": keyword,
            "country": country,
            "page": str(page),
            "page_size": str(page_size),
        }
        if filters and isinstance(filters, dict):
            for name, value in filters.items():
                if value is None:
                    continue
                base_params[name] = str(value).lower() if isinstance(value, bool) else str(value)

        # The signature covers every parameter, so it is computed last.
        base_params["sign"] = self._generate_sign(base_params)
        self._check_rate_limit()

        try:
            response = requests.get(self.api_url, params=base_params, timeout=15)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            logging.error(f"请求异常: {str(e)}")
            return None

        # Parsed separately from the transport step: every JSON decode error
        # raised by ``Response.json()`` is a ValueError, and newer requests
        # versions also make it a RequestException, which would otherwise be
        # reported as a request failure.
        try:
            result = response.json()
        except ValueError:
            logging.error(f"响应解析失败: {response.text[:200]}...")
            return None
        if not isinstance(result, dict):
            logging.error(f"响应解析失败: {response.text[:200]}...")
            return None

        if "code" in result and result["code"] != 0:
            logging.error(f"API调用错误: {result.get('message')} (错误码: {result.get('code')})")
            return None

        search_data = result.get("data") or {}
        if not search_data.get("items"):
            logging.warning("未获取到搜索结果")
            return None
        logging.info(f"成功在 {country} 站点搜索关键词 '{keyword}',第 {page} 页,找到 {search_data.get('total_items', 0)} 个商品")
        return search_data

    def batch_search_items(self, keyword: str, country: str,
                           max_pages: int = 5, page_size: int = 40,
                           filters: Optional[Dict] = None) -> Tuple[List[Dict], Dict]:
        """
        Fetch several result pages for one keyword.

        Pages are requested in order until ``max_pages`` is reached or a
        page comes back empty or failed.

        :param keyword: search keyword
        :param country: site code
        :param max_pages: maximum number of pages to request
        :param page_size: items per page
        :param filters: extra request parameters, passed to ``search_items``
        :return: ``(items, meta_info)``; ``meta_info`` records the query,
            the fetch time, the currency and the totals from the first page
        """
        all_items = []
        meta_info = {
            "keyword": keyword,
            "country": country,
            "total_items": 0,
            "total_pages": 0,
            "filters": filters,
            "fetch_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "currency": self.currency_codes.get(country, "Unknown"),
        }
        for page in range(1, max_pages + 1):
            logging.info(f"正在获取关键词 '{keyword}' 第 {page}/{max_pages} 页搜索结果...")
            result = self.search_items(keyword, country, page, page_size, filters)
            if not result:
                break
            items = result.get("items", [])
            if not items:
                logging.info("当前页无商品数据,停止获取")
                break
            all_items.extend(items)
            if page == 1:
                # Totals are taken from the first page only.
                meta_info["total_items"] = result.get("total_items", 0)
                meta_info["total_pages"] = min(
                    max_pages,
                    (meta_info["total_items"] + page_size - 1) // page_size,
                )
                meta_info["filters_applied"] = result.get("filters_applied", {})
        logging.info(f"批量搜索完成,共获取 {len(all_items)} 个商品数据")
        return all_items, meta_info

    @staticmethod
    def _bucket_label(value, upper_bounds, labels) -> str:
        """Return the label of the first bucket whose upper bound exceeds ``value``."""
        for bound, label in zip(upper_bounds, labels):
            if value < bound:
                return label
        return labels[-1]

    def _analyze_prices(self, items: List[Dict], currency: str) -> Dict:
        """Summarise prices: min/max/mean/median and a bucketed distribution."""
        prices = []
        distribution = defaultdict(int)
        for item in items:
            try:
                price = float(item.get("price", 0))
            except (ValueError, TypeError):
                continue  # unparsable price: leave the item out of the stats
            prices.append(price)
            distribution[self._bucket_label(price, self._PRICE_BOUNDS, self._PRICE_BINS)] += 1
        info = {
            "min_price": None,
            "max_price": None,
            "avg_price": None,
            "median_price": None,
            "currency": currency,
            "price_distribution": dict(distribution),
        }
        if prices:
            info["min_price"] = round(min(prices), 2)
            info["max_price"] = round(max(prices), 2)
            info["avg_price"] = round(sum(prices) / len(prices), 2)
            info["median_price"] = round(float(np.median(prices)), 2)
        return info

    @staticmethod
    def _analyze_brands(items: List[Dict]) -> Dict:
        """Count items per brand and list the ten most frequent brands."""
        counts = defaultdict(int)
        for item in items:
            # A missing, null or empty brand is grouped under one label.
            counts[item.get("brand_name") or "未知品牌"] += 1
        ranked = sorted(counts.items(), key=lambda pair: pair[1], reverse=True)
        return {
            "total_brands": len(counts),
            "brand_distribution": dict(counts),
            "top_brands": ranked[:10],
        }

    def _analyze_sales(self, items: List[Dict]) -> Dict:
        """Summarise sales counts: total, mean and a bucketed distribution."""
        sales_counts = []
        distribution = defaultdict(int)
        for item in items:
            try:
                sales = int(item.get("sales_count", 0))
            except (ValueError, TypeError):
                continue
            sales_counts.append(sales)
            if sales == 0:
                label = self._SALES_BINS[0]
            else:
                label = self._bucket_label(sales, self._SALES_BOUNDS, self._SALES_BINS[1:])
            distribution[label] += 1
        avg_sales = round(sum(sales_counts) / len(sales_counts), 1) if sales_counts else 0
        return {
            "total_sales": sum(sales_counts),
            "avg_sales": avg_sales,
            "sales_distribution": dict(distribution),
        }

    @staticmethod
    def _analyze_ratings(items: List[Dict]) -> Dict:
        """Summarise ratings: mean, half-star distribution, unrated count."""
        ratings = []
        distribution = defaultdict(int)
        no_rating_count = 0
        for item in items:
            try:
                rating = float(item.get("rating_score", 0))
            except (ValueError, TypeError):
                no_rating_count += 1
                continue
            if rating > 0:
                ratings.append(rating)
                # Round to the nearest half star for the distribution key.
                distribution[f"{round(rating * 2) / 2:.1f}"] += 1
            else:
                no_rating_count += 1
        avg_rating = round(sum(ratings) / len(ratings), 1) if ratings else 0
        return {
            "avg_rating": avg_rating,
            "rating_distribution": dict(distribution),
            "no_rating_count": no_rating_count,
        }

    @staticmethod
    def _analyze_promotions(items: List[Dict]) -> Dict:
        """Count promoted items and average their discount percentage."""
        promoted = 0
        discounts = []
        for item in items:
            if not item.get("has_promotion", False):
                continue
            promoted += 1
            try:
                price = float(item.get("price", 0))
                original_price = float(item.get("original_price", 0))
            except (ValueError, TypeError):
                continue
            # Only a real markdown counts towards the average discount.
            if original_price > 0 and price < original_price:
                discounts.append(round((1 - price / original_price) * 100, 1))
        avg_discount = round(sum(discounts) / len(discounts), 1) if discounts else 0
        return {
            "has_promotion_count": promoted,
            "promotion_ratio": round(promoted / len(items) * 100, 1),
            "avg_discount": avg_discount,
        }

    @staticmethod
    def _analyze_sellers(items: List[Dict]) -> Dict:
        """Count official-store items and list the five most frequent sellers."""
        official_count = 0
        per_seller = defaultdict(int)
        for item in items:
            if item.get("is_official", False):
                official_count += 1
            seller_id = item.get("seller_id")
            if seller_id is None:
                seller_id = "未知卖家"
            per_seller[seller_id] += 1
        ranked = sorted(per_seller.items(), key=lambda pair: pair[1], reverse=True)
        return {
            "official_count": official_count,
            "official_ratio": round(official_count / len(items) * 100, 1),
            "top_sellers": ranked[:5],
        }

    def analyze_search_results(self, items: List[Dict], meta_info: Dict) -> Dict:
        """
        Aggregate a list of search-result items.

        :param items: item dicts as returned by the search methods
        :param meta_info: query metadata; ``keyword`` and ``country`` are read
        :return: ``{"error": ...}`` when ``items`` is empty, otherwise a dict
            with ``price_analysis``, ``brand_analysis``, ``sales_analysis``,
            ``rating_analysis``, ``promotion_analysis`` and
            ``seller_analysis`` plus the keyword, country, item count and
            analysis time
        """
        if not items:
            return {"error": "没有商品数据可分析"}
        country = meta_info.get("country", "")
        currency = self.currency_codes.get(country, "Unknown")
        return {
            "search_keyword": meta_info.get("keyword"),
            "country": country,
            "total_items": len(items),
            "price_analysis": self._analyze_prices(items, currency),
            "brand_analysis": self._analyze_brands(items),
            "sales_analysis": self._analyze_sales(items),
            "rating_analysis": self._analyze_ratings(items),
            "promotion_analysis": self._analyze_promotions(items),
            "seller_analysis": self._analyze_sellers(items),
            "analysis_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        }

    def get_top_items(self, items: List[Dict], by: str = "sales", top_n: int = 10) -> List[Dict]:
        """
        Rank items along one dimension.

        Sales and rating rank highest first; price ranks lowest first.
        Items whose value cannot be parsed are left out.

        :param items: item dicts
        :param by: ``sales``, ``price`` or ``rating``; anything else falls
            back to ``sales`` with a warning
        :param top_n: number of entries to return
        :return: list of ``{"item": <item>, "value": <parsed value>}``
        """
        if not items:
            return []
        # dimension -> (item field, parser, rank highest first?)
        dimensions = {
            "sales": ("sales_count", int, True),
            "price": ("price", float, False),
            "rating": ("rating_score", float, True),
        }
        if by not in dimensions:
            logging.warning(f"无效的排序维度: {by},使用默认值 'sales'")
            by = "sales"
        field, parse, descending = dimensions[by]
        ranked = []
        for item in items:
            try:
                ranked.append((item, parse(item.get(field, 0))))
            except (ValueError, TypeError):
                continue
        # list.sort is stable in both directions, so ties keep input order.
        ranked.sort(key=lambda pair: pair[1], reverse=descending)
        return [{"item": item, "value": value} for item, value in ranked[:top_n]]

    @staticmethod
    def _ordered_bins(distribution: Dict, order) -> Tuple[List, List]:
        """
        Return ``(labels, counts)`` of ``distribution`` following ``order``.

        Labels absent from ``order`` are appended afterwards in their
        existing order, so nothing is dropped.
        """
        labels = [label for label in order if label in distribution]
        labels.extend(label for label in distribution if label not in order)
        return labels, [distribution[label] for label in labels]

    @staticmethod
    def _draw_bar_chart(labels, counts, color: str, title: str, xlabel: str) -> None:
        """Start a figure with an annotated bar chart (not yet saved)."""
        plt.figure(figsize=(10, 6))
        plt.bar(labels, counts, color=color)
        plt.title(title)
        plt.xlabel(xlabel)
        plt.ylabel('商品数量')
        for index, count in enumerate(counts):
            plt.text(index, count + 0.5, str(count), ha='center')

    @staticmethod
    def _save_chart(output_dir: str, filename: str, chart_name: str) -> None:
        """Save the current figure as ``output_dir/filename`` and close it."""
        plt.tight_layout()
        plt.savefig(f"{output_dir}/{filename}")
        plt.close()
        logging.info(f"{chart_name}图表已保存至 {output_dir}/{filename}")

    def visualize_analysis(self, analysis: Dict, output_dir: str = ".") -> None:
        """
        Render the analysis as PNG charts.

        Writes up to five files into ``output_dir`` (created if missing):
        price, brand, sales, rating and promotion distributions. A chart is
        skipped when its distribution is empty.

        :param analysis: result of ``analyze_search_results``
        :param output_dir: directory that receives the PNG files
        """
        import os  # local import: only needed to create the output directory

        if "error" in analysis:
            logging.warning(analysis["error"])
            return
        os.makedirs(output_dir, exist_ok=True)
        keyword = analysis.get("search_keyword", "未知关键词")
        country = analysis.get("country", "未知站点")

        # 1. Price distribution. Buckets follow their numeric order rather
        # than the order in which they were first seen in the data.
        price = analysis.get("price_analysis")
        if price and price.get("price_distribution"):
            labels, counts = self._ordered_bins(price["price_distribution"], self._PRICE_BINS)
            self._draw_bar_chart(
                labels, counts, 'lightblue',
                f'关键词 "{keyword}" ({country}) 价格分布',
                f'价格区间 ({price["currency"]})',
            )
            self._save_chart(output_dir, "price_distribution.png", "价格分布")

        # 2. Brand share pie: top five brands plus one "others" slice.
        brand = analysis.get("brand_analysis")
        if brand and brand.get("brand_distribution"):
            plt.figure(figsize=(10, 8))
            brand_data = sorted(brand["brand_distribution"].items(),
                                key=lambda pair: pair[1], reverse=True)
            if len(brand_data) > 5:
                others = sum(count for _, count in brand_data[5:])
                brand_data = brand_data[:5] + [("其他品牌", others)]
            labels, sizes = zip(*brand_data)
            plt.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=90)
            plt.title(f'关键词 "{keyword}" ({country}) 品牌分布')
            plt.axis('equal')
            self._save_chart(output_dir, "brand_distribution.png", "品牌分布")

        # 3. Sales distribution.
        sales = analysis.get("sales_analysis")
        if sales and sales.get("sales_distribution"):
            labels, counts = self._ordered_bins(sales["sales_distribution"], self._SALES_BINS)
            self._draw_bar_chart(
                labels, counts, 'orange',
                f'关键词 "{keyword}" ({country}) 销量分布',
                '销量区间',
            )
            self._save_chart(output_dir, "sales_distribution.png", "销量分布")

        # 4. Rating distribution, with unrated items as a final bar and a
        # dashed line at the mean bar height.
        rating = analysis.get("rating_analysis")
        if rating and rating.get("rating_distribution"):
            labels = sorted(rating["rating_distribution"], key=float)
            counts = [rating["rating_distribution"][label] for label in labels]
            if rating["no_rating_count"] > 0:
                labels.append("无评分")
                counts.append(rating["no_rating_count"])
            self._draw_bar_chart(
                labels, counts, 'lightgreen',
                f'关键词 "{keyword}" ({country}) 评分分布',
                '评分 (星)',
            )
            mean_count = sum(counts) / len(counts)
            plt.axhline(y=mean_count, color='r', linestyle='--',
                        label=f'平均分布: {mean_count:.1f}')
            plt.legend()
            self._save_chart(output_dir, "rating_distribution.png", "评分分布")

        # 5. Promotion share pie.
        if "promotion_analysis" in analysis:
            promoted = analysis["promotion_analysis"]["has_promotion_count"]
            plt.figure(figsize=(8, 8))
            plt.pie([promoted, analysis["total_items"] - promoted],
                    labels=['有促销', '无促销'], autopct='%1.1f%%', startangle=90)
            plt.title(f'关键词 "{keyword}" ({country}) 促销情况')
            plt.axis('equal')
            self._save_chart(output_dir, "promotion_distribution.png", "促销情况")

    @staticmethod
    def _flatten_for_excel(record: Dict) -> Dict:
        """
        Return a copy of ``record`` whose nested containers are JSON text.

        Spreadsheet cells cannot hold dict/list values, so those are
        serialised; scalar values are passed through unchanged.
        """
        return {
            key: (
                # default=str is deliberate: the text is for display only.
                json.dumps(value, ensure_ascii=False, default=str)
                if isinstance(value, (dict, list, tuple))
                else value
            )
            for key, value in record.items()
        }

    @staticmethod
    def _discount_percent(item: Dict):
        """
        Return the item's discount in percent, or 0 when it cannot be derived.

        0 is returned when the original price is missing, not positive, or
        either price is not numeric.
        """
        try:
            price = float(item.get("price", 0))
            original_price = float(item.get("original_price", 0))
        except (ValueError, TypeError):
            return 0
        if original_price > 0:
            return round((1 - price / original_price) * 100, 1)
        return 0

    def export_to_excel(self, items: List[Dict], analysis: Dict, meta_info: Dict,
                        filename: str) -> None:
        """
        Export the search results and their analysis to an Excel workbook.

        Sheets written: query metadata, the item list (when ``items`` is
        non-empty) and, for a successful analysis, the price, brand and
        sales summaries. Failures are logged, not raised.

        :param items: item dicts
        :param analysis: result of ``analyze_search_results``
        :param meta_info: query metadata from ``batch_search_items``
        :param filename: path of the workbook to create
        """
        if not items and not analysis:
            logging.warning("没有数据可导出")
            return
        try:
            with pd.ExcelWriter(filename) as writer:
                # Query metadata (nested filter dicts become JSON text).
                meta_row = self._flatten_for_excel(meta_info or {})
                pd.DataFrame([meta_row]).to_excel(writer, sheet_name='搜索信息', index=False)

                if items:
                    rows = [
                        {
                            "商品ID": item.get("item_id"),
                            "标题": item.get("title"),
                            "品牌": item.get("brand_name"),
                            "价格": item.get("price"),
                            "原价": item.get("original_price"),
                            "折扣(%)": self._discount_percent(item),
                            "销量": item.get("sales_count"),
                            "评分": item.get("rating_score"),
                            "评论数": item.get("review_count"),
                            "卖家ID": item.get("seller_id"),
                            "卖家名称": item.get("seller_name"),
                            "是否官方店铺": "是" if item.get("is_official", False) else "否",
                            "是否有促销": "是" if item.get("has_promotion", False) else "否",
                        }
                        for item in items
                    ]
                    pd.DataFrame(rows).to_excel(writer, sheet_name='商品列表', index=False)

                if analysis and "error" not in analysis:
                    price_row = self._flatten_for_excel(analysis["price_analysis"])
                    pd.DataFrame([price_row]).to_excel(writer, sheet_name='价格分析', index=False)
                    df_brand = pd.DataFrame(
                        list(analysis["brand_analysis"]["brand_distribution"].items()),
                        columns=['品牌', '数量'],
                    )
                    df_brand.to_excel(writer, sheet_name='品牌分析', index=False)
                    df_sales = pd.DataFrame(
                        list(analysis["sales_analysis"]["sales_distribution"].items()),
                        columns=['销量区间', '数量'],
                    )
                    df_sales.to_excel(writer, sheet_name='销量分析', index=False)
            logging.info(f"数据已导出至 {filename}")
        except Exception as e:
            # Export is best-effort: report the failure and carry on.
            logging.error(f"导出Excel失败: {e}")
def main() -> None:
    """Demo run: search one keyword, print a summary, chart it and export it."""
    # Replace with real credentials obtained from the Lazada open platform.
    app_key = "your_app_key"
    app_secret = "your_app_secret"
    keyword = "wireless earbuds"  # search keyword
    country = "SG"  # Singapore site

    lazada_search = LazadaItemSearch(app_key, app_secret)
    # With a higher-tier account the quota can be raised:
    # lazada_search.set_rate_limit(300)

    # 1. Search filters.
    filters = {
        "sort": "sales",          # sort by sales
        "price_from": 50,         # minimum price
        "price_to": 300,          # maximum price
        # "brand": "Apple,Samsung",  # brand filter
        "has_promotion": False,   # do not restrict to promoted items
    }

    # 2. Fetch the first three result pages.
    print("=== 搜索商品 ===")
    items, meta_info = lazada_search.batch_search_items(
        keyword=keyword,
        country=country,
        max_pages=3,
        page_size=40,
        filters=filters,
    )
    if items:
        print(f"搜索关键词: {keyword}")
        print(f"站点: {country}")
        print(f"获取商品数量: {len(items)}")
        print(f"总商品数量: {meta_info['total_items']}")
        print(f"货币单位: {meta_info['currency']}")

    # 3. Analyse the results.
    print("\n=== 搜索结果分析 ===")
    if not items:
        print("未获取到搜索结果,无法进行分析")
        return

    analysis = lazada_search.analyze_search_results(items, meta_info)
    price = analysis["price_analysis"]
    print("价格分析:")
    print(f" 价格范围: {price['min_price']} - {price['max_price']} {price['currency']}")
    print(f" 平均价格: {price['avg_price']} {price['currency']}")
    print("\n品牌分析:")
    print(f" 品牌总数: {analysis['brand_analysis']['total_brands']}")
    print(" 主要品牌:")
    for brand, count in analysis["brand_analysis"]["top_brands"][:5]:
        print(f" {brand}: {count}个商品")
    print("\n销售分析:")
    print(f" 总销量: {analysis['sales_analysis']['total_sales']}")
    print(f" 平均销量: {analysis['sales_analysis']['avg_sales']}")
    print("\n评分分析:")
    print(f" 平均评分: {analysis['rating_analysis']['avg_rating']}")
    print(f" 无评分商品: {analysis['rating_analysis']['no_rating_count']}")
    print("\n促销分析:")
    print(f" 有促销商品比例: {analysis['promotion_analysis']['promotion_ratio']}%")
    print(f" 平均折扣: {analysis['promotion_analysis']['avg_discount']}%")

    # 4. Top three items per dimension: (heading, dimension, label, suffix).
    print("\n=== 各维度TOP3商品 ===")
    rankings = [
        ("销量TOP3:", "sales", "销量", ""),
        ("\n评分TOP3:", "rating", "评分", ""),
        ("\n价格最低TOP3:", "price", "价格", f" {meta_info['currency']}"),
    ]
    for heading, dimension, label, suffix in rankings:
        print(heading)
        top_items = lazada_search.get_top_items(items, by=dimension, top_n=3)
        for rank, item_info in enumerate(top_items, 1):
            # The title is optional like every other item field, so a
            # missing or null title prints as an empty string.
            title = str(item_info["item"].get("title") or "")
            print(f"{rank}. {title[:30]}... {label}: {item_info['value']}{suffix}")

    # 5. Charts.
    lazada_search.visualize_analysis(analysis)
    # 6. Excel export.
    lazada_search.export_to_excel(
        items, analysis, meta_info,
        f"Lazada搜索_{keyword.replace(' ', '_')}_分析.xlsx",
    )


# Example invocation
if __name__ == "__main__":
    main()