×

VVICitem_search - 根据关键词取关键词取商品列表接口深度分析及 Python 实现

万邦科技Lex 万邦科技Lex 发表于2025-09-23 16:07:50 浏览228 评论0

抢沙发发表评论

     注册账号免费测试VVICAPI数据接口

VVIC(搜款网)的item_search接口是根据平台根据关键词搜索商品列表的核心接口,为服装批发电商从业者提供了高效获取特定品类商品信息的能力。通过该接口,开发者可以获取符合搜索条件的商品列表,包含价格、销量、供应商等关键数据,为市场调研、竞品分析和采购决策提供重要依据。

一、VVICitem_search 接口核心特性分析

1. 接口定位与核心价值

VVICitem_search接口专注于基于关键词的商品搜索功能,其核心价值体现在:
  • 针对服装服饰类商品的精准搜索,支持风格、材质等行业特色筛选

  • 提供批发价格、起订量等采购关键信息

  • 支持多维度排序,快速定位热门或优质商品

  • 可批量获取商品列表,高效进行市场分析

  • 包含供应商信息,辅助采购渠道评估

2. 接口权限与调用限制

使用 VVIC 搜索接口需遵守平台规范:
限制类型具体规则说明
权限要求需注册 VVIC 开放平台账号,创建应用并通过认证个人开发者可获取基础信息,企业开发者有更高权限
调用频率基础权限:60 次 / 分钟;高级权限:200 次 / 分钟按 app_key 和 IP 地址双重限制
数据返回单次最多返回 50 条商品,最多支持 100 页数据搜索结果有 15 分钟缓存
字段限制基础字段(ID、标题、价格等)免费;高级字段需申请部分供应商信息受保护
搜索限制关键词长度限制为 2-30 个字符支持中英文及部分特殊符号

3. 核心参数解析

必选参数

参数名类型说明示例
app_keyString应用唯一标识"vvic_appkey_12345"
signString签名,按平台算法生成见下文签名逻辑
timestampLong时间戳(毫秒级)1690000000000
qString搜索关键词"连衣裙 夏季"

可选参数

参数名类型说明示例
pageInteger页码,默认 11
page_sizeInteger每页条数,10-5050
sortString排序方式"sales_desc"(销量降序)、"price_asc"(价格升序)等
price_fromFloat最低价格50
price_toFloat最高价格200
category_idString类目 ID 筛选"123"
materialString材质筛选"棉""涤纶"
styleString风格筛选"韩版""欧美"
wholesaleBoolean是否只看支持批发的商品true
has_stockBoolean是否只看有库存的商品true
seller_typeString卖家类型"manufacturer"(厂家)、"trader"(贸易商)

二、签名生成与返回数据结构

1. 签名生成逻辑

VVIC item_search接口采用与其他接口一致的 MD5 签名算法,步骤如下:
  1. 收集所有请求参数(不包含sign),按参数名 ASCII 码升序排序

  2. 拼接为key=value&key=value格式字符串(如"app_key=test&q=连衣裙"

  3. 在拼接字符串末尾添加&secret=your_app_secret

  4. 对整个字符串进行 MD5 加密,得到 32 位小写字符串,即为签名值

示例:参数:app_key=test&q=连衣裙&page=1×tamp=1690000000000拼接 secret 后:app_key=test&page=1&q=连衣裙×tamp=1690000000000&secret=abc123MD5 加密后得到签名:a1b2c3d4e5f6a7b8c9d0e1f2a3b4c5d6

2. 返回数据结构解析

接口返回 JSON 格式数据,核心结构包括:
  1. 响应状态信息:请求状态码、消息、请求 ID

  2. 搜索结果统计:总商品数、总页数、当前页

  3. 商品列表:符合搜索条件的商品集合

  4. 筛选条件:可用的筛选选项(价格区间、材质等)

商品列表中的关键数据字段:
  • item_id:商品唯一标识

  • title:商品标题

  • main_image:商品主图 URL

  • price:批发价格

  • market_price:市场参考价

  • min_buy:最小起订量

  • sales_30d:30 天销量

  • stock:库存数量

  • category:商品类目

  • brand:品牌名称

  • seller_id:卖家 ID

  • seller_name:卖家名称

  • seller_type:卖家类型

  • tags:商品标签(如新款、热卖等)

  • attributes:关键属性(材质、风格等)

三、Python 实现方案

以下是 VVIC item_search接口的完整 Python 实现,包含接口调用、数据处理及分析功能:
import requests
import time
import hashlib
import json
import logging
import pandas as pd
import matplotlib.pyplot as plt
import re
from datetime import datetime
from collections import defaultdict
from typing import Dict, Optional, List, Tuple

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)

# 配置中文显示
plt.rcParams["font.family"] = ["SimHei", "WenQuanYi Micro Hei", "Heiti TC"]
plt.rcParams["axes.unicode_minus"] = False

class VVICItemSearch:
    """VVIC item_search接口封装类,用于根据关键词搜索商品并分析结果"""
    
    def __init__(self, app_key: str, app_secret: str):
        """
        初始化VVIC API客户端
        :param app_key: 应用的app_key
        :param app_secret: 应用的app_secret
        """
        self.app_key = app_key
        self.app_secret = app_secret
        self.api_url = "https://api.vvic.com/rest/item/search"
        
        # 频率控制
        self.rate_limit = 60  # 默认基础权限,高级权限可修改为200次/分钟
        self.call_timestamps = []  # 存储调用时间戳(毫秒级)
        
        # 支持的排序方式
        self.supported_sorts = [
            "default",          # 默认排序
            "sales_desc",       # 销量降序
            "price_asc",        # 价格升序
            "price_desc",       # 价格降序
            "newest",           # 最新上架
            "rating_desc",      # 评分降序
            "wholesale_desc"    # 批发优先
        ]
    
    def set_rate_limit(self, limit: int) -> None:
        """设置调用频率限制(次/分钟)"""
        if 60 <= limit <= 200:
            self.rate_limit = limit
            logging.info(f"已设置调用频率限制为 {limit} 次/分钟")
        else:
            logging.warning("频率限制必须在60-200之间,未修改")
    
    def _generate_sign(self, params: Dict) -> str:
        """生成签名(MD5算法)"""
        # 1. 按参数名ASCII升序排序
        sorted_params = sorted(params.items(), key=lambda x: x[0])
        
        # 2. 拼接为"key=value&key=value"格式
        param_str = "&".join([f"{k}={v}" for k, v in sorted_params])
        
        # 3. 添加secret
        param_str += f"&secret={self.app_secret}"
        
        # 4. MD5加密
        md5 = hashlib.md5()
        md5.update(param_str.encode('utf-8'))
        return md5.hexdigest()
    
    def _check_rate_limit(self) -> None:
        """检查并控制调用频率"""
        current_time = time.time() * 1000  # 毫秒级
        # 保留1分钟内的调用记录
        self.call_timestamps = [t for t in self.call_timestamps if current_time - t < 60000]
        
        # 若超过限制,计算需要等待的时间
        if len(self.call_timestamps) >= self.rate_limit:
            oldest_time = self.call_timestamps[0]
            sleep_time = (60000 - (current_time - oldest_time)) / 1000 + 0.1  # 额外加0.1秒保险
            logging.warning(f"调用频率超限,等待 {sleep_time:.1f} 秒")
            time.sleep(sleep_time)
            # 再次清理过期记录
            self.call_timestamps = [t for t in self.call_timestamps if time.time()*1000 - t < 60000]
        
        # 记录本次调用时间
        self.call_timestamps.append(current_time)
    
    def search_items(self, keyword: str, 
                    page: int = 1, page_size: int = 50, 
                    filters: Optional[Dict] = None) -> Optional[Dict]:
        """
        按关键词搜索商品
        :param keyword: 搜索关键词
        :param page: 页码
        :param page_size: 每页数量
        :param filters: 筛选参数
        :return: 搜索结果数据
        """
        # 验证关键词
        if not (2 <= len(keyword) <= 30):
            logging.error(f"关键词长度必须在2-30个字符之间,当前长度: {len(keyword)}")
            return None
        
        # 验证排序方式
        if filters and "sort" in filters and filters["sort"] not in self.supported_sorts:
            logging.error(f"不支持的排序方式: {filters['sort']},支持的方式: {', '.join(self.supported_sorts)}")
            return None
        
        # 验证分页大小
        if not (10 <= page_size <= 50):
            logging.error(f"page_size必须在10-50之间,当前值: {page_size}")
            return None
        
        # 构建基础参数
        base_params = {
            "app_key": self.app_key,
            "timestamp": str(int(time.time() * 1000)),  # 毫秒级时间戳
            "format": "json",
            "v": "1.0",
            "q": keyword,
            "page": str(page),
            "page_size": str(page_size)
        }
        
        # 合并筛选参数
        if filters and isinstance(filters, Dict):
            # 过滤空值参数
            valid_filters = {k: v for k, v in filters.items() if v is not None}
            # 转换为字符串类型
            for k, v in valid_filters.items():
                base_params[k] = str(v).lower() if isinstance(v, bool) else str(v)
        
        # 生成签名
        sign = self._generate_sign(base_params)
        base_params["sign"] = sign
        
        # 检查频率限制
        self._check_rate_limit()
        
        try:
            # 发送请求
            response = requests.get(self.api_url, params=base_params, timeout=10)
            response.raise_for_status()
            
            # 解析响应
            result = response.json()
            
            # 处理错误
            if result.get("code") != 0:
                logging.error(f"API调用错误: {result.get('msg')} (错误码: {result.get('code')})")
                return None
                
            # 提取结果
            search_data = result.get("data", {})
            if not search_data.get("items"):
                logging.warning("未获取到搜索结果")
                return None
                
            logging.info(f"成功搜索关键词 '{keyword}',第 {page} 页,找到 {search_data.get('total_items', 0)} 个商品")
            return search_data
            
        except requests.exceptions.RequestException as e:
            logging.error(f"请求异常: {str(e)}")
            return None
        except json.JSONDecodeError:
            logging.error(f"响应解析失败: {response.text[:200]}...")
            return None
    
    def batch_search_items(self, keyword: str, 
                         max_pages: int = 5, page_size: int = 50, 
                         filters: Optional[Dict] = None) -> Tuple[List[Dict], Dict]:
        """
        批量获取多页搜索结果
        :param keyword: 搜索关键词
        :param max_pages: 最大页数
        :param page_size: 每页数量
        :param filters: 筛选参数
        :return: 商品列表和元信息
        """
        all_items = []
        meta_info = {
            "keyword": keyword,
            "total_items": 0,
            "total_pages": 0,
            "filters": filters,
            "fetch_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        page = 1
        
        while page <= max_pages:
            logging.info(f"正在获取关键词 '{keyword}' 第 {page}/{max_pages} 页搜索结果...")
            result = self.search_items(keyword, page, page_size, filters)
            
            if not result:
                break
                
            # 提取商品数据
            items = result.get("items", [])
            if not items:
                logging.info("当前页无商品数据,停止获取")
                break
                
            all_items.extend(items)
            
            # 保存元信息(第一页)
            if page == 1:
                meta_info["total_items"] = result.get("total_items", 0)
                # 计算总页数,不超过max_pages
                meta_info["total_pages"] = min(max_pages, 
                                              (meta_info["total_items"] + page_size - 1) // page_size)
                meta_info["available_filters"] = result.get("filters", {})
            
            page += 1
        
        logging.info(f"批量搜索完成,共获取 {len(all_items)} 个商品数据")
        return all_items, meta_info
    
    def analyze_search_results(self, items: List[Dict], meta_info: Dict) -> Dict:
        """分析搜索结果数据"""
        if not items:
            return {"error": "没有商品数据可分析"}
        
        keyword = meta_info.get("keyword", "未知关键词")
        
        # 1. 价格分析
        price_info = {
            "min_price": None,
            "max_price": None,
            "avg_price": None,
            "median_price": None,
            "price_distribution": defaultdict(int)
        }
        
        prices = []
        for item in items:
            try:
                price = float(item.get("price", 0))
                prices.append(price)
                
                # 价格区间分布
                if price < 50:
                    price_info["price_distribution"]["<50"] += 1
                elif price < 100:
                    price_info["price_distribution"]["50-100"] += 1
                elif price < 200:
                    price_info["price_distribution"]["100-200"] += 1
                elif price < 500:
                    price_info["price_distribution"]["200-500"] += 1
                else:
                    price_info["price_distribution"][">500"] += 1
            except (ValueError, TypeError):
                continue
        
        if prices:
            price_info["min_price"] = round(min(prices), 2)
            price_info["max_price"] = round(max(prices), 2)
            price_info["avg_price"] = round(sum(prices)/len(prices), 2)
            # 计算中位数
            sorted_prices = sorted(prices)
            n = len(sorted_prices)
            if n % 2 == 1:
                price_info["median_price"] = round(sorted_prices[n//2], 2)
            else:
                price_info["median_price"] = round((sorted_prices[n//2-1] + sorted_prices[n//2])/2, 2)
            price_info["price_distribution"] = dict(price_info["price_distribution"])
        
        # 2. 类目分析
        category_info = {
            "total_categories": 0,
            "category_distribution": defaultdict(int),
            "top_categories": []
        }
        
        for item in items:
            category = item.get("category", "未知类目")
            # 只取末级类目
            if ">" in category:
                category = category.split(">")[-1]
            category_info["category_distribution"][category] += 1
        
        category_info["total_categories"] = len(category_info["category_distribution"])
        # 获取前10个类目
        category_info["top_categories"] = sorted(
            category_info["category_distribution"].items(), 
            key=lambda x: x[1], 
            reverse=True
        )[:10]
        category_info["category_distribution"] = dict(category_info["category_distribution"])
        
        # 3. 销量分析
        sales_info = {
            "total_sales": 0,
            "avg_sales": 0,
            "sales_distribution": defaultdict(int),
            "zero_sales_count": 0
        }
        
        sales_counts = []
        for item in items:
            try:
                sales = int(item.get("sales_30d", 0))
                sales_counts.append(sales)
                sales_info["total_sales"] += sales
                
                # 销量区间分布
                if sales == 0:
                    sales_info["zero_sales_count"] += 1
                    sales_info["sales_distribution"]["0"] += 1
                elif sales < 10:
                    sales_info["sales_distribution"]["1-9"] += 1
                elif sales < 50:
                    sales_info["sales_distribution"]["10-49"] += 1
                elif sales < 100:
                    sales_info["sales_distribution"]["50-99"] += 1
                elif sales < 500:
                    sales_info["sales_distribution"]["100-499"] += 1
                else:
                    sales_info["sales_distribution"][">=500"] += 1
            except (ValueError, TypeError):
                sales_info["zero_sales_count"] += 1
                sales_info["sales_distribution"]["0"] += 1
        
        if sales_counts:
            sales_info["avg_sales"] = round(sum(sales_counts)/len(sales_counts), 1)
        sales_info["sales_distribution"] = dict(sales_info["sales_distribution"])
        
        # 4. 供应商分析
        seller_info = {
            "total_sellers": 0,
            "seller_type_distribution": defaultdict(int),
            "top_sellers": defaultdict(int)
        }
        
        for item in items:
            seller_id = item.get("seller_id", "未知卖家")
            seller_info["top_sellers"][seller_id] = seller_info["top_sellers"].get(seller_id, 0) + 1
            
            seller_type = item.get("seller_type", "unknown")
            seller_info["seller_type_distribution"][seller_type] += 1
        
        seller_info["total_sellers"] = len(seller_info["top_sellers"])
        # 获取前5名卖家
        seller_info["top_sellers"] = sorted(
            seller_info["top_sellers"].items(), 
            key=lambda x: x[1], 
            reverse=True
        )[:5]
        seller_info["seller_type_distribution"] = dict(seller_info["seller_type_distribution"])
        
        # 5. 属性分析(材质、风格等)
        attribute_info = {
            "material_distribution": defaultdict(int),
            "style_distribution": defaultdict(int)
        }
        
        for item in items:
            attributes = item.get("attributes", [])
            if isinstance(attributes, list):
                for attr in attributes:
                    if attr.get("name") == "材质":
                        attribute_info["material_distribution"][attr.get("value", "未知")] += 1
                    elif attr.get("name") == "风格":
                        attribute_info["style_distribution"][attr.get("value", "未知")] += 1
        
        attribute_info["material_distribution"] = dict(attribute_info["material_distribution"])
        attribute_info["style_distribution"] = dict(attribute_info["style_distribution"])
        
        # 6. 起订量分析
        order_info = {
            "min_order": None,
            "max_order": None,
            "avg_order": None,
            "order_distribution": defaultdict(int)
        }
        
        order_quantities = []
        for item in items:
            try:
                min_buy = int(item.get("min_buy", 1))
                order_quantities.append(min_buy)
                
                # 起订量区间分布
                if min_buy == 1:
                    order_info["order_distribution"]["1件起批"] += 1
                elif min_buy <= 5:
                    order_info["order_distribution"]["2-5件起批"] += 1
                elif min_buy <= 10:
                    order_info["order_distribution"]["6-10件起批"] += 1
                else:
                    order_info["order_distribution"][">10件起批"] += 1
            except (ValueError, TypeError):
                continue
        
        if order_quantities:
            order_info["min_order"] = min(order_quantities)
            order_info["max_order"] = max(order_quantities)
            order_info["avg_order"] = round(sum(order_quantities)/len(order_quantities), 1)
        order_info["order_distribution"] = dict(order_info["order_distribution"])
        
        return {
            "search_keyword": keyword,
            "total_items": len(items),
            "price_analysis": price_info,
            "category_analysis": category_info,
            "sales_analysis": sales_info,
            "seller_analysis": seller_info,
            "attribute_analysis": attribute_info,
            "order_analysis": order_info,
            "analysis_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
    
    def get_top_items(self, items: List[Dict], by: str = "sales", top_n: int = 10) -> List[Dict]:
        """
        获取特定维度排名靠前的商品
        :param items: 商品列表
        :param by: 排序维度:sales(销量), price(价格), rating(评分)
        :param top_n: 取前N名
        :return: 排序后的商品列表
        """
        if not items:
            return []
        
        valid_dimensions = ["sales", "price", "rating"]
        if by not in valid_dimensions:
            logging.warning(f"无效的排序维度: {by},使用默认值 'sales'")
            by = "sales"
        
        # 准备排序数据
        ranked_items = []
        for item in items:
            try:
                if by == "sales":
                    value = int(item.get("sales_30d", 0))
                    # 销量排序是降序
                    sort_value = -value
                elif by == "price":
                    # 价格排序是升序
                    value = float(item.get("price", 0))
                    sort_value = value
                elif by == "rating":
                    # 评分排序是降序
                    value = float(item.get("rating", 0))
                    sort_value = -value
                
                ranked_items.append((item, sort_value, value))
            except (ValueError, TypeError):
                continue
        
        # 排序并返回前N名
        ranked_items.sort(key=lambda x: x[1])
        return [{"item": p[0], "value": p[2]} for p in ranked_items[:top_n]]
    
    def visualize_analysis(self, analysis: Dict, output_dir: str = ".") -> None:
        """可视化分析结果"""
        if "error" in analysis:
            logging.warning(analysis["error"])
            return
        
        keyword = analysis.get("search_keyword", "未知关键词")
        
        # 1. 价格分布直方图
        if "price_analysis" in analysis and analysis["price_analysis"]["price_distribution"]:
            plt.figure(figsize=(10, 6))
            price_bins = sorted(analysis["price_analysis"]["price_distribution"].keys(),
                              key=lambda x: float(re.findall(r'\d+', x)[0]) if re.findall(r'\d+', x) else 0)
            counts = [analysis["price_analysis"]["price_distribution"][b] for b in price_bins]
            
            plt.bar(price_bins, counts, color='lightblue')
            plt.title(f'关键词 "{keyword}" 价格分布')
            plt.xlabel('价格区间 (元)')
            plt.ylabel('商品数量')
            
            for i, v in enumerate(counts):
                plt.text(i, v + 0.5, str(v), ha='center')
            
            plt.tight_layout()
            plt.savefig(f"{output_dir}/price_distribution.png")
            plt.close()
            logging.info(f"价格分布图表已保存至 {output_dir}/price_distribution.png")
        
        # 2. 类目分布饼图(前5名+其他)
        if "category_analysis" in analysis and analysis["category_analysis"]["category_distribution"]:
            plt.figure(figsize=(10, 8))
            category_data = sorted(analysis["category_analysis"]["category_distribution"].items(), 
                              key=lambda x: x[1], reverse=True)
            
            if len(category_data) > 5:
                top5 = category_data[:5]
                others = sum(count for _, count in category_data[5:])
                top5.append(("其他类目", others))
                category_data = top5
            
            labels, sizes = zip(*category_data)
            plt.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=90)
            plt.title(f'关键词 "{keyword}" 类目分布')
            plt.axis('equal')
            plt.tight_layout()
            plt.savefig(f"{output_dir}/category_distribution.png")
            plt.close()
            logging.info(f"类目分布图表已保存至 {output_dir}/category_distribution.png")
        
        # 3. 销量分布条形图
        if "sales_analysis" in analysis and analysis["sales_analysis"]["sales_distribution"]:
            plt.figure(figsize=(10, 6))
            sales_bins = sorted(analysis["sales_analysis"]["sales_distribution"].keys(),
                              key=lambda x: int(re.findall(r'\d+', x)[0]) if re.findall(r'\d+', x) else -1)
            counts = [analysis["sales_analysis"]["sales_distribution"][b] for b in sales_bins]
            
            plt.bar(sales_bins, counts, color='orange')
            plt.title(f'关键词 "{keyword}" 销量分布')
            plt.xlabel('30天销量区间')
            plt.ylabel('商品数量')
            
            for i, v in enumerate(counts):
                plt.text(i, v + 0.5, str(v), ha='center')
            
            plt.tight_layout()
            plt.savefig(f"{output_dir}/sales_distribution.png")
            plt.close()
            logging.info(f"销量分布图表已保存至 {output_dir}/sales_distribution.png")
        
        # 4. 供应商类型分布饼图
        if "seller_analysis" in analysis and analysis["seller_analysis"]["seller_type_distribution"]:
            plt.figure(figsize=(8, 8))
            seller_type_data = analysis["seller_analysis"]["seller_type_distribution"]
            # 转换键名以便理解
            label_mapping = {
                "manufacturer": "生产厂家",
                "trader": "贸易商",
                "brand": "品牌商",
                "unknown": "未知类型"
            }
            labels = [label_mapping.get(k, k) for k in seller_type_data.keys()]
            sizes = list(seller_type_data.values())
            
            plt.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=90)
            plt.title(f'关键词 "{keyword}" 供应商类型分布')
            plt.axis('equal')
            plt.tight_layout()
            plt.savefig(f"{output_dir}/seller_type_distribution.png")
            plt.close()
            logging.info(f"供应商类型分布图表已保存至 {output_dir}/seller_type_distribution.png")
        
        # 5. 材质分布条形图
        if "attribute_analysis" in analysis and analysis["attribute_analysis"]["material_distribution"]:
            plt.figure(figsize=(10, 6))
            material_data = sorted(analysis["attribute_analysis"]["material_distribution"].items(),
                                 key=lambda x: x[1], reverse=True)[:10]  # 只取前10
            
            if material_data:
                materials, counts = zip(*material_data)
                plt.bar(materials, counts, color='lightgreen')
                plt.title(f'关键词 "{keyword}" 材质分布')
                plt.xlabel('材质')
                plt.ylabel('商品数量')
                plt.xticks(rotation=45)
                
                for i, v in enumerate(counts):
                    plt.text(i, v + 0.5, str(v), ha='center')
                
                plt.tight_layout()
                plt.savefig(f"{output_dir}/material_distribution.png")
                plt.close()
                logging.info(f"材质分布图表已保存至 {output_dir}/material_distribution.png")
        
        # 6. 起订量分布饼图
        if "order_analysis" in analysis and analysis["order_analysis"]["order_distribution"]:
            plt.figure(figsize=(8, 8))
            order_data = analysis["order_analysis"]["order_distribution"]
            labels = list(order_data.keys())
            sizes = list(order_data.values())
            
            plt.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=90)
            plt.title(f'关键词 "{keyword}" 起订量分布')
            plt.axis('equal')
            plt.tight_layout()
            plt.savefig(f"{output_dir}/order_distribution.png")
            plt.close()
            logging.info(f"起订量分布图表已保存至 {output_dir}/order_distribution.png")
    
    def export_to_excel(self, items: List[Dict], analysis: Dict, meta_info: Dict, 
                       filename: str) -> None:
        """导出搜索结果和分析到Excel"""
        if not items and not analysis:
            logging.warning("没有数据可导出")
            return
            
        try:
            with pd.ExcelWriter(filename) as writer:
                # 搜索信息
                pd.DataFrame([meta_info]).to_excel(writer, sheet_name='搜索信息', index=False)
                
                # 商品列表
                if items:
                    item_list = []
                    for item in items:
                        item_data = {
                            "商品ID": item.get("item_id"),
                            "标题": item.get("title"),
                            "类目": item.get("category"),
                            "品牌": item.get("brand"),
                            "批发价(元)": item.get("price"),
                            "市场价(元)": item.get("market_price"),
                            "起订量": item.get("min_buy"),
                            "30天销量": item.get("sales_30d"),
                            "库存": item.get("stock"),
                            "卖家ID": item.get("seller_id"),
                            "卖家名称": item.get("seller_name"),
                            "卖家类型": item.get("seller_type"),
                            "标签": ",".join(item.get("tags", []))
                        }
                        item_list.append(item_data)
                    
                    df_items = pd.DataFrame(item_list)
                    df_items.to_excel(writer, sheet_name='商品列表', index=False)
                
                # 分析结果
                if analysis and "error" not in analysis:
                    # 价格分析
                    df_price = pd.DataFrame([analysis["price_analysis"]])
                    df_price.to_excel(writer, sheet_name='价格分析', index=False)
                    
                    # 类目分析
                    df_category = pd.DataFrame(list(analysis["category_analysis"]["category_distribution"].items()),
                                          columns=['类目', '数量'])
                    df_category.to_excel(writer, sheet_name='类目分析', index=False)
                    
                    # 销量分析
                    df_sales = pd.DataFrame(list(analysis["sales_analysis"]["sales_distribution"].items()),
                                          columns=['销量区间', '数量'])
                    df_sales.to_excel(writer, sheet_name='销量分析', index=False)
                    
                    # 材质分析
                    df_material = pd.DataFrame(list(analysis["attribute_analysis"]["material_distribution"].items()),
                                            columns=['材质', '数量'])
                    df_material.to_excel(writer, sheet_name='材质分析', index=False)
            
            logging.info(f"数据已导出至 {filename}")
        except Exception as e:
            logging.error(f"导出Excel失败: {e}")


# 示例调用
if __name__ == "__main__":
    # 替换为实际的参数(从VVIC开放平台获取)
    APP_KEY = "your_app_key"
    APP_SECRET = "your_app_secret"
    KEYWORD = "夏季连衣裙"  # 搜索关键词
    
    # 初始化API客户端
    vvic_search = VVICItemSearch(APP_KEY, APP_SECRET)
    # 若为高级权限,设置更高的频率限制
    # vvic_search.set_rate_limit(200)
    
    # 1. 设置搜索筛选条件
    filters = {
        "sort": "sales_desc",     # 按销量排序
        "price_from": 30,         # 最低价格
        "price_to": 200,          # 最高价格
        "style": "韩版",          # 风格筛选
        "has_stock": True,        # 只看有库存的商品
        "wholesale": True         # 只看支持批发的商品
    }
    
    # 2. 批量获取搜索结果
    print("=== 搜索商品 ===")
    items, meta_info = vvic_search.batch_search_items(
        keyword=KEYWORD,
        max_pages=3,       # 获取前3页
        page_size=50,
        filters=filters
    )
    
    if items:
        print(f"搜索关键词: {KEYWORD}")
        print(f"获取商品数量: {len(items)}")
        print(f"总商品数量: {meta_info['total_items']}")
        print(f"总页数: {meta_info['total_pages']}")
    
    # 3. 分析搜索结果
    print("\n=== 搜索结果分析 ===")
    if items:
        analysis = vvic_search.analyze_search_results(items, meta_info)
        
        print("价格分析:")
        print(f"  价格范围: {analysis['price_analysis']['min_price']} - {analysis['price_analysis']['max_price']} 元")
        print(f"  平均价格: {analysis['price_analysis']['avg_price']} 元")
        print(f"  中位数价格: {analysis['price_analysis']['median_price']} 元")
        
        print("\n类目分析:")
        print(f"  类目总数: {analysis['category_analysis']['total_categories']}")
        print("  主要类目:")
        for category, count in analysis["category_analysis"]["top_categories"][:5]:
            print(f"    {category}: {count}个商品")
        
        print("\n销售分析:")
        print(f"  总销量: {analysis['sales_analysis']['total_sales']}")
        print(f"  平均销量: {analysis['sales_analysis']['avg_sales']}")
        print(f"  零销量商品: {analysis['sales_analysis']['zero_sales_count']}")
        
        print("\n供应商分析:")
        print(f"  供应商总数: {analysis['seller_analysis']['total_sellers']}")
        print("  供应商类型:")
        for stype, count in analysis["seller_analysis"]["seller_type_distribution"].items():
            print(f"    {stype}: {count}个商品")
        
        print("\n起订量分析:")
        print(f"  最小起订量: {analysis['order_analysis']['min_order']}")
        print(f"  最大起订量: {analysis['order_analysis']['max_order']}")
        print(f"  平均起订量: {analysis['order_analysis']['avg_order']}")
        
        # 4. 获取各维度TOP商品
        print("\n=== 各维度TOP3商品 ===")
        top_sales = vvic_search.get_top_items(items, by="sales", top_n=3)
        print("销量TOP3:")
        for i, item_info in enumerate(top_sales, 1):
            item = item_info["item"]
            print(f"{i}. {item['title'][:30]}... 销量: {item_info['value']}")
        
        top_rating = vvic_search.get_top_items(items, by="rating", top_n=3)
        print("\n评分TOP3:")
        for i, item_info in enumerate(top_rating, 1):
            item = item_info["item"]
            print(f"{i}. {item['title'][:30]}... 评分: {item_info['value']}")
        
        top_cheap = vvic_search.get_top_items(items, by="price", top_n=3)
        print("\n价格最低TOP3:")
        for i, item_info in enumerate(top_cheap, 1):
            item = item_info["item"]
            print(f"{i}. {item['title'][:30]}... 价格: {item_info['value']} 元")
        
        # 5. 可视化分析结果
        vvic_search.visualize_analysis(analysis)
        
        # 6. 导出数据到Excel
        vvic_search.export_to_excel(items, analysis, meta_info, f"VVIC搜索_{KEYWORD.replace(' ', '_')}_分析.xlsx")
    else:
        print("未获取到搜索结果,无法进行分析")

四、接口调用注意事项

1. 常见错误及解决方案

错误码说明解决方案
1001无效的 app_key检查 app_key 是否正确,应用是否已通过审核
1002签名错误检查签名算法实现,确保参数排序正确、app_secret 无误
1003权限不足申请更高权限或检查是否已开通 item_search 接口权限
1004调用频率超限降低调用频率,或申请提高权限等级
2001关键词无效检查关键词长度是否在 2-30 个字符之间,是否包含非法字符
2002参数错误检查 page_size 是否在 10-50 之间,sort 是否为支持的类型
3001搜索服务暂时不可用稍后重试,可能是平台服务临时故障
4001访问被拒绝检查开发者账号状态,是否有违规行为导致限制

2. 性能优化建议

  • 精准筛选:使用价格区间、风格、材质等筛选条件减少返回数据量

  • 合理设置分页:使用最大 page_size(50)减少请求次数,降低频率限制影响

  • 缓存策略:对相同关键词和筛选条件的搜索结果设置 15-30 分钟缓存

  • 批量处理控制:多关键词搜索时,使用队列控制并发,避免触发频率限制

  • 关键词优化:使用精准关键词减少无关结果,提高数据质量

  • 增量搜索:定期搜索时,记录上次搜索的商品 ID,只关注新增或变化的商品

  • 异常处理:实现失败重试机制,特别是针对临时服务不可用的情况

五、应用场景与扩展建议

典型应用场景

  • 市场趋势分析:通过关键词搜索分析特定品类服装的市场状况和趋势

  • 采购决策支持:了解不同供应商的价格、起订量和商品质量,优化采购策略

  • 竞品监控:跟踪同类商品的价格、销量和供应商情况

  • 新品开发:分析热门商品特征,指导新产品开发和设计

  • 价格策略制定:了解市场价格区间和分布,优化自身定价策略

  • 供应商评估:通过搜索结果评估优质供应商,建立合作关系

扩展建议

  • 多关键词对比:同时分析多个相关关键词的搜索结果,全面了解市场

  • 趋势追踪:定期执行相同搜索,跟踪市场变化和趋势

  • 价格带分析:深入分析不同价格带商品的特征和市场表现

  • 季节性分析:结合时间维度,分析不同季节的商品特征变化

  • 关键词扩展:基于初始搜索结果,自动生成相关关键词并进行扩展搜索

  • 智能推荐:基于搜索分析结果,自动推荐有潜力的商品或供应商

  • 供应链分析:结合供应商信息,分析不同地区、类型供应商的特点和优势

通过VVICitem_search接口获取的商品列表数据,能够帮助服装电商从业者全面了解特定关键词相关的商品市场状况,为采购决策和市场分析提供数据支持。在实际应用中,需结合服装行业的季节性、潮流变化等特点,制定针对性的采购和销售策略,同时严格遵守 VVIC 平台的数据使用规范,确保合规使用 API 接口。


群贤毕至

访客