×

item_search - Lazada 按关键字搜索商品接口深度分析及 Python 实现

万邦科技Lex 万邦科技Lex 发表于2025-09-22 14:53:34 浏览227 评论0

抢沙发发表评论

            注册账号免费测试lazadaAPI数据接口

Lazada 的item_search接口是按关键字搜索商品的核心工具,能够帮助开发者和商家快速获取平台上与特定关键词相关的商品信息。通过该接口,可实现类似 Lazada 网站搜索框的功能,获取符合搜索条件的商品列表,包含价格、销量、评分等关键数据,为市场调研、竞品分析和商品开发提供重要参考。

一、item_search 接口核心特性分析

1. 接口定位与核心价值

item_search接口专注于基于关键词的商品搜索功能,其核心价值体现在:
  • 支持多语言关键词搜索,适配东南亚多语言市场

  • 提供丰富的筛选条件,精准定位目标商品

  • 支持多种排序方式,满足不同分析需求

  • 可批量获取商品列表,高效进行市场分析

  • 覆盖东南亚六国站点,支持跨境业务分析

2. 接口权限与调用限制

使用 Lazada 搜索接口需遵守平台规范:
限制类型具体规则说明
权限要求需注册 Lazada 开放平台开发者账号,完成应用认证个人开发者可获取基础搜索结果,企业开发者有更高权限
调用频率基础权限:60 次 / 小时;高级权限:300 次 / 小时按 app_key 和 IP 双重限制
数据返回单次最多返回 40 条商品,最多支持 50 页数据搜索结果有 15 分钟缓存
字段限制基础字段(ID、标题、价格等)免费;高级字段(详细描述、卖家信息)需申请部分敏感字段受访问限制
站点限制需指定站点代码,不同站点搜索结果独立同一关键词在不同站点结果可能差异较大

3. 核心参数解析

必选参数

参数名类型说明示例
app_keyString应用唯一标识"lazada_appkey_123"
signString签名,按平台算法生成见下文签名逻辑
timestampLong时间戳(秒级)1689000000
qString搜索关键词"wireless earbuds"
countryString站点代码"SG""MY""TH""ID""PH""VN"

可选参数

参数名类型说明示例
pageInteger页码,默认 11
page_sizeInteger每页条数,10-4040
sortString排序方式"popularity"( popularity)、"price_asc"(价格升序)、"price_desc"(价格降序)、"rating"(评分)
price_fromFloat最低价格50
price_toFloat最高价格200
brandString品牌筛选"Apple""Samsung"
seller_idString卖家 ID 筛选"123456"
category_idString类目 ID 筛选"1234"
has_promotionBoolean是否只看有促销的商品true
is_officialBoolean是否只看官方店铺商品true

二、签名生成与返回数据结构

1. 签名生成逻辑

Lazada item_search接口采用与其他接口一致的 HMAC-SHA256 签名算法,步骤如下:
  1. 收集所有请求参数(不包含sign),按参数名 ASCII 码升序排序

  2. 拼接为key=value&key=value格式字符串(如"app_key=test&q=earbuds"

  3. 使用app_secret作为密钥,对拼接字符串进行 HMAC-SHA256 加密

  4. 对加密结果进行 Base64 编码,得到最终签名值

示例:参数:app_key=test&country=SG&q=earbuds×tamp=1689000000排序后字符串:app_key=test&country=SG&q=earbuds×tamp=1689000000假设app_secret=secret123,HMAC-SHA256 加密后 Base64 编码得到签名。

2. 返回数据结构解析

接口返回 JSON 格式数据,核心结构包括:
  1. 响应状态信息:请求状态码、消息、请求 ID

  2. 搜索结果统计:总商品数、总页数、当前页

  3. 商品列表:符合搜索条件的商品集合

  4. 筛选条件:可用的筛选选项(价格区间、品牌等)

商品列表中的关键数据字段:
  • item_id:商品唯一标识

  • title:商品标题

  • image:商品主图 URL

  • price:当前售价

  • original_price:原价

  • discount:折扣比例

  • sales_count:销售数量

  • rating_score:商品评分

  • review_count:评论数量

  • brand_name:品牌名称

  • category_id:类目 ID

  • seller_id:卖家 ID

  • seller_name:卖家名称

  • is_official:是否官方店铺

  • has_promotion:是否有促销活动

三、Python 实现方案

以下是 Lazada item_search接口的完整 Python 实现,包含接口调用、数据处理、分析及可视化功能:
import requests
import time
import hmac
import hashlib
import base64
import json
import logging
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import re
from datetime import datetime
from collections import defaultdict
from typing import Dict, Optional, List, Tuple

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)

# 配置中文显示
plt.rcParams["font.family"] = ["SimHei", "WenQuanYi Micro Hei", "Heiti TC"]
plt.rcParams["axes.unicode_minus"] = False

class LazadaItemSearch:
    """Lazada item_search接口封装类,用于按关键字搜索商品并分析结果"""
    
    def __init__(self, app_key: str, app_secret: str):
        """
        初始化Lazada API客户端
        :param app_key: 应用的app_key
        :param app_secret: 应用的app_secret
        """
        self.app_key = app_key
        self.app_secret = app_secret
        self.api_url = "https://api.lazada.com/rest"
        
        # 频率控制
        self.rate_limit = 60  # 默认基础权限,高级权限可修改为300
        self.call_timestamps = []  # 存储调用时间戳(秒级)
        
        # 支持的站点代码
        self.supported_countries = ["SG", "MY", "TH", "ID", "PH", "VN"]
        # 货币代码映射
        self.currency_codes = {
            "SG": "SGD",
            "MY": "MYR",
            "TH": "THB",
            "ID": "IDR",
            "PH": "PHP",
            "VN": "VND"
        }
        
        # 支持的排序方式
        self.supported_sorts = [
            "popularity", "price_asc", "price_desc", 
            "rating", "newest", "sales"
        ]
    
    def set_rate_limit(self, limit: int) -> None:
        """设置调用频率限制(次/小时)"""
        if 60 <= limit <= 300:
            self.rate_limit = limit
            logging.info(f"已设置调用频率限制为 {limit} 次/小时")
        else:
            logging.warning("频率限制必须在60-300之间,未修改")
    
    def _generate_sign(self, params: Dict) -> str:
        """生成签名(HMAC-SHA256算法)"""
        # 1. 按参数名ASCII升序排序
        sorted_params = sorted(params.items(), key=lambda x: x[0])
        
        # 2. 拼接为"key=value&key=value"格式
        param_str = "&".join([f"{k}={v}" for k, v in sorted_params])
        
        # 3. HMAC-SHA256加密
        hmac_code = hmac.new(
            self.app_secret.encode('utf-8'),
            param_str.encode('utf-8'),
            hashlib.sha256
        ).digest()
        
        # 4. Base64编码
        return base64.b64encode(hmac_code).decode('utf-8')
    
    def _check_rate_limit(self) -> None:
        """检查并控制调用频率"""
        current_time = time.time()  # 秒级
        # 保留1小时内的调用记录
        self.call_timestamps = [t for t in self.call_timestamps if current_time - t < 3600]
        
        # 若超过限制,计算需要等待的时间
        if len(self.call_timestamps) >= self.rate_limit:
            oldest_time = self.call_timestamps[0]
            sleep_time = (3600 - (current_time - oldest_time)) + 1  # 额外加1秒保险
            logging.warning(f"调用频率超限,等待 {sleep_time:.1f} 秒")
            time.sleep(sleep_time)
            # 再次清理过期记录
            self.call_timestamps = [t for t in self.call_timestamps if time.time() - t < 3600]
        
        # 记录本次调用时间
        self.call_timestamps.append(current_time)
    
    def search_items(self, keyword: str, country: str, 
                    page: int = 1, page_size: int = 40, 
                    filters: Optional[Dict] = None) -> Optional[Dict]:
        """
        按关键字搜索商品
        :param keyword: 搜索关键词
        :param country: 站点代码
        :param page: 页码
        :param page_size: 每页数量
        :param filters: 筛选参数
        :return: 搜索结果数据
        """
        # 验证站点代码
        if country not in self.supported_countries:
            logging.error(f"不支持的站点代码: {country},支持的站点: {', '.join(self.supported_countries)}")
            return None
        
        # 验证排序方式
        if filters and "sort" in filters and filters["sort"] not in self.supported_sorts:
            logging.error(f"不支持的排序方式: {filters['sort']},支持的方式: {', '.join(self.supported_sorts)}")
            return None
        
        # 验证分页大小
        if not (10 <= page_size <= 40):
            logging.error(f"page_size必须在10-40之间,当前值: {page_size}")
            return None
        
        # 构建基础参数
        base_params = {
            "app_key": self.app_key,
            "method": "item.search",
            "timestamp": str(int(time.time())),  # 秒级时间戳
            "format": "json",
            "v": "2.0",
            "q": keyword,
            "country": country,
            "page": str(page),
            "page_size": str(page_size)
        }
        
        # 合并筛选参数
        if filters and isinstance(filters, Dict):
            # 过滤空值参数
            valid_filters = {k: v for k, v in filters.items() if v is not None}
            # 转换为字符串类型
            for k, v in valid_filters.items():
                base_params[k] = str(v).lower() if isinstance(v, bool) else str(v)
        
        # 生成签名
        sign = self._generate_sign(base_params)
        base_params["sign"] = sign
        
        # 检查频率限制
        self._check_rate_limit()
        
        try:
            # 发送请求
            response = requests.get(self.api_url, params=base_params, timeout=15)
            response.raise_for_status()
            
            # 解析响应
            result = response.json()
            
            # 处理错误
            if "code" in result and result["code"] != 0:
                logging.error(f"API调用错误: {result.get('message')} (错误码: {result.get('code')})")
                return None
                
            # 提取结果
            search_data = result.get("data", {})
            if not search_data.get("items"):
                logging.warning("未获取到搜索结果")
                return None
                
            logging.info(f"成功在 {country} 站点搜索关键词 '{keyword}',第 {page} 页,找到 {search_data.get('total_items', 0)} 个商品")
            return search_data
            
        except requests.exceptions.RequestException as e:
            logging.error(f"请求异常: {str(e)}")
            return None
        except json.JSONDecodeError:
            logging.error(f"响应解析失败: {response.text[:200]}...")
            return None
    
    def batch_search_items(self, keyword: str, country: str, 
                         max_pages: int = 5, page_size: int = 40, 
                         filters: Optional[Dict] = None) -> Tuple[List[Dict], Dict]:
        """
        批量获取多页搜索结果
        :param keyword: 搜索关键词
        :param country: 站点代码
        :param max_pages: 最大页数
        :param page_size: 每页数量
        :param filters: 筛选参数
        :return: 商品列表和元信息
        """
        all_items = []
        meta_info = {
            "keyword": keyword,
            "country": country,
            "total_items": 0,
            "total_pages": 0,
            "filters": filters,
            "fetch_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "currency": self.currency_codes.get(country, "Unknown")
        }
        page = 1
        
        while page <= max_pages:
            logging.info(f"正在获取关键词 '{keyword}' 第 {page}/{max_pages} 页搜索结果...")
            result = self.search_items(keyword, country, page, page_size, filters)
            
            if not result:
                break
                
            # 提取商品数据
            items = result.get("items", [])
            if not items:
                logging.info("当前页无商品数据,停止获取")
                break
                
            all_items.extend(items)
            
            # 保存元信息(第一页)
            if page == 1:
                meta_info["total_items"] = result.get("total_items", 0)
                # 计算总页数,不超过max_pages
                meta_info["total_pages"] = min(max_pages, 
                                              (meta_info["total_items"] + page_size - 1) // page_size)
                meta_info["filters_applied"] = result.get("filters_applied", {})
            
            page += 1
        
        logging.info(f"批量搜索完成,共获取 {len(all_items)} 个商品数据")
        return all_items, meta_info
    
    def analyze_search_results(self, items: List[Dict], meta_info: Dict) -> Dict:
        """分析搜索结果数据"""
        if not items:
            return {"error": "没有商品数据可分析"}
        
        country = meta_info.get("country", "")
        currency = self.currency_codes.get(country, "Unknown")
        
        # 1. 价格分析
        price_info = {
            "min_price": None,
            "max_price": None,
            "avg_price": None,
            "median_price": None,
            "currency": currency,
            "price_distribution": defaultdict(int)
        }
        
        prices = []
        for item in items:
            try:
                price = float(item.get("price", 0))
                prices.append(price)
                
                # 价格区间分布
                if price < 50:
                    price_info["price_distribution"]["<50"] += 1
                elif price < 100:
                    price_info["price_distribution"]["50-100"] += 1
                elif price < 200:
                    price_info["price_distribution"]["100-200"] += 1
                elif price < 500:
                    price_info["price_distribution"]["200-500"] += 1
                else:
                    price_info["price_distribution"][">500"] += 1
            except (ValueError, TypeError):
                continue
        
        if prices:
            price_info["min_price"] = round(min(prices), 2)
            price_info["max_price"] = round(max(prices), 2)
            price_info["avg_price"] = round(sum(prices)/len(prices), 2)
            price_info["median_price"] = round(np.median(prices), 2)
            price_info["price_distribution"] = dict(price_info["price_distribution"])
        
        # 2. 品牌分析
        brand_info = {
            "total_brands": 0,
            "brand_distribution": defaultdict(int),
            "top_brands": []
        }
        
        for item in items:
            brand = item.get("brand_name", "未知品牌")
            brand_info["brand_distribution"][brand] += 1
        
        brand_info["total_brands"] = len(brand_info["brand_distribution"])
        # 获取前10个品牌
        brand_info["top_brands"] = sorted(
            brand_info["brand_distribution"].items(), 
            key=lambda x: x[1], 
            reverse=True
        )[:10]
        brand_info["brand_distribution"] = dict(brand_info["brand_distribution"])
        
        # 3. 销量分析
        sales_info = {
            "total_sales": 0,
            "avg_sales": 0,
            "sales_distribution": defaultdict(int)
        }
        
        sales_counts = []
        for item in items:
            try:
                sales = int(item.get("sales_count", 0))
                sales_counts.append(sales)
                sales_info["total_sales"] += sales
                
                # 销量区间分布
                if sales == 0:
                    sales_info["sales_distribution"]["0"] += 1
                elif sales < 10:
                    sales_info["sales_distribution"]["1-9"] += 1
                elif sales < 100:
                    sales_info["sales_distribution"]["10-99"] += 1
                elif sales < 500:
                    sales_info["sales_distribution"]["100-499"] += 1
                else:
                    sales_info["sales_distribution"][">=500"] += 1
            except (ValueError, TypeError):
                continue
        
        if sales_counts:
            sales_info["avg_sales"] = round(sum(sales_counts)/len(sales_counts), 1)
        sales_info["sales_distribution"] = dict(sales_info["sales_distribution"])
        
        # 4. 评分分析
        rating_info = {
            "avg_rating": 0,
            "rating_distribution": defaultdict(int),
            "no_rating_count": 0
        }
        
        ratings = []
        for item in items:
            try:
                rating = float(item.get("rating_score", 0))
                if rating > 0:
                    ratings.append(rating)
                    # 评分分布(0.5星为间隔)
                    rating_bin = f"{round(rating * 2) / 2:.1f}"
                    rating_info["rating_distribution"][rating_bin] += 1
                else:
                    rating_info["no_rating_count"] += 1
            except (ValueError, TypeError):
                rating_info["no_rating_count"] += 1
        
        if ratings:
            rating_info["avg_rating"] = round(sum(ratings)/len(ratings), 1)
        rating_info["rating_distribution"] = dict(rating_info["rating_distribution"])
        
        # 5. 促销分析
        promotion_info = {
            "has_promotion_count": 0,
            "promotion_ratio": 0,
            "avg_discount": 0
        }
        
        discounts = []
        for item in items:
            if item.get("has_promotion", False):
                promotion_info["has_promotion_count"] += 1
            
            try:
                price = float(item.get("price", 0))
                original_price = float(item.get("original_price", 0))
                if original_price > 0 and price < original_price:
                    discount = round((1 - price/original_price) * 100, 1)
                    discounts.append(discount)
            except (ValueError, TypeError):
                continue
        
        if items:
            promotion_info["promotion_ratio"] = round(
                promotion_info["has_promotion_count"] / len(items) * 100, 1
            )
        
        if discounts:
            promotion_info["avg_discount"] = round(sum(discounts)/len(discounts), 1)
        
        # 6. 卖家类型分析
        seller_info = {
            "official_count": 0,
            "official_ratio": 0,
            "top_sellers": defaultdict(int)
        }
        
        for item in items:
            if item.get("is_official", False):
                seller_info["official_count"] += 1
            
            seller_id = item.get("seller_id", "未知卖家")
            seller_info["top_sellers"][seller_id] += 1
        
        if items:
            seller_info["official_ratio"] = round(
                seller_info["official_count"] / len(items) * 100, 1
            )
        
        # 获取前5名卖家
        seller_info["top_sellers"] = sorted(
            seller_info["top_sellers"].items(), 
            key=lambda x: x[1], 
            reverse=True
        )[:5]
        seller_info["top_sellers"] = [(s if s != "未知卖家" else s, c) for s, c in seller_info["top_sellers"]]
        
        return {
            "search_keyword": meta_info.get("keyword"),
            "country": country,
            "total_items": len(items),
            "price_analysis": price_info,
            "brand_analysis": brand_info,
            "sales_analysis": sales_info,
            "rating_analysis": rating_info,
            "promotion_analysis": promotion_info,
            "seller_analysis": seller_info,
            "analysis_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
    
    def get_top_items(self, items: List[Dict], by: str = "sales", top_n: int = 10) -> List[Dict]:
        """
        获取特定维度排名靠前的商品
        :param items: 商品列表
        :param by: 排序维度:sales(销量), price(价格), rating(评分)
        :param top_n: 取前N名
        :return: 排序后的商品列表
        """
        if not items:
            return []
        
        valid_dimensions = ["sales", "price", "rating"]
        if by not in valid_dimensions:
            logging.warning(f"无效的排序维度: {by},使用默认值 'sales'")
            by = "sales"
        
        # 准备排序数据
        ranked_items = []
        for item in items:
            try:
                if by == "sales":
                    value = int(item.get("sales_count", 0))
                    # 销量排序是降序
                    sort_value = -value
                elif by == "price":
                    # 价格排序是升序
                    value = float(item.get("price", 0))
                    sort_value = value
                elif by == "rating":
                    # 评分排序是降序
                    value = float(item.get("rating_score", 0))
                    sort_value = -value
                
                ranked_items.append((item, sort_value, value))
            except (ValueError, TypeError):
                continue
        
        # 排序并返回前N名
        ranked_items.sort(key=lambda x: x[1])
        return [{"item": p[0], "value": p[2]} for p in ranked_items[:top_n]]
    
    def visualize_analysis(self, analysis: Dict, output_dir: str = ".") -> None:
        """可视化分析结果"""
        if "error" in analysis:
            logging.warning(analysis["error"])
            return
        
        keyword = analysis.get("search_keyword", "未知关键词")
        country = analysis.get("country", "未知站点")
        
        # 1. 价格分布直方图
        if "price_analysis" in analysis and analysis["price_analysis"]["price_distribution"]:
            plt.figure(figsize=(10, 6))
            price_bins = sorted(analysis["price_analysis"]["price_distribution"].keys(),
                              key=lambda x: float(re.findall(r'\d+', x)[0]) if re.findall(r'\d+', x) else 0)
            counts = [analysis["price_analysis"]["price_distribution"][b] for b in price_bins]
            
            plt.bar(price_bins, counts, color='lightblue')
            plt.title(f'关键词 "{keyword}" ({country}) 价格分布')
            plt.xlabel(f'价格区间 ({analysis["price_analysis"]["currency"]})')
            plt.ylabel('商品数量')
            
            for i, v in enumerate(counts):
                plt.text(i, v + 0.5, str(v), ha='center')
            
            plt.tight_layout()
            plt.savefig(f"{output_dir}/price_distribution.png")
            plt.close()
            logging.info(f"价格分布图表已保存至 {output_dir}/price_distribution.png")
        
        # 2. 品牌分布饼图(前5名+其他)
        if "brand_analysis" in analysis and analysis["brand_analysis"]["brand_distribution"]:
            plt.figure(figsize=(10, 8))
            brand_data = sorted(analysis["brand_analysis"]["brand_distribution"].items(), 
                              key=lambda x: x[1], reverse=True)
            
            if len(brand_data) > 5:
                top5 = brand_data[:5]
                others = sum(count for _, count in brand_data[5:])
                top5.append(("其他品牌", others))
                brand_data = top5
            
            labels, sizes = zip(*brand_data)
            plt.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=90)
            plt.title(f'关键词 "{keyword}" ({country}) 品牌分布')
            plt.axis('equal')
            plt.tight_layout()
            plt.savefig(f"{output_dir}/brand_distribution.png")
            plt.close()
            logging.info(f"品牌分布图表已保存至 {output_dir}/brand_distribution.png")
        
        # 3. 销量分布条形图
        if "sales_analysis" in analysis and analysis["sales_analysis"]["sales_distribution"]:
            plt.figure(figsize=(10, 6))
            sales_bins = sorted(analysis["sales_analysis"]["sales_distribution"].keys(),
                              key=lambda x: int(re.findall(r'\d+', x)[0]) if re.findall(r'\d+', x) else -1)
            counts = [analysis["sales_analysis"]["sales_distribution"][b] for b in sales_bins]
            
            plt.bar(sales_bins, counts, color='orange')
            plt.title(f'关键词 "{keyword}" ({country}) 销量分布')
            plt.xlabel('销量区间')
            plt.ylabel('商品数量')
            
            for i, v in enumerate(counts):
                plt.text(i, v + 0.5, str(v), ha='center')
            
            plt.tight_layout()
            plt.savefig(f"{output_dir}/sales_distribution.png")
            plt.close()
            logging.info(f"销量分布图表已保存至 {output_dir}/sales_distribution.png")
        
        # 4. 评分分布条形图
        if "rating_analysis" in analysis and analysis["rating_analysis"]["rating_distribution"]:
            plt.figure(figsize=(10, 6))
            rating_bins = sorted(analysis["rating_analysis"]["rating_distribution"].keys(),
                               key=lambda x: float(x))
            counts = [analysis["rating_analysis"]["rating_distribution"][b] for b in rating_bins]
            
            # 添加无评分数据
            if analysis["rating_analysis"]["no_rating_count"] > 0:
                rating_bins.append("无评分")
                counts.append(analysis["rating_analysis"]["no_rating_count"])
            
            plt.bar(rating_bins, counts, color='lightgreen')
            plt.title(f'关键词 "{keyword}" ({country}) 评分分布')
            plt.xlabel('评分 (星)')
            plt.ylabel('商品数量')
            plt.axhline(y=sum(counts)/len(counts), color='r', linestyle='--', 
                       label=f'平均分布: {sum(counts)/len(counts):.1f}')
            
            for i, v in enumerate(counts):
                plt.text(i, v + 0.5, str(v), ha='center')
            
            plt.legend()
            plt.tight_layout()
            plt.savefig(f"{output_dir}/rating_distribution.png")
            plt.close()
            logging.info(f"评分分布图表已保存至 {output_dir}/rating_distribution.png")
        
        # 5. 促销情况饼图
        if "promotion_analysis" in analysis:
            plt.figure(figsize=(8, 8))
            labels = ['有促销', '无促销']
            sizes = [
                analysis["promotion_analysis"]["has_promotion_count"],
                analysis["total_items"] - analysis["promotion_analysis"]["has_promotion_count"]
            ]
            
            plt.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=90)
            plt.title(f'关键词 "{keyword}" ({country}) 促销情况')
            plt.axis('equal')
            plt.tight_layout()
            plt.savefig(f"{output_dir}/promotion_distribution.png")
            plt.close()
            logging.info(f"促销情况图表已保存至 {output_dir}/promotion_distribution.png")
    
    def export_to_excel(self, items: List[Dict], analysis: Dict, meta_info: Dict, 
                       filename: str) -> None:
        """导出搜索结果和分析到Excel"""
        if not items and not analysis:
            logging.warning("没有数据可导出")
            return
            
        try:
            with pd pd.ExcelWriter(filename) as writer:
                # 搜索信息
                pd.DataFrame([meta_info]).to_excel(writer, sheet_name='搜索信息', index=False)
                
                # 商品列表
                if items:
                    item_list = []
                    for item in items:
                        item_data = {
                            "商品ID": item.get("item_id"),
                            "标题": item.get("title"),
                            "品牌": item.get("brand_name"),
                            "价格": item.get("price"),
                            "原价": item.get("original_price"),
                            "折扣(%)": round((1 - float(item.get("price", 0))/float(item.get("original_price", 1))) * 100, 1) 
                                      if float(item.get("original_price", 0)) > 0 else 0,
                            "销量": item.get("sales_count"),
                            "评分": item.get("rating_score"),
                            "评论数": item.get("review_count"),
                            "卖家ID": item.get("seller_id"),
                            "卖家名称": item.get("seller_name"),
                            "是否官方店铺": "是" if item.get("is_official", False) else "否",
                            "是否有促销": "是" if item.get("has_promotion", False) else "否"
                        }
                        item_list.append(item_data)
                    
                    df_items = pd.DataFrame(item_list)
                    df_items.to_excel(writer, sheet_name='商品列表', index=False)
                
                # 分析结果
                if analysis and "error" not in analysis:
                    # 价格分析
                    df_price = pd.DataFrame([analysis["price_analysis"]])
                    df_price.to_excel(writer, sheet_name='价格分析', index=False)
                    
                    # 品牌分析
                    df_brand = pd.DataFrame(list(analysis["brand_analysis"]["brand_distribution"].items()),
                                          columns=['品牌', '数量'])
                    df_brand.to_excel(writer, sheet_name='品牌分析', index=False)
                    
                    # 销量分析
                    df_sales = pd.DataFrame(list(analysis["sales_analysis"]["sales_distribution"].items()),
                                          columns=['销量区间', '数量'])
                    df_sales.to_excel(writer, sheet_name='销量分析', index=False)
            
            logging.info(f"数据已导出至 {filename}")
        except Exception as e:
            logging.error(f"导出Excel失败: {e}")


# 示例调用
if __name__ == "__main__":
    # 替换为实际的参数(从Lazada开放平台获取)
    APP_KEY = "your_app_key"
    APP_SECRET = "your_app_secret"
    KEYWORD = "wireless earbuds"  # 搜索关键词
    COUNTRY = "SG"  # 新加坡站点
    
    # 初始化API客户端
    lazada_search = LazadaItemSearch(APP_KEY, APP_SECRET)
    # 若为高级权限,设置更高的频率限制
    # lazada_search.set_rate_limit(300)
    
    # 1. 设置搜索筛选条件
    filters = {
        "sort": "sales",          # 按销量排序
        "price_from": 50,         # 最低价格
        "price_to": 300,          # 最高价格
        # "brand": "Apple,Samsung", # 品牌筛选
        "has_promotion": False    # 不限制促销
    }
    
    # 2. 批量获取搜索结果
    print("=== 搜索商品 ===")
    items, meta_info = lazada_search.batch_search_items(
        keyword=KEYWORD,
        country=COUNTRY,
        max_pages=3,       # 获取前3页
        page_size=40,
        filters=filters
    )
    
    if items:
        print(f"搜索关键词: {KEYWORD}")
        print(f"站点: {COUNTRY}")
        print(f"获取商品数量: {len(items)}")
        print(f"总商品数量: {meta_info['total_items']}")
        print(f"货币单位: {meta_info['currency']}")
    
    # 3. 分析搜索结果
    print("\n=== 搜索结果分析 ===")
    if items:
        analysis = lazada_search.analyze_search_results(items, meta_info)
        
        print("价格分析:")
        print(f"  价格范围: {analysis['price_analysis']['min_price']} - {analysis['price_analysis']['max_price']} {analysis['price_analysis']['currency']}")
        print(f"  平均价格: {analysis['price_analysis']['avg_price']} {analysis['price_analysis']['currency']}")
        
        print("\n品牌分析:")
        print(f"  品牌总数: {analysis['brand_analysis']['total_brands']}")
        print("  主要品牌:")
        for brand, count in analysis["brand_analysis"]["top_brands"][:5]:
            print(f"    {brand}: {count}个商品")
        
        print("\n销售分析:")
        print(f"  总销量: {analysis['sales_analysis']['total_sales']}")
        print(f"  平均销量: {analysis['sales_analysis']['avg_sales']}")
        
        print("\n评分分析:")
        print(f"  平均评分: {analysis['rating_analysis']['avg_rating']}")
        print(f"  无评分商品: {analysis['rating_analysis']['no_rating_count']}")
        
        print("\n促销分析:")
        print(f"  有促销商品比例: {analysis['promotion_analysis']['promotion_ratio']}%")
        print(f"  平均折扣: {analysis['promotion_analysis']['avg_discount']}%")
        
        # 4. 获取各维度TOP商品
        print("\n=== 各维度TOP3商品 ===")
        top_sales = lazada_search.get_top_items(items, by="sales", top_n=3)
        print("销量TOP3:")
        for i, item_info in enumerate(top_sales, 1):
            item = item_info["item"]
            print(f"{i}. {item['title'][:30]}... 销量: {item_info['value']}")
        
        top_rating = lazada_search.get_top_items(items, by="rating", top_n=3)
        print("\n评分TOP3:")
        for i, item_info in enumerate(top_rating, 1):
            item = item_info["item"]
            print(f"{i}. {item['title'][:30]}... 评分: {item_info['value']}")
        
        top_cheap = lazada_search.get_top_items(items, by="price", top_n=3)
        print("\n价格最低TOP3:")
        for i, item_info in enumerate(top_cheap, 1):
            item = item_info["item"]
            print(f"{i}. {item['title'][:30]}... 价格: {item_info['value']} {meta_info['currency']}")
        
        # 5. 可视化分析结果
        lazada_search.visualize_analysis(analysis)
        
        # 6. 导出数据到Excel
        lazada_search.export_to_excel(items, analysis, meta_info, f"Lazada搜索_{KEYWORD.replace(' ', '_')}_分析.xlsx")
    else:
        print("未获取到搜索结果,无法进行分析")

四、接口调用注意事项

1. 常见错误及解决方案

错误码说明解决方案
10001无效的 app_key检查 app_key 是否正确,应用是否已通过审核
10002签名错误检查签名算法实现,确保参数排序正确、app_secret 无误
10003权限不足申请更高权限或检查是否已开通 item_search 接口权限
10004调用频率超限降低调用频率,或申请提高权限等级
20002站点代码无效确保 country 参数为支持的站点代码(SG、MY、TH 等)
20003参数错误检查 page_size 是否在 10-40 之间,sort 是否为支持的类型
20005关键词无效检查关键词是否为空或包含特殊字符,进行适当编码
30001搜索服务暂时不可用稍后重试,可能是平台服务临时故障
40001访问被拒绝检查开发者账号状态,是否有违规行为导致限制

2. 性能优化建议

  • 精准筛选:使用 price_from、price_to、brand 等筛选参数减少返回数据量

  • 合理设置分页:使用最大 page_size(40)减少请求次数,降低频率限制影响

  • 缓存策略:对相同关键词和筛选条件的搜索结果设置 15-30 分钟缓存

  • 批量处理控制:多关键词搜索时,使用队列控制并发,避免触发频率限制

  • 关键词优化:使用精准关键词减少无关结果,提高数据质量

  • 增量搜索:定期搜索时,记录上次搜索的商品 ID,只关注新增或变化的商品

  • 异常处理:实现失败重试机制,特别是针对临时服务不可用的情况

五、应用场景与扩展建议

典型应用场景

  • 市场趋势分析:通过关键词搜索分析特定品类商品的市场状况和趋势

  • 竞品监控:跟踪竞争对手产品的价格、销量和促销情况

  • 新品开发:分析热门商品特征,指导新产品开发和设计

  • 价格策略制定:了解市场价格区间和分布,优化自身定价策略

  • 品牌分析:评估特定品类中各品牌的市场占有率和表现

  • 关键词研究:分析不同关键词的搜索结果,优化商品标题和关键词设置

扩展建议

  • 多关键词对比:同时分析多个相关关键词的搜索结果,全面了解市场

  • 多站点对比:分析同一关键词在不同东南亚国家的搜索结果差异

  • 趋势追踪:定期执行相同搜索,跟踪市场变化和趋势

  • 价格带分析:深入分析不同价格带商品的特征和市场表现

  • 关键词扩展:基于初始搜索结果,自动生成相关关键词并进行扩展搜索

  • 图像识别:结合商品图片分析,识别热门产品特征和设计元素

  • 搜索排名监控:追踪特定商品在不同关键词搜索结果中的排名变化

  • 智能推荐:基于搜索分析结果,自动推荐有潜力的商品或市场机会

通过item_search接口获取的 Lazada 搜索结果数据,能够帮助跨境电商企业全面了解特定关键词相关的商品市场状况,为产品开发、定价策略和市场推广提供数据支持。在实际应用中,需结合东南亚不同国家的市场特点和消费习惯,制定针对性的运营策略,同时严格遵守 Lazada 平台的数据使用规范和各国家的数据保护法规。


群贤毕至

访客