唯品会(Vipshop)开放平台提供了丰富的 API 接口,支持开发者进行商品查询、订单管理、促销活动等电商全流程操作。自定义 API 操作基于这些基础接口,通过组合调用和业务逻辑封装,实现符合特定场景需求的功能,广泛应用于供应链管理、数据分析、智能采购等领域。
一、唯品会 API 核心特性分析
1. 平台架构与认证机制
2. 核心 API 分类与功能
3. 自定义 API 操作模式
二、Python 实现方案
import requests
import time
import hashlib
import json
import logging
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from datetime import datetime, timedelta
from collections import defaultdict
from typing import Dict, List, Optional, Tuple
# Configure logging: INFO level, "<time> - <level> - <message>" line format
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
class VipshopCustomAPI:
    """Vipshop custom API client wrapping base API calls and business logic.

    Covers token handling, request signing, client-side rate limiting and
    higher-level helpers for products, orders, inventory, charts and Excel
    export, all built on top of ``call_api``.
    """

    # Lifetime (seconds) assumed for a token whose real expiry is unknown.
    DEFAULT_TOKEN_TTL = 7200

    def __init__(self, app_key: str, app_secret: str, access_token: str = ""):
        """
        Initialise the API client.

        :param app_key: application appKey
        :param app_secret: application appSecret
        :param access_token: optional pre-issued access token
        """
        self.app_key = app_key
        self.app_secret = app_secret
        self.access_token = access_token
        self.gateway_url = "https://api.vip.com/v2/"
        self.session = requests.Session()
        self.session.headers.update({
            "Content-Type": "application/json",
            "User-Agent": "VipshopCustomAPI/1.0.0 (Python)"
        })
        # Rate limiting: per-endpoint call counters for the current window
        self.api_calls = defaultdict(int)
        self.rate_limit = 100  # maximum calls per minute
        self.last_minute = time.time()
        # Token expiry timestamp. A token handed to the constructor is given
        # the default lifetime; otherwise it would be treated as expired and
        # silently replaced on the first call.
        self.token_expiry = (
            time.time() + self.DEFAULT_TOKEN_TTL if access_token else 0
        )

    def set_access_token(self, access_token: str, expires_in: int = 7200) -> None:
        """Store the access token and compute its expiry timestamp.

        :param access_token: token value
        :param expires_in: lifetime in seconds, counted from now
        """
        self.access_token = access_token
        self.token_expiry = time.time() + expires_in

    def is_token_valid(self) -> bool:
        """Return True when a token is set and is more than 60 s from expiry."""
        # The 60 s margin forces a refresh slightly before the real expiry.
        return bool(self.access_token) and time.time() < self.token_expiry - 60

    def get_access_token(self, grant_type: str = "client_credentials") -> Optional[str]:
        """
        Request an access token and store it on the client.

        :param grant_type: client_credentials or authorization_code
        :return: the access token, or None on any failure (failures are logged)
        """
        url = "https://oauth.vip.com/oauth/token"
        params = {
            "grant_type": grant_type,
            "appKey": self.app_key,
            "appSecret": self.app_secret
        }
        try:
            # NOTE(review): the credentials are sent as query parameters;
            # confirm with the platform docs whether a form body is accepted.
            response = self.session.post(url, params=params, timeout=10)
            response.raise_for_status()
            result = response.json()
            if "access_token" in result:
                # Coerce so a string "7200" does not break the expiry arithmetic.
                expires_in = int(result.get("expires_in", 7200))
                self.set_access_token(result["access_token"], expires_in)
                logging.info(f"成功获取 access_token,有效期 {expires_in} 秒")
                return result["access_token"]
            logging.error(f"获取 access_token 失败: {result.get('error_description')}")
            return None
        except Exception as e:
            # Deliberately broad: token acquisition is best-effort and callers
            # only check for None.
            logging.error(f"获取 access_token 异常: {str(e)}")
            return None

    def _generate_sign(self, params: Dict, timestamp: str) -> str:
        """Build the request signature.

        The signed string is appSecret + timestamp + every ``key`` ``value``
        pair (sorted by key) + appSecret, hashed with SHA-256 and returned as
        upper-case hex.
        """
        pairs = "".join(f"{k}{v}" for k, v in sorted(params.items(), key=lambda x: x[0]))
        sign_str = f"{self.app_secret}{timestamp}{pairs}{self.app_secret}"
        return hashlib.sha256(sign_str.encode('utf-8')).hexdigest().upper()

    def _check_rate_limit(self) -> bool:
        """Enforce the per-minute call budget, sleeping when it is exhausted.

        :return: always True (kept so callers can treat it as a gate)
        """
        current_time = time.time()
        # Start a fresh window once a minute has elapsed
        if current_time - self.last_minute > 60:
            self.api_calls.clear()
            self.last_minute = current_time
        # Budget used up: wait for the window to end, then start a new one
        if sum(self.api_calls.values()) >= self.rate_limit:
            sleep_time = max(0.0, 60 - (current_time - self.last_minute))
            logging.warning(f"API 调用频率超限,等待 {sleep_time:.1f} 秒")
            time.sleep(sleep_time + 1)
            self.api_calls.clear()
            # Use the time *after* sleeping; the pre-sleep timestamp would
            # make the new window look already expired.
            self.last_minute = time.time()
        return True

    def call_api(self, api_name: str, params: Optional[Dict] = None,
                 method: str = "GET") -> Optional[Dict]:
        """
        Call a Vipshop API endpoint.

        :param api_name: endpoint path appended to the gateway URL
        :param params: endpoint parameters
        :param method: GET or POST (anything other than GET is sent as POST)
        :return: the ``data`` member of the response, or None on any failure
        """
        # Refresh the token when missing or about to expire
        if not self.is_token_valid():
            logging.info("access_token 已过期或无效,尝试重新获取")
            if not self.get_access_token():
                return None
        # Respect the rate limit
        if not self._check_rate_limit():
            return None
        # Common parameters; endpoint parameters may override them
        timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
        base_params = {
            "appKey": self.app_key,
            "access_token": self.access_token,
            "timestamp": timestamp
        }
        if params:
            base_params.update(params)
        # The signature covers everything except itself
        base_params["sign"] = self._generate_sign(base_params, timestamp)
        try:
            url = f"{self.gateway_url}{api_name}"
            if method.upper() == "GET":
                response = self.session.get(url, params=base_params, timeout=15)
            else:
                response = self.session.post(url, json=base_params, timeout=15)
            response.raise_for_status()
            result = response.json()
            # A non-zero code is a business-level error
            if result.get("code") != 0:
                logging.error(f"API 调用错误: {result.get('message')} (错误码: {result.get('code')})")
                return None
            return result.get("data", {})
        except json.JSONDecodeError:
            # Listed before RequestException: recent requests versions raise a
            # decode error that is also a RequestException, which would
            # otherwise make this handler unreachable.
            logging.error(f"API 响应解析失败: {response.text[:200]}...")
            return None
        except requests.exceptions.RequestException as e:
            logging.error(f"API 请求异常: {str(e)}")
            return None
        finally:
            # Count every attempt, successful or not
            self.api_calls[api_name] += 1

    # ------------------------------
    # Product operations
    # ------------------------------
    def search_products(self, keyword: str, page: int = 1, page_size: int = 30,
                        min_price: Optional[float] = None,
                        max_price: Optional[float] = None,
                        category_id: Optional[int] = None) -> Dict:
        """
        Search products.

        :param keyword: search keyword
        :param page: page number
        :param page_size: items per page
        :param min_price: lowest price filter
        :param max_price: highest price filter
        :param category_id: category ID filter
        :return: dict with total, page, page_size, total_pages and products;
                 the same keys are present (with zero/empty values) on failure
        """
        params = {
            "keyword": keyword,
            "page": page,
            "pageSize": page_size
        }
        if min_price is not None:
            params["minPrice"] = min_price
        if max_price is not None:
            params["maxPrice"] = max_price
        if category_id is not None:
            params["categoryId"] = category_id
        result = self.call_api("product/search", params)
        if not result:
            return {
                "total": 0,
                "page": page,
                "page_size": page_size,
                "total_pages": 0,
                "products": []
            }
        total = int(result.get("totalCount", 0))
        # Ceiling division; guard against a non-positive page size
        total_pages = (total + page_size - 1) // page_size if page_size > 0 else 0
        return {
            "total": total,
            "page": page,
            "page_size": page_size,
            "total_pages": total_pages,
            "products": self._format_products(result.get("products", []))
        }

    def get_product_details(self, product_id: str) -> Optional[Dict]:
        """Fetch one product and return it formatted, or None on failure."""
        result = self.call_api("product/get", {"productId": product_id})
        if not result:
            return None
        return self._format_product(result)

    def batch_get_products(self, product_ids: List[str]) -> List[Dict]:
        """Fetch several products, batching the requests 20 ids at a time."""
        if not product_ids:
            return []
        products = []
        batch_size = 20  # ids per request
        for start in range(0, len(product_ids), batch_size):
            if start:
                # Throttle between batches (no pointless wait after the last)
                time.sleep(1)
            batch_ids = product_ids[start:start + batch_size]
            # str() lets callers pass numeric ids as well
            params = {"productIds": ",".join(str(pid) for pid in batch_ids)}
            result = self.call_api("product/batchGet", params)
            if result and "products" in result:
                products.extend(self._format_product(p) for p in result["products"])
        logging.info(f"成功获取 {len(products)} 件商品信息")
        return products

    def analyze_product_prices(self, keyword: str, max_pages: int = 3) -> Dict:
        """Collect up to ``max_pages`` of search results and summarise them.

        :return: dict with total_products, price_stats, discount_stats,
                 top_categories and products
        """
        all_products = []
        page = 1
        while page <= max_pages:
            logging.info(f"搜索商品第 {page} 页: {keyword}")
            result = self.search_products(keyword, page)
            if not result["products"]:
                break
            all_products.extend(result["products"])
            if page >= result["total_pages"]:
                break
            page += 1
            time.sleep(1)
        return self._summarize_products(all_products)

    def _summarize_products(self, products: List[Dict]) -> Dict:
        """Compute price, discount and category statistics for ``products``."""
        prices = []
        discounts = []
        category_counts = defaultdict(int)
        for product in products:
            current = self._to_float(product.get("currentPrice"))
            original = self._to_float(product.get("originalPrice"))
            if current is not None:
                prices.append(current)
                # Discount is computed per product, so both prices always
                # belong to the same item.
                if original is not None and original > 0:
                    discounts.append((1 - current / original) * 100)
            category_counts[product.get("categoryName", "未知分类")] += 1
        price_stats = {}
        if prices:
            price_stats = {
                "min": min(prices),
                "max": max(prices),
                "avg": sum(prices) / len(prices),
                "median": self._calculate_median(prices)
            }
        discount_stats = {}
        if discounts:
            discount_stats = {
                "avg_discount": sum(discounts) / len(discounts),
                "max_discount": max(discounts)
            }
        return {
            "total_products": len(products),
            "price_stats": price_stats,
            "discount_stats": discount_stats,
            "top_categories": sorted(category_counts.items(), key=lambda x: x[1], reverse=True)[:5],
            "products": products
        }

    # ------------------------------
    # Order operations
    # ------------------------------
    def get_recent_orders(self, days: int = 7, order_status: int = -1) -> List[Dict]:
        """
        Fetch the orders created during the last ``days`` days.

        :param days: look-back window in days
        :param order_status: status filter (-1 all, 0 unpaid, 1 paid, ...)
        :return: formatted order list
        """
        end_time = datetime.now()
        start_time = end_time - timedelta(days=days)
        params = {
            "startTime": start_time.strftime("%Y-%m-%d %H:%M:%S"),
            "endTime": end_time.strftime("%Y-%m-%d %H:%M:%S"),
            "page": 1,
            "pageSize": 50,
            "orderStatus": order_status
        }
        all_orders = []
        while True:
            logging.info(f"获取订单第 {params['page']} 页")
            result = self.call_api("order/list", params, method="POST")
            if not result or "orders" not in result:
                break
            orders = result["orders"]
            if not orders:
                # An empty page means there is nothing more to fetch; without
                # this an over-reported totalCount would loop forever.
                break
            all_orders.extend(orders)
            total_count = int(result.get("totalCount", 0))
            if len(all_orders) >= total_count:
                break
            params["page"] += 1
            time.sleep(1)
        logging.info(f"成功获取 {len(all_orders)} 条订单")
        return self._format_orders(all_orders)

    def analyze_order_data(self, orders: List[Dict]) -> Dict:
        """Aggregate orders into totals, status distribution, daily trend and
        the ten best-selling products by amount.

        :param orders: orders shaped like the output of ``_format_orders``
        :return: analysis dict, or an empty dict when ``orders`` is empty
        """
        if not orders:
            return {}
        state_counts = defaultdict(int)
        daily_stats = defaultdict(lambda: {"count": 0, "amount": 0})
        product_stats = defaultdict(lambda: {"count": 0, "amount": 0})
        total_sales = 0.0
        for order in orders:
            amount = self._to_float(order.get("orderAmount"), 0.0)
            total_sales += amount
            state_counts[order.get("orderStatusDesc", "")] += 1
            create_time = order.get("createTime")
            if create_time:
                # "YYYY-mm-dd HH:MM:SS" -> date part
                date_str = create_time.split()[0]
                daily_stats[date_str]["count"] += 1
                daily_stats[date_str]["amount"] += amount
            for item in order.get("items") or []:
                quantity = item.get("quantity", 1)
                stats = product_stats[item.get("productId")]
                stats["count"] += quantity
                stats["amount"] += self._to_float(item.get("price"), 0.0) * quantity
        total_orders = len(orders)
        sorted_dates = sorted(daily_stats)
        top_products = sorted(product_stats.items(),
                              key=lambda x: x[1]["amount"],
                              reverse=True)[:10]
        return {
            "total_orders": total_orders,
            "total_sales": total_sales,
            "avg_order_value": total_sales / total_orders if total_orders else 0,
            "state_distribution": dict(state_counts),
            "daily_trend": {
                "dates": sorted_dates,
                "sales": [daily_stats[d]["amount"] for d in sorted_dates],
                "orders": [daily_stats[d]["count"] for d in sorted_dates]
            },
            "top_products": top_products
        }

    # ------------------------------
    # Inventory operations
    # ------------------------------
    def get_product_inventory(self, product_id: str) -> Optional[Dict]:
        """Fetch stock figures for one product, or None on failure.

        ``update_time`` is the local time of this call, not a server value.
        """
        result = self.call_api("inventory/get", {"productId": product_id})
        if not result:
            return None
        return {
            "product_id": product_id,
            "total_stock": result.get("totalStock", 0),
            "locked_stock": result.get("lockedStock", 0),  # locked stock
            "available_stock": result.get("availableStock", 0),  # available stock
            "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }

    def check_low_stock(self, product_ids: List[str], threshold: int = 20) -> List[Dict]:
        """Return inventory records whose available stock is <= ``threshold``.

        Each record also carries ``product_name`` and ``current_price``; they
        are "" and None when the product detail lookup fails.
        """
        low_stock_items = []
        for product_id in product_ids:
            inventory = self.get_product_inventory(product_id)
            if not inventory:
                continue
            if inventory["available_stock"] <= threshold:
                product = self.get_product_details(product_id)
                # Always set both keys so consumers can rely on them
                inventory["product_name"] = product["productName"] if product else ""
                inventory["current_price"] = product["currentPrice"] if product else None
                low_stock_items.append(inventory)
            time.sleep(0.5)
        logging.info(f"发现 {len(low_stock_items)} 个低库存商品")
        return low_stock_items

    # ------------------------------
    # Utilities
    # ------------------------------
    @staticmethod
    def _to_float(value, default: Optional[float] = None) -> Optional[float]:
        """Convert ``value`` to float, returning ``default`` if not possible."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    def _format_products(self, products: List[Dict]) -> List[Dict]:
        """Format every product in the list."""
        return [self._format_product(p) for p in products]

    def _format_product(self, product: Dict) -> Dict:
        """Project a raw product record onto a fixed key set with defaults."""
        return {
            "productId": product.get("productId"),
            "productName": product.get("productName", ""),
            "currentPrice": product.get("currentPrice", 0),
            "originalPrice": product.get("originalPrice", 0),
            "discount": product.get("discount", 0),
            "categoryId": product.get("categoryId", ""),
            "categoryName": product.get("categoryName", ""),
            "brandId": product.get("brandId", ""),
            "brandName": product.get("brandName", ""),
            "salesCount": product.get("salesCount", 0),
            "commentCount": product.get("commentCount", 0),
            "score": product.get("score", 0),
            "mainImage": product.get("mainImage", ""),
            "detailUrl": product.get("detailUrl", "")
        }

    def _format_orders(self, orders: List[Dict]) -> List[Dict]:
        """Project raw orders (and their items) onto fixed key sets."""
        formatted = []
        for order in orders:
            # A missing or null item list is treated as empty
            items = [
                {
                    "productId": item.get("productId"),
                    "productName": item.get("productName"),
                    "quantity": item.get("quantity", 1),
                    "price": item.get("price", 0),
                    "totalPrice": item.get("totalPrice", 0)
                }
                for item in order.get("items") or []
            ]
            formatted.append({
                "orderId": order.get("orderId"),
                "orderAmount": order.get("orderAmount", 0),
                "orderStatus": order.get("orderStatus", 0),
                "orderStatusDesc": order.get("orderStatusDesc", ""),
                "payTime": order.get("payTime", ""),
                "shipTime": order.get("shipTime", ""),
                "receiveTime": order.get("receiveTime", ""),
                "createTime": order.get("createTime", ""),
                "buyerName": order.get("buyerName", ""),
                "items": items
            })
        return formatted

    def _calculate_median(self, data: List[float]) -> float:
        """Return the median of ``data`` rounded to two decimals.

        ``data`` must be non-empty; an empty list raises IndexError.
        """
        sorted_data = sorted(data)
        n = len(sorted_data)
        mid = n // 2
        if n % 2 == 1:
            return round(sorted_data[mid], 2)
        return round((sorted_data[mid - 1] + sorted_data[mid]) / 2, 2)

    def visualize_analysis(self, analysis: Dict, title: str, output_file: str) -> None:
        """Render one chart for ``analysis`` and save it to ``output_file``.

        The first matching view is drawn: price histogram (non-empty
        ``price_stats``), daily sales trend (``daily_trend``), or order-status
        pie (``state_distribution``). With no match an empty figure is saved.
        """
        # Fonts with CJK glyphs so the Chinese labels render
        plt.rcParams["font.family"] = ["SimHei", "WenQuanYi Micro Hei", "Heiti TC"]
        plt.rcParams["axes.unicode_minus"] = False
        fig, ax = plt.subplots(figsize=(10, 6))
        try:
            # Truthiness check: an empty price_stats dict has no "avg" to plot
            if analysis.get("price_stats"):
                # Price histogram with the mean marked
                parsed = (self._to_float(p.get("currentPrice"))
                          for p in analysis.get("products", []))
                prices = [value for value in parsed if value is not None]
                ax.hist(prices, bins=10, alpha=0.7, color='skyblue')
                ax.axvline(analysis["price_stats"]["avg"], color='r', linestyle='dashed', linewidth=1,
                           label=f'平均价格: {analysis["price_stats"]["avg"]:.2f}')
                ax.set_title(f"{title} - 商品价格分布")
                ax.set_xlabel("价格 (元)")
                ax.set_ylabel("商品数量")
                ax.legend()
            elif "daily_trend" in analysis:
                # Order counts as bars, sales as a line on a second y axis
                trend = analysis["daily_trend"]
                x = np.arange(len(trend["dates"]))
                width = 0.35
                ax.bar(x - width/2, trend["orders"], width, label='订单数')
                ax_twin = ax.twinx()
                ax_twin.plot(x, trend["sales"], 'r-', marker='o', label='销售额')
                ax.set_title(f"{title} - 销售趋势")
                ax.set_xlabel("日期")
                ax.set_ylabel("订单数")
                ax_twin.set_ylabel("销售额 (元)")
                ax.set_xticks(x)
                ax.set_xticklabels(trend["dates"], rotation=45)
                # Merge both axes' legends into one
                lines, labels = ax.get_legend_handles_labels()
                lines2, labels2 = ax_twin.get_legend_handles_labels()
                ax.legend(lines + lines2, labels + labels2, loc='upper left')
            elif "state_distribution" in analysis:
                # Order-status pie chart
                labels = list(analysis["state_distribution"].keys())
                sizes = list(analysis["state_distribution"].values())
                ax.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=90)
                ax.set_title(f"{title} - 订单状态分布")
                ax.axis('equal')
            plt.tight_layout()
            plt.savefig(output_file)
        finally:
            # Release the figure even when plotting or saving fails
            plt.close(fig)
        logging.info(f"分析图表已保存至 {output_file}")

    def export_to_excel(self, data: List[Dict], filename: str, sheet_name: str = "数据") -> None:
        """Write ``data`` (a list of row dicts) to an Excel sheet.

        Logs a warning and writes nothing when ``data`` is empty.
        """
        if not data:
            logging.warning("没有可导出的数据")
            return
        df = pd.DataFrame(data)
        df.to_excel(filename, sheet_name=sheet_name, index=False)
        logging.info(f"数据已导出至 {filename}")
# Example usage
if __name__ == "__main__":
    # Replace with real credentials issued by the Vipshop open platform
    APP_KEY = "your_app_key"
    APP_SECRET = "your_app_secret"
    # Create the API client
    api = VipshopCustomAPI(APP_KEY, APP_SECRET)
    # Obtain an access token; nothing below works without one
    if not api.get_access_token():
        logging.error("无法获取 access_token,程序退出")
        # SystemExit works even where the site-provided exit() is unavailable
        raise SystemExit(1)

    # Example 1: product price analysis
    print("=== 商品价格分析 ===")
    KEYWORD = "女装 连衣裙"  # search keyword
    price_analysis = api.analyze_product_prices(KEYWORD, max_pages=3)
    print(f"搜索关键词: {KEYWORD}")
    print(f"获取商品总数: {price_analysis['total_products']}")
    if price_analysis["price_stats"]:
        print(f"价格范围: {price_analysis['price_stats']['min']}-{price_analysis['price_stats']['max']} 元")
        print(f"平均价格: {price_analysis['price_stats']['avg']:.2f} 元")
    if price_analysis["discount_stats"]:
        print(f"平均折扣: {price_analysis['discount_stats']['avg_discount']:.1f}%")
        print(f"最大折扣: {price_analysis['discount_stats']['max_discount']:.1f}%")
    print("热门分类 TOP3:")
    for i, (category, count) in enumerate(price_analysis["top_categories"][:3], 1):
        print(f" {i}. {category}: {count} 款商品")
    api.visualize_analysis(price_analysis, f"商品价格分析: {KEYWORD}", "product_price_analysis.png")

    # Example 2: order data analysis
    print("\n=== 订单数据分析 ===")
    recent_orders = api.get_recent_orders(days=15)  # orders from the last 15 days
    order_analysis = api.analyze_order_data(recent_orders)
    # analyze_order_data returns {} when there are no orders, so guard every
    # key access below.
    if order_analysis:
        print(f"总订单数: {order_analysis['total_orders']}")
        print(f"总销售额: {order_analysis['total_sales']:.2f} 元")
        print(f"平均订单金额: {order_analysis['avg_order_value']:.2f} 元")
        print("订单状态分布:")
        for state, count in order_analysis["state_distribution"].items():
            print(f" {state}: {count} 单 ({count/order_analysis['total_orders']*100:.1f}%)")
        api.visualize_analysis(order_analysis, "最近15天订单分析", "order_analysis.png")
        api.export_to_excel(recent_orders, "recent_orders.xlsx", "订单数据")
    else:
        print("未获取到订单数据")

    # Example 3: inventory check
    print("\n=== 库存检查 ===")
    if price_analysis["products"]:
        # Check stock for the first 10 products found above
        product_ids = [p["productId"] for p in price_analysis["products"][:10]]
        low_stock = api.check_low_stock(product_ids, threshold=20)
        if low_stock:
            print(f"发现 {len(low_stock)} 个低库存商品:")
            for item in low_stock:
                # Name and price may be absent when the detail lookup failed
                name = item.get('product_name') or ''
                print(f" {name[:20]}... (ID: {item['product_id']})")
                print(f" 可用库存: {item['available_stock']}, 价格: {item.get('current_price')} 元")
        else:
            print("未发现低库存商品")