VVIC(搜款网)作为国内领先的服装服饰批发市场供应链平台,其开放平台的 seller_search 接口专注于提供批发市场的商家及商品排行榜数据,涵盖热销商品排行、商家销量排名、类目趋势等核心信息。该接口广泛应用于服装行业的货源选品、竞品分析、市场趋势预测等场景,为供应商、零售商提供数据支持。
一、seller_search 接口核心特性分析
1. 接口定位与核心价值
2. 接口权限与调用限制
3. 核心参数解析
(1)必选参数
(2)可选筛选参数
二、签名生成逻辑与返回数据结构
1. 签名生成逻辑
2. 返回数据结构解析
(1)基础信息(外层字段)
(2)商品排行榜数据(data 字段,rank_type=product_sale 时)
(3)商家排行榜数据(data 字段,rank_type=seller_sale 时)
三、Python 实现方案
import requests
import time
import hashlib
import json
import logging
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from datetime import datetime, timedelta
from collections import defaultdict
from typing import Dict, List, Optional, Tuple
# 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
# 配置中文显示
plt.rcParams["font.family"] = ["SimHei", "WenQuanYi Micro Hei", "Heiti TC"]
plt.rcParams["axes.unicode_minus"] = False
class VVICSellerSearch:
"""VVIC seller_search 接口封装类,用于获取和分析排行榜数据"""
def __init__(self, app_key: str, app_secret: str):
"""
初始化 VVIC API 客户端
:param app_key: 应用的 app_key(从开放平台获取)
:param app_secret: 应用的 app_secret(从开放平台获取)
"""
self.app_key = app_key
self.app_secret = app_secret
self.gateway_url = "https://api.vvic.com/v2/rank/seller_search"
self.session = requests.Session()
self.session.headers.update({
"Content-Type": "application/x-www-form-urlencoded;charset=utf-8",
"User-Agent": "VVICSellerSearch/1.0.0 (Python)"
})
# 频率控制(个人开发者10次/分钟,企业开发者30次/分钟)
self.rate_limit = 10 # 默认个人开发者限制,企业可手动修改为30
self.call_timestamps = [] # 存储调用时间戳(秒级)
def set_rate_limit(self, limit: int) -> None:
"""设置调用频率限制(次/分钟)"""
if 10 <= limit <= 30:
self.rate_limit = limit
logging.info(f"已设置调用频率限制为 {limit} 次/分钟")
else:
logging.warning("频率限制必须在10-30之间,未修改")
def _generate_sign(self, params: Dict) -> str:
"""
生成签名(MD5算法)
:param params: 请求参数字典
:return: 签名字符串(大写)
"""
# 1. 按参数名ASCII升序排序
sorted_params = sorted(params.items(), key=lambda x: x[0])
# 2. 拼接为"key=value&key=value"格式
param_str = "&".join([f"{k}={v}" for k, v in sorted_params])
# 3. 追加app_secret
sign_str = f"{param_str}&app_secret={self.app_secret}"
# 4. MD5加密并转为大写
return hashlib.md5(sign_str.encode("utf-8")).hexdigest().upper()
def _check_rate_limit(self) -> None:
"""检查并控制调用频率"""
current_time = time.time()
# 保留1分钟内的调用记录
self.call_timestamps = [t for t in self.call_timestamps if current_time - t < 60]
# 若超过限制,计算需要等待的时间
if len(self.call_timestamps) >= self.rate_limit:
oldest_time = self.call_timestamps[0]
sleep_time = 60 - (current_time - oldest_time) + 1 # 额外加1秒保险
logging.warning(f"调用频率超限,等待 {sleep_time:.1f} 秒")
time.sleep(sleep_time)
# 再次清理过期记录
self.call_timestamps = [t for t in self.call_timestamps if time.time() - t < 60]
# 记录本次调用时间
self.call_timestamps.append(current_time)
def call_seller_search(self, rank_type: str, period: str, page: int = 1,
page_size: int = 50, filters: Optional[Dict] = None) -> Optional[Dict]:
"""
调用 seller_search 接口
:param rank_type: 排行榜类型(product_sale/seller_sale)
:param period: 统计周期(day/week/month)
:param page: 页码
:param page_size: 每页数量
:param filters: 筛选参数(如category_id、city等)
:return: 接口返回结果,失败返回None
"""
# 1. 构建基础参数
base_params = {
"app_key": self.app_key,
"timestamp": int(time.time()), # 秒级时间戳
"rank_type": rank_type,
"period": period,
"page": page,
"page_size": page_size
}
# 2. 合并筛选参数
if filters and isinstance(filters, Dict):
# 过滤空值参数
valid_filters = {k: v for k, v in filters.items() if v is not None}
base_params.update(valid_filters)
# 3. 生成签名
base_params["sign"] = self._generate_sign(base_params)
# 4. 频率控制
self._check_rate_limit()
try:
# 5. 发送请求
response = self.session.get(self.gateway_url, params=base_params, timeout=15)
response.raise_for_status() # 抛出HTTP错误
# 6. 解析响应
result = response.json()
# 7. 处理状态码
if result.get("code") != 0:
logging.error(f"接口调用失败:code={result.get('code')}, message={result.get('message')}")
return None
logging.info(f"成功获取 {rank_type}({period})第 {page} 页数据,共 {result.get('total', 0)} 条")
return result
except requests.exceptions.RequestException as e:
logging.error(f"请求异常:{str(e)}")
return None
except json.JSONDecodeError:
logging.error(f"响应解析失败:{response.text[:200]}...")
return None
def batch_get_rank_data(self, rank_type: str, period: str, max_pages: int = 5,
page_size: int = 50, filters: Optional[Dict] = None) -> Tuple[List[Dict], Dict]:
"""
批量获取多页排行榜数据
:param rank_type: 排行榜类型
:param period: 统计周期
:param max_pages: 最大获取页数(≤30)
:param page_size: 每页数量
:param filters: 筛选参数
:return: 排行榜数据列表和元信息(总条数、周期等)
"""
all_data = []
meta_info = {"total": 0, "rank_type": rank_type, "period": period, "filters": filters}
max_pages = min(max_pages, 30) # 限制最大页数为30
for page in range(1, max_pages + 1):
logging.info(f"正在获取第 {page}/{max_pages} 页数据...")
result = self.call_seller_search(rank_type, period, page, page_size, filters)
if not result or "data" not in result:
logging.warning(f"第 {page} 页无数据,停止批量获取")
break
# 提取数据并添加页码信息
page_data = result["data"]
for item in page_data:
item["page"] = page
item["crawl_time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 抓取时间
all_data.extend(page_data)
# 保存元信息(第一页获取一次即可)
if page == 1:
meta_info["total"] = result.get("total", 0)
meta_info["update_time"] = result.get("update_time", "") # 数据更新时间
# 若当前页数据不足page_size,说明已无更多数据
if len(page_data) < page_size:
logging.info(f"第 {page} 页数据不足 {page_size} 条,停止批量获取")
break
logging.info(f"批量获取完成,共获取 {len(all_data)} 条 {rank_type}({period})数据")
return all_data, meta_info
def analyze_product_rank(self, product_data: List[Dict]) -> Dict:
"""分析商品排行榜数据"""
if not product_data:
return {"error": "无商品数据可分析"}
# 1. 价格分布分析
prices = [item.get("price", 0.0) for item in product_data if item.get("price") > 0]
price_stats = {}
if prices:
price_stats = {
"min": round(min(prices), 2),
"max": round(max(prices), 2),
"avg": round(sum(prices) / len(prices), 2),
"median": round(np.median(prices), 2)
}
# 2. 产地分布
city_counts = defaultdict(int)
for item in product_data:
city = item.get("city", "未知")
city_counts[city] += 1
# 3. 销量与增长率分析
sales = [item.get("sales", 0) for item in product_data]
growth_rates = [item.get("growth_rate", 0.0) for item in product_data if item.get("growth_rate") is not None]
# 4. 排名变化分析(上升/下降/不变)
rank_changes = defaultdict(int)
for item in product_data:
if "last_rank" in item and "rank" in item:
last_rank = item["last_rank"]
current_rank = item["rank"]
if last_rank > current_rank:
rank_changes["上升"] += 1
elif last_rank < current_rank:
rank_changes["下降"] += 1
else:
rank_changes["不变"] += 1
# 5. 热门标签分析
tag_counts = defaultdict(int)
for item in product_data:
for tag in item.get("tags", []):
tag_counts[tag] += 1
return {
"total_products": len(product_data),
"price_analysis": price_stats,
"city_distribution": dict(city_counts),
"sales_stats": {
"total_sales": sum(sales),
"avg_sales": round(sum(sales)/len(sales), 1) if sales else 0,
"top3_sales": sorted(sales, reverse=True)[:3]
},
"growth_analysis": {
"avg_growth": round(sum(growth_rates)/len(growth_rates), 1) if growth_rates else 0,
"high_growth_count": sum(1 for r in growth_rates if r >= 50) # 高增长率(≥50%)商品数
},
"rank_change": dict(rank_changes),
"top_tags": sorted(tag_counts.items(), key=lambda x: x[1], reverse=True)[:10]
}
def analyze_seller_rank(self, seller_data: List[Dict]) -> Dict:
"""分析商家排行榜数据"""
if not seller_data:
return {"error": "无商家数据可分析"}
# 1. 销售额分析
sales = [item.get("sales", 0.0) for item in seller_data if item.get("sales") > 0]
sales_stats = {}
if sales:
sales_stats = {
"total": round(sum(sales), 2),
"avg": round(sum(sales)/len(sales), 2),
"top3": sorted(sales, reverse=True)[:3]
}
# 2. 商家所在地分布
city_counts = defaultdict(int)
for item in seller_data:
city = item.get("city", "未知")
city_counts[city] += 1
# 3. 主营类目分布
category_counts = defaultdict(int)
for item in seller_data:
category = item.get("main_category", "未知")
category_counts[category] += 1
# 4. 服务质量分析(回复率、发货时间)
reply_rates = [item.get("reply_rate", 0.0) for item in seller_data]
ship_times = [item.get("ship_time", 0.0) for item in seller_data if item.get("ship_time") > 0]
return {
"total_sellers": len(seller_data),
"sales_analysis": sales_stats,
"city_distribution": dict(city_counts),
"category_distribution": dict(category_counts),
"service_analysis": {
"avg_reply_rate": round(sum(reply_rates)/len(reply_rates), 1) if reply_rates else 0,
"avg_ship_time": round(sum(ship_times)/len(ship_times), 1) if ship_times else 0
},
"product_count_stats": {
"avg_product_count": round(sum(item.get("product_count", 0) for item in seller_data)/len(seller_data), 1)
}
}
def visualize_analysis(self, analysis: Dict, title: str, output_dir: str = ".") -> None:
"""可视化分析结果"""
# 1. 价格分布直方图(商品分析专用)
if "price_analysis" in analysis and analysis["price_analysis"]:
plt.figure(figsize=(10, 6))
prices = [item.get("price", 0.0) for item in analysis.get("raw_data", []) if item.get("price") > 0]
plt.hist(prices, bins=10, alpha=0.7, color='lightgreen')
plt.axvline(analysis["price_analysis"]["avg"], color='red', linestyle='--',
label=f'平均价格: {analysis["price_analysis"]["avg"]}元')
plt.title(f'{title} - 商品价格分布')
plt.xlabel('价格(元)')
plt.ylabel('商品数量')
plt.legend()
plt.tight_layout()
plt.savefig(f"{output_dir}/price_distribution.png")
plt.close()
logging.info(f"价格分布图表已保存至 {output_dir}/price_distribution.png")
# 2. 产地分布饼图
if "city_distribution" in analysis and analysis["city_distribution"]:
plt.figure(figsize=(8, 8))
# 只显示占比前5的产地,其余归为"其他"
city_data = sorted(analysis["city_distribution"].items(), key=lambda x: x[1], reverse=True)
if len(city_data) > 5:
top5 = city_data[:5]
others = sum(count for _, count in city_data[5:])
top5.append(("其他", others))
city_data = top5
labels, sizes = zip(*city_data)
plt.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=90)
plt.title(f'{title} - 产地分布')
plt.axis('equal')
plt.tight_layout()
plt.savefig(f"{output_dir}/city_distribution.png")
plt.close()
logging.info(f"产地分布图表已保存至 {output_dir}/city_distribution.png")
# 3. 销量TOP10商品条形图(商品分析专用)
if "sales_stats" in analysis and "raw_data" in analysis:
# 提取销量TOP10的商品名称和销量
top10_sales = sorted(
analysis["raw_data"],
key=lambda x: x.get("sales", 0),
reverse=True
)[:10]
plt.figure(figsize=(12, 6))
names = [item["title"][:10] + "..." for item in top10_sales] # 截断长标题
sales = [item.get("sales", 0) for item in top10_sales]
plt.barh(names, sales, color='skyblue')
plt.xlabel('销量(件)')
plt.title(f'{title} - 销量TOP10商品')
plt.gca().invert_yaxis() # 反转Y轴,让第一名在最上方
plt.tight_layout()
plt.savefig(f"{output_dir}/top10_sales.png")
plt.close()
logging.info(f"销量TOP10图表已保存至 {output_dir}/top10_sales.png")
def export_to_excel(self, data: List[Dict], meta_info: Dict, filename: str) -> None:
"""导出数据到Excel"""
if not data:
logging.warning("无数据可导出")
return
try:
with pd.ExcelWriter(filename) as writer:
# 数据详情页
df_data = pd.DataFrame(data)
df_data.to_excel(writer, sheet_name='数据详情', index=False)
# 元信息页
df_meta = pd.DataFrame(list(meta_info.items()), columns=['参数', '值'])
df_meta.to_excel(writer, sheet_name='元信息', index=False)
logging.info(f"数据已导出至 {filename}")
except Exception as e:
logging.error(f"导出Excel失败: {str(e)}")
# 示例调用
if __name__ == "__main__":
# 替换为实际参数(从VVIC开放平台获取)
APP_KEY = "your_app_key"
APP_SECRET = "your_app_secret"
# 初始化客户端
vvic_api = VVICSellerSearch(APP_KEY, APP_SECRET)
# 若为企业开发者,设置更高的频率限制
# vvic_api.set_rate_limit(30)
# 示例1:获取女装商品日销量排行榜
print("=== 女装商品日销量排行榜分析 ===")
product_filters = {
"category_id": 1001, # 1001=女装
"city": "广州", # 筛选广州产地
"price_min": 50, # 最低价格50元
"price_max": 200 # 最高价格200元
}
# 获取前3页数据(每页50条)
product_data, product_meta = vvic_api.batch_get_rank_data(
rank_type="product_sale",
period="day",
max_pages=3,
page_size=50,
filters=product_filters
)
# 分析商品数据
if product_data:
# 保存原始数据用于可视化
product_analysis = vvic_api.analyze_product_rank(product_data)
product_analysis["raw_data"] = product_data # 临时存储原始数据用于绘图
print(f"分析商品总数: {product_analysis['total_products']}")
print(f"价格范围: {product_analysis['price_analysis']['min']}-{product_analysis['price_analysis']['max']}元")
print(f"平均价格: {product_analysis['price_analysis']['avg']}元")
print(f"总销量: {product_analysis['sales_stats']['total_sales']}件")
print(f"平均增长率: {product_analysis['growth_analysis']['avg_growth']}%")
print("主要产地:")
for city, count in sorted(product_analysis["city_distribution"].items(), key=lambda x: x[1], reverse=True)[:3]:
print(f" {city}: {count}件商品")
# 可视化分析结果
vvic_api.visualize_analysis(
product_analysis,
title="广州女装日销排行榜分析",
output_dir="."
)
# 导出数据到Excel
vvic_api.export_to_excel(product_data, product_meta, "女装商品排行榜数据.xlsx")
# 示例2:获取男装商家周销量排行榜
print("\n=== 男装商家周销量排行榜分析 ===")
seller_filters = {
"category_id": 1002, # 1002=男装
"is_factory": 1 # 仅工厂直供商家
}
# 获取前2页数据
seller_data, seller_meta = vvic_api.batch_get_rank_data(
rank_type="seller_sale",
period="week",
max_pages=2,
page_size=50,
filters=seller_filters
)
# 分析商家数据
if seller_data:
seller_analysis = vvic_api.analyze_seller_rank(seller_data)
print(f"分析商家总数: {seller_analysis['total_sellers']}")
print(f"总销售额: {seller_analysis['sales_analysis']['total']}元")
print(f"平均在售商品数: {seller_analysis['product_count_stats']['avg_product_count']}")
print(f"平均回复率: {seller_analysis['service_analysis']['avg_reply_rate']}%")
print(f"平均发货时间: {seller_analysis['service_analysis']['avg_ship_time']}小时")
# 导出数据到Excel
vvic_api.export_to_excel(seller_data, seller_meta, "男装商家排行榜数据.xlsx")