在 B2B 电商领域,直接对接工厂资源是优化供应链、降低采购成本的关键环节。item_search_factory 接口作为专注于工厂信息检索的工具,能够通过关键字精准定位符合需求的生产厂家,并提供工厂资质、生产能力、主营产品、地理位置等核心数据。该接口广泛应用于货源开发、供应链审计、产业集群分析等场景,为企业采购决策提供数据支持。
一、item_search_factory 接口核心特性分析
1. 接口定位与核心价值
2. 接口权限与调用限制
3. 核心参数解析
必选参数
可选参数
二、签名生成与返回数据结构
1. 签名生成逻辑
2. 返回数据结构解析
三、Python 实现方案
import base64
import hashlib
import hmac
import json
import logging
import os
import re
import time
from collections import Counter, defaultdict
from datetime import datetime
from typing import Dict, List, Optional, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
# Logging setup: INFO level, each record prefixed with timestamp and level.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# Chart text setup for Chinese labels: candidate font families, and the
# "axes.unicode_minus" option switched off.
plt.rcParams.update({
    "font.family": ["SimHei", "WenQuanYi Micro Hei", "Heiti TC"],
    "axes.unicode_minus": False,
})
class FactorySearch:
    """Client for the ``item_search_factory`` endpoint plus local analysis helpers.

    Bundles request signing, client-side rate limiting and paging with
    aggregation, filtering, charting and Excel export of the returned
    factory records (plain dicts).
    """

    # Length of the sliding rate-limit window, in milliseconds.
    _WINDOW_MS = 60000

    # (inclusive upper bound, label) pairs used to bucket numeric fields.
    _AGE_BUCKETS = ((5, "5年以内"), (10, "6-10年"), (20, "11-20年"))
    _SCALE_BUCKETS = (
        (50, "小型(≤50人)"),
        (200, "中型(51-200人)"),
        (1000, "大型(201-1000人)"),
    )
    _OUTPUT_BUCKETS = ((1000, "1000万以下"), (5000, "1001-5000万"), (10000, "5001-1亿"))

    def __init__(self, app_key: str, app_secret: str):
        """
        Create the API client.

        :param app_key: application key sent with every request
        :param app_secret: secret used as the HMAC key when signing requests
        """
        self.app_key = app_key
        self.app_secret = app_secret
        self.api_url = "https://api.industrialplatform.com/factory/search"
        # Calls allowed per minute; 50 is the standard-tier default and
        # set_rate_limit() accepts values up to 200.
        self.rate_limit = 50
        # Millisecond timestamps of calls made inside the current window.
        self.call_timestamps = []

    def set_rate_limit(self, limit: int) -> None:
        """Set the per-minute call limit; values outside 50-200 are ignored."""
        if 50 <= limit <= 200:
            self.rate_limit = limit
            logging.info(f"已设置调用频率限制为 {limit} 次/分钟")
        else:
            logging.warning("频率限制必须在50-200之间,未修改")

    def _generate_sign(self, params: Dict) -> str:
        """
        Build the request signature.

        Parameters are sorted by name, joined as ``key=value&key=value``,
        signed with HMAC-SHA1 keyed by ``app_secret`` and Base64-encoded.

        :param params: request parameters to sign (without ``sign``)
        :return: Base64 text of the HMAC-SHA1 digest
        """
        ordered = sorted(params.items(), key=lambda item: item[0])
        payload = "&".join(f"{name}={value}" for name, value in ordered)
        digest = hmac.new(
            self.app_secret.encode('utf-8'),
            payload.encode('utf-8'),
            hashlib.sha1
        ).digest()
        return base64.b64encode(digest).decode('utf-8')

    def _check_rate_limit(self) -> None:
        """Block until another call fits in the one-minute window, then record it."""
        now_ms = time.time() * 1000
        self.call_timestamps = [t for t in self.call_timestamps if now_ms - t < self._WINDOW_MS]
        if len(self.call_timestamps) >= self.rate_limit:
            oldest = self.call_timestamps[0]
            # Wait until the oldest call leaves the window, plus 0.1 s of slack.
            sleep_time = (self._WINDOW_MS - (now_ms - oldest)) / 1000 + 0.1
            logging.warning(f"调用频率超限,等待 {sleep_time:.1f} 秒")
            time.sleep(sleep_time)
            # Re-read the clock: the call happens now, not before the sleep.
            # Recording the stale pre-sleep time would let it expire too early.
            now_ms = time.time() * 1000
            self.call_timestamps = [t for t in self.call_timestamps if now_ms - t < self._WINDOW_MS]
        self.call_timestamps.append(now_ms)

    def search_factories(self, keywords: str, page: int = 1, page_size: int = 30,
                         filters: Optional[Dict] = None) -> Optional[Dict]:
        """
        Fetch one page of factory search results.

        :param keywords: search keywords
        :param page: page number
        :param page_size: number of records per page
        :param filters: extra request parameters; entries whose value is None are dropped
        :return: the ``data`` object of the response, or None when the request
                 fails, the response is not valid JSON, ``code`` is not 0, or
                 ``data`` holds no ``factories``
        """
        # Wait for a rate-limit slot first so the signed timestamp is taken
        # after any sleep rather than before it.
        self._check_rate_limit()

        params = {
            "app_key": self.app_key,
            "timestamp": str(int(time.time() * 1000)),  # millisecond timestamp
            "keywords": keywords,
            "page": page,
            "page_size": page_size
        }
        if filters and isinstance(filters, dict):
            params.update({k: v for k, v in filters.items() if v is not None})
        params["sign"] = self._generate_sign(params)

        try:
            response = requests.get(self.api_url, params=params, timeout=15)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            logging.error(f"请求异常: {str(e)}")
            return None

        # Decode separately: every JSON decode error is a ValueError, and in
        # newer requests releases it is also a RequestException, so a shared
        # try block would report it as a transport failure.
        try:
            result = response.json()
        except ValueError:
            logging.error(f"响应解析失败: {response.text[:200]}...")
            return None
        if not isinstance(result, dict):
            logging.error(f"响应解析失败: {response.text[:200]}...")
            return None

        if result.get("code") != 0:
            logging.error(f"API调用错误: {result.get('message')} (错误码: {result.get('code')})")
            return None

        # "data" may be present but null; treat that like an empty result.
        data = result.get("data") or {}
        if not isinstance(data, dict) or not data.get("factories"):
            logging.warning("未获取到工厂数据")
            return None
        logging.info(f"成功获取第 {page} 页工厂数据,关键词: {keywords}")
        return data

    def batch_search_factories(self, keywords: str, max_pages: int = 5, page_size: int = 30,
                               filters: Optional[Dict] = None) -> Tuple[List[Dict], Dict]:
        """
        Fetch up to ``max_pages`` consecutive pages of results.

        :param keywords: search keywords
        :param max_pages: maximum number of pages to request
        :param page_size: number of records per page
        :param filters: extra request parameters passed to every page request
        :return: (all factory records collected, metadata about the search)
        """
        all_factories = []
        meta_info = {
            "keywords": keywords,
            "total_factories": 0,
            "total_pages": 0,
            "filters": filters,
            "crawl_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "data_update_time": ""
        }
        for page in range(1, max_pages + 1):
            logging.info(f"正在搜索第 {page}/{max_pages} 页工厂...")
            result = self.search_factories(keywords, page, page_size, filters)
            if not result:
                break
            factories = result.get("factories", [])
            if not factories:
                logging.info("当前页无工厂数据,停止搜索")
                break
            all_factories.extend(factories)
            # Totals are taken from the first page only.
            if page == 1:
                total = result.get("total") or 0
                meta_info["total_factories"] = total
                meta_info["total_pages"] = min(max_pages, (total + page_size - 1) // page_size)
                meta_info["data_update_time"] = result.get("update_time", "")
            # When the total is known, stop at the last page it implies instead
            # of requesting one more page just to receive an empty answer.
            if meta_info["total_pages"] and page >= meta_info["total_pages"]:
                break
        logging.info(f"批量搜索完成,共获取 {len(all_factories)} 家工厂数据")
        return all_factories, meta_info

    @staticmethod
    def _as_number(value) -> Optional[float]:
        """Return ``value`` as a float, or None when it is missing or not numeric."""
        if value is None:
            return None
        try:
            number = float(value)
        except (TypeError, ValueError):
            return None
        # NaN compares unequal to itself; treat it as missing.
        return None if number != number else number

    @classmethod
    def _positive_number(cls, value) -> Optional[float]:
        """Like _as_number, but zero and negative values also count as missing."""
        number = cls._as_number(value)
        return number if number is not None and number > 0 else None

    @staticmethod
    def _bucket_label(value, buckets, overflow_label: str) -> str:
        """Return the label of the first bucket whose bound is >= value ("未知" for None)."""
        if value is None:
            return "未知"
        for upper_bound, label in buckets:
            if value <= upper_bound:
                return label
        return overflow_label

    def analyze_factories(self, factories: List[Dict]) -> Dict:
        """
        Aggregate factory records into distribution statistics.

        Missing, non-numeric or non-positive ``establish_year``,
        ``employee_count`` and ``annual_output`` values are counted as "未知".

        :param factories: factory records
        :return: dict with ``total_factories``, ``region_analysis``,
                 ``age_analysis``, ``scale_analysis``, ``certification_analysis``,
                 ``export_analysis`` and ``production_capacity``; or
                 ``{"error": ...}`` when ``factories`` is empty
        """
        if not factories:
            return {"error": "没有工厂数据可分析"}

        current_year = datetime.now().year
        province_counts = Counter()
        city_counts = Counter()
        age_counts = Counter()
        scale_counts = Counter()
        cert_count_dist = Counter()
        cert_totals = Counter()
        export_status = Counter()
        export_countries = Counter()
        output_counts = Counter()

        for factory in factories:
            # Region: "or" also covers keys that are present but null.
            province = factory.get("province") or "未知省份"
            city = factory.get("city") or "未知城市"
            province_counts[province] += 1
            city_counts[f"{province}-{city}"] += 1

            # Age in years, derived from the founding year.
            year = self._positive_number(factory.get("establish_year"))
            age = None if year is None else current_year - year
            age_counts[self._bucket_label(age, self._AGE_BUCKETS, "20年以上")] += 1

            # Scale by head count.
            employees = self._positive_number(factory.get("employee_count"))
            scale_counts[self._bucket_label(employees, self._SCALE_BUCKETS, "超大型(>1000人)")] += 1

            # Certifications: per-factory count and per-certificate totals.
            certs = factory.get("certifications") or []
            if certs:
                cert_count_dist[f"{len(certs)}项认证"] += 1
                cert_totals.update(certs)
            else:
                cert_count_dist["无认证"] += 1

            # Export status and destination countries.
            if factory.get("has_export", False):
                export_status["有出口资质"] += 1
                export_countries.update(factory.get("export_countries") or [])
            else:
                export_status["无出口资质"] += 1

            # Annual output.
            output = self._positive_number(factory.get("annual_output"))
            output_counts[self._bucket_label(output, self._OUTPUT_BUCKETS, "1亿以上")] += 1

        return {
            "total_factories": len(factories),
            "region_analysis": {
                "province_distribution": dict(province_counts),
                "city_distribution": dict(city_counts)
            },
            "age_analysis": dict(age_counts),
            "scale_analysis": dict(scale_counts),
            "certification_analysis": {
                "cert_count_distribution": dict(cert_count_dist),
                "top_certifications": cert_totals.most_common(5)
            },
            "export_analysis": {
                "export_status": dict(export_status),
                "top_export_countries": export_countries.most_common(5)
            },
            "production_capacity": dict(output_counts)
        }

    def _meets_criteria(self, factory: Dict, criteria: Dict, current_year: int) -> bool:
        """Return True when ``factory`` satisfies every condition present in ``criteria``."""
        if "min_age" in criteria:
            year = self._positive_number(factory.get("establish_year"))
            # An unknown founding year cannot prove a minimum age.
            if year is None:
                return False
            try:
                if current_year - year < criteria["min_age"]:
                    return False
            except TypeError:
                return False
        if "min_employees" in criteria:
            # Missing or unparseable head count is compared as 0.
            if (self._as_number(factory.get("employee_count")) or 0) < criteria["min_employees"]:
                return False
        if "required_certs" in criteria:
            held = factory.get("certifications") or []
            if any(cert not in held for cert in criteria["required_certs"]):
                return False
        if "need_export" in criteria:
            if factory.get("has_export", False) != criteria["need_export"]:
                return False
        if "min_output" in criteria:
            # Missing or unparseable output is compared as 0.
            if (self._as_number(factory.get("annual_output")) or 0) < criteria["min_output"]:
                return False
        return True

    def get_eligible_factories(self, factories: List[Dict], criteria: Dict) -> List[Dict]:
        """
        Keep the factories that satisfy every given criterion.

        Recognised keys: ``min_age`` (years), ``min_employees``,
        ``required_certs`` (all must be held), ``need_export`` (must equal
        the factory's ``has_export``) and ``min_output``. A factory whose
        founding year is missing or invalid never passes ``min_age``.

        :param factories: factory records
        :param criteria: filter conditions; absent keys are not checked
        :return: the matching factories, in input order
        """
        current_year = datetime.now().year
        eligible = [
            factory for factory in factories
            if self._meets_criteria(factory, criteria, current_year)
        ]
        logging.info(f"根据筛选条件,共找到 {len(eligible)} 家合格工厂")
        return eligible

    @staticmethod
    def _save_pie(values, labels, title: str, path: str, description: str) -> None:
        """Draw a pie chart and save it to ``path``."""
        plt.figure(figsize=(10, 8))
        try:
            plt.pie(values, labels=labels, autopct='%1.1f%%', startangle=90)
            plt.title(title)
            plt.axis('equal')
            plt.tight_layout()
            plt.savefig(path)
        finally:
            # Close the figure even when drawing or saving fails.
            plt.close()
        logging.info(f"{description}已保存至 {path}")

    @staticmethod
    def _save_bar(categories, counts, color: str, title: str, xlabel: str,
                  path: str, description: str) -> None:
        """Draw a bar chart with a value label above each bar and save it to ``path``."""
        plt.figure(figsize=(10, 6))
        try:
            plt.bar(categories, counts, color=color)
            plt.title(title)
            plt.xlabel(xlabel)
            plt.ylabel('工厂数量')
            for index, count in enumerate(counts):
                plt.text(index, count + 0.5, str(count), ha='center')
            plt.tight_layout()
            plt.savefig(path)
        finally:
            # Close the figure even when drawing or saving fails.
            plt.close()
        logging.info(f"{description}已保存至 {path}")

    def visualize_analysis(self, analysis: Dict, keywords: str, output_dir: str = ".") -> None:
        """
        Render the analysis as PNG charts inside ``output_dir``.

        Writes ``province_distribution.png``, ``age_distribution.png``,
        ``scale_distribution.png`` and ``certification_distribution.png``;
        a chart is skipped when its data is absent or empty. The directory
        is created when it does not exist.

        :param analysis: result of analyze_factories()
        :param keywords: search keywords, shown in the chart titles
        :param output_dir: target directory
        """
        os.makedirs(output_dir or ".", exist_ok=True)

        def target(name: str) -> str:
            return os.path.join(output_dir, name)

        # 1. Province share pie: top six provinces, the rest merged into "其他".
        province_dist = (analysis.get("region_analysis") or {}).get("province_distribution")
        if province_dist:
            ranked = sorted(province_dist.items(), key=lambda item: item[1], reverse=True)
            if len(ranked) > 6:
                remainder = sum(count for _, count in ranked[6:])
                ranked = ranked[:6] + [("其他", remainder)]
            labels, sizes = zip(*ranked)
            self._save_pie(sizes, labels, f'关键词"{keywords}"工厂省份分布',
                           target("province_distribution.png"), "省份分布图表")

        # 2. Age distribution bar chart.
        age_dist = analysis.get("age_analysis")
        if age_dist:
            self._save_bar(list(age_dist.keys()), list(age_dist.values()), 'lightblue',
                           f'关键词"{keywords}"工厂成立年限分布', '成立年限',
                           target("age_distribution.png"), "成立年限图表")

        # 3. Scale distribution pie chart.
        scale_dist = analysis.get("scale_analysis")
        if scale_dist:
            self._save_pie(list(scale_dist.values()), list(scale_dist.keys()),
                           f'关键词"{keywords}"工厂规模分布',
                           target("scale_distribution.png"), "工厂规模图表")

        # 4. Certification count bar chart.
        cert_dist = (analysis.get("certification_analysis") or {}).get("cert_count_distribution")
        if cert_dist:
            self._save_bar(list(cert_dist.keys()), list(cert_dist.values()), 'lightgreen',
                           f'关键词"{keywords}"工厂认证情况', '认证数量',
                           target("certification_distribution.png"), "认证情况图表")

    @staticmethod
    def _join_values(values) -> str:
        """Join a list-like field into comma-separated text; None/empty gives ""."""
        if not values:
            return ""
        if isinstance(values, str):
            return values
        return ",".join(str(value) for value in values)

    @staticmethod
    def _excel_cell(value):
        """Turn dict/list/tuple values into JSON text so they fit in one cell."""
        if isinstance(value, (dict, list, tuple)):
            # default=str is deliberate: this text is for display only.
            return json.dumps(value, ensure_ascii=False, default=str)
        return value

    def _factory_row(self, factory: Dict) -> Dict:
        """Map one factory record onto the columns of the "工厂数据" sheet."""
        certs = factory.get("certifications") or []
        row = {
            "工厂ID": factory.get("factory_id"),
            "工厂名称": factory.get("name"),
            "省份": factory.get("province"),
            "城市": factory.get("city"),
            "成立年份": factory.get("establish_year"),
            "员工数量": factory.get("employee_count"),
            "厂房面积(㎡)": factory.get("workshop_area"),
            "年产值(万元)": factory.get("annual_output"),
            "主营产品": self._join_values(factory.get("main_products")),
            "认证数量": len(certs),
            "认证列表": self._join_values(certs),
            "有出口资质": factory.get("has_export", False),
            "出口国家": self._join_values(factory.get("export_countries"))
        }
        # Contact columns are added only when the record carries them.
        if "contact_person" in factory:
            row["联系人"] = factory.get("contact_person")
            row["联系电话"] = factory.get("contact_phone")
        return row

    def export_to_excel(self, factories: List[Dict], analysis: Dict, meta_info: Dict, filename: str) -> None:
        """
        Write the search metadata, factory records and analysis to a workbook.

        Sheets: "搜索信息" (metadata), "工厂数据" (records, when any), and
        "省份分布" / "规模分布" (when ``analysis`` holds them and no error).
        Failures are logged instead of raised.

        :param factories: factory records
        :param analysis: result of analyze_factories()
        :param meta_info: metadata returned by batch_search_factories()
        :param filename: path of the workbook to create
        """
        if not factories and not analysis:
            logging.warning("没有数据可导出")
            return
        try:
            with pd.ExcelWriter(filename) as writer:
                # Nested values such as the "filters" dict are serialised first;
                # Excel writer engines reject dict/list cell values.
                meta_row = {key: self._excel_cell(value) for key, value in (meta_info or {}).items()}
                pd.DataFrame([meta_row]).to_excel(writer, sheet_name='搜索信息', index=False)

                if factories:
                    rows = [self._factory_row(factory) for factory in factories]
                    pd.DataFrame(rows).to_excel(writer, sheet_name='工厂数据', index=False)

                if analysis and "error" not in analysis:
                    if "region_analysis" in analysis:
                        province_dist = (analysis["region_analysis"] or {}).get("province_distribution") or {}
                        df_province = pd.DataFrame(list(province_dist.items()), columns=['省份', '数量'])
                        df_province.to_excel(writer, sheet_name='省份分布', index=False)
                    if "scale_analysis" in analysis:
                        scale_dist = analysis["scale_analysis"] or {}
                        df_scale = pd.DataFrame(list(scale_dist.items()), columns=['规模类型', '数量'])
                        df_scale.to_excel(writer, sheet_name='规模分布', index=False)
            logging.info(f"数据已导出至 {filename}")
        except Exception as e:
            # Export is best effort: report the failure and carry on.
            logging.error(f"导出Excel失败: {e}")
# Example usage
def main() -> None:
    """Run the demo workflow: search, analyze, filter, chart and export."""
    # Replace with the real credentials issued by the platform.
    app_key = "your_app_key"
    app_secret = "your_app_secret"
    keywords = "服装加工"  # search keyword

    factory_api = FactorySearch(app_key, app_secret)
    # With premium access a higher rate limit can be configured:
    # factory_api.set_rate_limit(200)

    # 1. Request-side filters
    filters = {
        "province": "广东",       # Guangdong province
        "industry": "纺织服装",   # textile and apparel industry
        "min_employee": 100,      # at least 100 employees
        "has_export": True,       # holds export qualification
        "sort": "year_desc"       # sort by years established, descending
    }

    # 2. Fetch several pages of factories
    print("=== 搜索工厂数据 ===")
    factories, meta_info = factory_api.batch_search_factories(
        keywords=keywords,
        max_pages=3,  # first three pages
        page_size=30,
        filters=filters
    )
    # Guard clause: nothing below makes sense without data.
    if not factories:
        print("未获取到工厂数据,无法进行分析")
        return

    print(f"搜索关键词: {keywords}")
    print(f"获取工厂数量: {len(factories)}")
    print(f"总工厂数量: {meta_info['total_factories']}")
    print(f"数据最后更新时间: {meta_info['data_update_time']}")

    # 3. Aggregate statistics
    print("\n=== 工厂数据分析 ===")
    analysis = factory_api.analyze_factories(factories)
    print("省份分布:")
    top_provinces = sorted(analysis["region_analysis"]["province_distribution"].items(),
                           key=lambda item: item[1], reverse=True)[:3]
    for province, count in top_provinces:
        print(f"  {province}: {count}家工厂")
    print("\n成立年限分布:")
    for age, count in analysis["age_analysis"].items():
        print(f"  {age}: {count}家工厂")
    print("\n工厂规模分布:")
    for scale, count in analysis["scale_analysis"].items():
        print(f"  {scale}: {count}家工厂")
    print("\n出口情况:")
    for status, count in analysis["export_analysis"]["export_status"].items():
        print(f"  {status}: {count}家工厂")

    # 4. Client-side filtering for high-quality factories
    print("\n=== 筛选优质工厂 ===")
    criteria = {
        "min_age": 10,                  # established at least 10 years
        "min_employees": 200,           # at least 200 employees
        "required_certs": ["ISO9001"],  # must hold ISO9001
        "need_export": True,            # must hold export qualification
        "min_output": 5000              # minimum annual output threshold
    }
    eligible_factories = factory_api.get_eligible_factories(factories, criteria)
    if eligible_factories:
        print(f"符合条件的工厂有 {len(eligible_factories)} 家,前3名:")
        for rank, factory in enumerate(eligible_factories[:3], 1):
            # .get() keeps the demo from crashing on records with missing fields.
            certs = ",".join(str(cert) for cert in factory.get("certifications") or [])
            print(f"{rank}. {factory.get('name', '未知')}")
            print(f"   所在地: {factory.get('province', '未知')}-{factory.get('city', '未知')}")
            print(f"   成立年份: {factory.get('establish_year', '未知')},员工数: {factory.get('employee_count', '未知')}")
            print(f"   年产值: {factory.get('annual_output', '未知')}万元,认证: {certs[:30]}...")

    # 5. Charts
    factory_api.visualize_analysis(analysis, keywords)
    # 6. Excel export
    factory_api.export_to_excel(factories, analysis, meta_info, "工厂搜索分析.xlsx")


if __name__ == "__main__":
    main()