🏬 多店铺淘宝订单统一拉取:TOP API + 分布式任务调度实战(附Python源码)
多淘宝/天猫店铺订单集中同步到一套ERP,核心是每个店铺一个 Seller AccessToken → 统一增量时间窗拉取 → 以 tid 幂等入库 → 用分布式锁/原子标记防多节点重复跑。下面给你可直接用的 Python 实现。
一、多店铺同步架构
┌──────────────────────────────────────────────────────┐
│          调度器 (APScheduler / Celery Beat)           │
│     获取 shop_list → 分布式锁(shop_id+date)           │
└──────────────────────┬───────────────────────────────┘
         ┌─────────────┼─────────────┐
         ▼             ▼             ▼
   Shop_A Token   Shop_B Token   Shop_C Token
         │             │             │
         └─────────────┼─────────────┘
                       ▼
   taobao.trades.sold.get (modified 时间窗)
                       │
                       ▼
   taobao.trade.fullinfo.get (逐单)
                       │
                       ▼
   ERP订单表 (UNIQUE(tid, shop_id) 幂等)
⚠️ 每个店铺必须在开放平台分别 OAuth 授权,得到对应access_token/refresh_token,存shop_auth表。
二、完整 Python 多店铺同步模块(含单机文件锁示意;多机部署请换用 Redis 分布式锁)
# top_multi_shop_sync.py
"""
多店铺淘宝订单统一拉取
- 支持 N 个店铺(Seller AccessToken 不同)
- 增量按 modified 时间窗
- 文件锁防同机多进程重复跑(分布式场景可以用 Redis SET NX EX)
- 幂等入库用 (tid, shop_id) UNIQUE
依赖: requests apscheduler (pip install requests apscheduler)
"""
import fcntl
import hashlib
import os
import sqlite3
import time
from datetime import datetime, timedelta, timezone
from typing import Dict, List

import requests
# ───────────── TOP Client (内联) ─────────────
class TopApiError(Exception):
    """Raised when the TOP gateway answers with an ``error_response`` payload."""


class TopClient:
    """Minimal client for the Taobao Open Platform (TOP) REST gateway.

    Builds the system parameters, signs the request with the MD5 scheme
    (``secret + sorted key/value pairs + secret``) and unwraps the JSON
    envelope returned by the gateway.
    """

    GW = "https://gw.api.taobao.com/router/rest"

    def __init__(self, ak, ask):
        """Store the application key (``ak``) and application secret (``ask``)."""
        self.ak, self.ask = ak, ask

    @staticmethod
    def _is_blank(value) -> bool:
        """Return True for ``None`` and for values whose text form is empty/whitespace."""
        return value is None or str(value).strip() == ""

    def _sign(self, p: Dict) -> str:
        """Return the upper-case hex MD5 signature for the parameters in ``p``.

        The ``sign`` key itself and blank values are left out of the signed
        string; the remaining pairs are concatenated in key order.
        """
        pairs = sorted((k, v) for k, v in p.items()
                       if k != "sign" and not self._is_blank(v))
        qs = "".join(f"{k}{v}" for k, v in pairs)
        return hashlib.md5(f"{self.ask}{qs}{self.ask}".encode()).hexdigest().upper()

    def _build_params(self, method, biz, session) -> Dict:
        """Merge system and business parameters and attach the signature.

        Blank values are removed *before* signing so that the set of
        parameters posted to the gateway is exactly the set that was signed.
        Previously blank values were skipped by ``_sign`` but still posted,
        which makes the signed and posted parameter sets disagree.
        """
        p = {"method": method, "app_key": self.ak,
             "timestamp": str(int(time.time() * 1000)),
             "format": "json", "v": "2.0", "sign_method": "md5",
             "session": session}
        p.update(biz or {})
        p = {k: v for k, v in p.items()
             if k != "sign" and not self._is_blank(v)}
        p["sign"] = self._sign(p)
        return p

    @staticmethod
    def _unwrap(d) -> Dict:
        """Return the payload inside the single ``*_response`` envelope key.

        Raises:
            TopApiError: if the body is not a JSON object or carries an
                ``error_response``.
        """
        if not isinstance(d, dict):
            raise TopApiError(f"TOP: unexpected response body {d!r}")
        if "error_response" in d:
            err = d["error_response"]
            raise TopApiError(
                f"TOP[{err.get('code')}]:{err.get('msg')} {err.get('sub_msg','')}")
        # An empty object used to raise IndexError; treat it as an empty payload.
        payload = next(iter(d.values()), None)
        return {} if payload is None else payload

    def call(self, method, biz, session):
        """POST one API call and return the unwrapped response payload.

        Args:
            method: TOP API name, e.g. ``taobao.trade.fullinfo.get``.
            biz: Business parameters for that API (may be ``None``).
            session: The seller access token sent as ``session``.

        Raises:
            TopApiError: when the gateway returns an ``error_response``.
            requests.HTTPError: on a non-2xx HTTP status.
        """
        r = requests.post(self.GW, data=self._build_params(method, biz, session),
                          timeout=15)
        r.raise_for_status()
        return self._unwrap(r.json())
# ───────────── 本地DB (shop_auth + erp_order) ─────────────
def init_db(db="multi_shop_erp.db"):
    """Open the SQLite database and make sure both tables exist.

    Creates ``shop_auth`` (one row per authorised shop) and ``erp_order``
    (keyed by ``(tid, shop_id)``) when they are missing.

    Args:
        db: Path of the SQLite file, or ``":memory:"``.

    Returns:
        The open ``sqlite3.Connection``; the caller owns it.
    """
    conn = sqlite3.connect(db)
    try:
        conn.execute("""CREATE TABLE IF NOT EXISTS shop_auth(
            shop_id TEXT PRIMARY KEY,
            shop_name TEXT,
            access_token TEXT,
            refresh_token TEXT,
            expires_at TEXT)""")
        conn.execute("""CREATE TABLE IF NOT EXISTS erp_order(
            tid TEXT, shop_id TEXT,
            status TEXT, payment REAL, buyer_nick TEXT, created TEXT,
            PRIMARY KEY(tid, shop_id))""")
        conn.commit()
    except Exception:
        # Do not leak the handle when the schema cannot be created.
        conn.close()
        raise
    return conn
def load_shops(conn) -> List[Dict]:
    """Read every row of ``shop_auth`` as a dict.

    Returns:
        One dict per shop with the keys ``shop_id``, ``shop_name`` and
        ``token`` (the stored ``access_token``).
    """
    field_names = ("shop_id", "shop_name", "token")
    rows = conn.execute("SELECT shop_id,shop_name,access_token FROM shop_auth")
    return [dict(zip(field_names, row)) for row in rows]
def upsert_order(conn, shop_id, trade: Dict):
    """Insert one order, or refresh it when ``(tid, shop_id)`` already exists.

    Args:
        conn: Open SQLite connection holding the ``erp_order`` table.
        shop_id: Shop the order belongs to.
        trade: Order dict; ``tid`` is required, the other fields are optional.

    The change is committed before returning.
    """
    tid = str(trade["tid"])
    payment = float(trade.get("payment") or 0)
    row = (
        tid,
        shop_id,
        trade.get("status"),
        payment,
        trade.get("buyer_nick", ""),
        trade.get("created", ""),
    )
    sql = """INSERT INTO erp_order(tid,shop_id,status,payment,buyer_nick,created)
VALUES(?,?,?,?,?,?)
ON CONFLICT(tid,shop_id) DO UPDATE SET
status=excluded.status,payment=excluded.payment,
buyer_nick=excluded.buyer_nick,created=excluded.created"""
    conn.execute(sql, row)
    conn.commit()
# ───────────── 文件锁 (单机多进程示意,非分布式) ─────────────
LOCK_FILE = "/tmp/top_multishop_sync.lock"


def acquire_lock(path=None):
    """Take an exclusive, non-blocking ``flock`` on a lock file.

    Args:
        path: Lock file to use; defaults to ``LOCK_FILE``.

    Returns:
        The open file object that holds the lock. Keep it open for as long
        as the lock is needed and hand it to ``release_lock`` afterwards.

    Raises:
        RuntimeError: if the lock cannot be taken (typically because another
            process holds it). The underlying ``OSError`` is chained as the
            cause.
    """
    fd = open(LOCK_FILE if path is None else path, "w")
    try:
        # LOCK_NB: fail immediately instead of waiting for the holder.
        fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except OSError as exc:
        fd.close()
        raise RuntimeError("另一实例正在运行中,退出") from exc
    return fd
def release_lock(fd):
    """Release a lock handle obtained from ``flock`` and close it.

    Args:
        fd: The open file object holding the lock. ``None`` or an already
            closed handle is accepted and ignored, so this is safe to call
            from a ``finally`` clause even when no lock was taken.

    The lock file is deliberately left on disk. Unlinking it after unlocking
    lets a second process lock the old inode while a third one creates and
    locks a fresh file at the same path, so both would run at once.
    """
    if fd is None or fd.closed:
        return
    try:
        fcntl.flock(fd, fcntl.LOCK_UN)
    finally:
        fd.close()
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
# ───────────── 核心同步逻辑 ─────────────
class MultiShopOrderSync:
    """Pull recently modified orders of every authorised shop into ``erp_order``.

    For each shop the modified-time window is listed page by page, each
    listed order is fetched in detail, and the detail is upserted.
    """

    # TOP interprets request times as China Standard Time (GMT+8), so the
    # window must not depend on the host's local timezone.
    _TOP_TZ = timezone(timedelta(hours=8))
    _TIME_FMT = "%Y-%m-%d %H:%M:%S"
    # start_modified/end_modified are parameters of the *increment* listing
    # API; taobao.trades.sold.get filters by creation time instead.
    _LIST_METHOD = "taobao.trades.sold.increment.get"
    _PAGE_SIZE = 40
    # Hard stop so a misbehaving has_next flag cannot loop forever.
    _MAX_PAGES = 1000

    def __init__(self, ak, ask, conn):
        """Create the TOP client from ``ak``/``ask`` and keep the DB connection."""
        self.top = TopClient(ak, ask)
        self.conn = conn

    @staticmethod
    def _extract_trades(resp) -> List[Dict]:
        """Return the trade dicts of a listing response.

        TOP's JSON format nests lists one level down
        (``{"trades": {"trade": [...]}}``); a plain list is accepted too.
        Entries without a ``tid`` are dropped.
        """
        trades = (resp or {}).get("trades") or []
        if isinstance(trades, dict):
            trades = trades.get("trade") or []
        return [t for t in trades
                if isinstance(t, dict) and t.get("tid") is not None]

    def _list_tids(self, token, start, end) -> List:
        """Collect the distinct order ids modified in ``[start, end]``, all pages."""
        seen = {}  # dict keeps first-seen order while de-duplicating
        for page_no in range(1, self._MAX_PAGES + 1):
            resp = self.top.call(self._LIST_METHOD, {
                "fields": "tid,status,payment,modified,buyer_nick,created",
                "start_modified": start, "end_modified": end,
                "page_no": page_no, "page_size": self._PAGE_SIZE,
                "use_has_next": "true",
            }, token) or {}
            for trade in self._extract_trades(resp):
                seen.setdefault(trade["tid"], None)
            if not resp.get("has_next"):
                break
        return list(seen)

    def sync_one_shop(self, shop: Dict, minutes=30):
        """Sync the orders one shop modified during the last ``minutes`` minutes.

        Args:
            shop: Dict with ``shop_id``, ``shop_name`` and ``token``.
            minutes: Length of the look-back window.

        Returns:
            The number of orders written to the database.
        """
        token = shop["token"]
        shop_id = shop["shop_id"]
        now = datetime.now(self._TOP_TZ)
        start = (now - timedelta(minutes=minutes)).strftime(self._TIME_FMT)
        end = now.strftime(self._TIME_FMT)

        # Step 1: list the ids of the orders modified in the window.
        tids = self._list_tids(token, start, end)

        # Step 2: fetch and store the detail of each order.
        synced = 0
        for tid in tids:
            detail = (self.top.call("taobao.trade.fullinfo.get", {
                "tid": str(tid),
                "fields": "tid,status,payment,buyer_nick,created"
            }, token) or {}).get("trade") or {}
            if detail.get("tid") is None:
                # An empty detail cannot be keyed; skip it rather than abort the shop.
                continue
            upsert_order(self.conn, shop_id, detail)
            synced += 1
            time.sleep(0.15)  # stay under the API rate limit
        print(f"  ✅ 店铺[{shop['shop_name']}] 同步订单: {synced} 笔")
        return synced

    def sync_all(self, minutes=30):
        """Sync every shop found in ``shop_auth``.

        A failure in one shop is reported and does not stop the others.
        """
        shops = load_shops(self.conn)
        if not shops:
            print("⚠️ 无店铺配置,请在 shop_auth 表录入 shop_id/shop_name/access_token")
            return
        for s in shops:
            try:
                self.sync_one_shop(s, minutes)
            except Exception as e:
                print(f"  ❌ 店铺[{s['shop_name']}] 失败: {e}")
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
# ======================= 定时入口 =======================
if __name__ == "__main__":
APP_KEY = "YOUR_ENTERPRISE_APP_KEY"
APP_SECRET = "YOUR_APP_SECRET"
conn = init_db()
# ★ 首次运行前手动 INSERT shop_auth 或写一小段初始化
# conn.execute("INSERT OR IGNORE INTO shop_auth VALUES('SHOP001','旗舰店A','SELLER_TOKEN_A','',NULL)")
syncer = MultiShopOrderSync(APP_KEY, APP_SECRET, conn)
try:
lock_fd = acquire_lock()
print(f"▶ 多店铺订单同步开始 @ {datetime.now().strftime('%H:%M:%S')}")
syncer.sync_all(minutes=30)
print("▶ 同步完成")
except RuntimeError as e:
print(e)
finally:
try:
release_lock(lock_fd)
except Exception:
pass三、shop_auth 表初始化示例(SQLite)
INSERT OR IGNORE INTO shop_auth(shop_id,shop_name,access_token)
VALUES
('SHOP001','天猫旗舰店A','SELLER_ACCESS_TOKEN_A'),
('SHOP002','淘宝企业店B','SELLER_ACCESS_TOKEN_B');
AccessToken 通过各店铺卖家账号 OAuth2 授权换取(同之前讲过的 oauth.taobao.com/token 流程),每个店铺 token 不同。
四、分布式调度建议
| 部署形态 | 防重手段 |
|---|---|
| 单机多进程 | 文件锁 fcntl.flock(已演示) |
| 多机 / K8s | Redis SET key value NX EX 300 获取锁 → 执行 → DEL;失败跳过 |
| 定时触发 | APScheduler BlockingScheduler + 锁 / Celery Beat |
| 断点续跑 | 记录每个 shop 最后成功 max_modified → 下次从此时间拉 |
APScheduler 常驻示例:
from apscheduler.schedulers.blocking import BlockingScheduler


def run_sync_job():
    """Run one guarded sync pass; this is the function the scheduler fires.

    The job executes in a scheduler worker thread, and a sqlite3 connection
    may only be used by the thread that created it (``check_same_thread``),
    so the connection is opened and closed inside the job instead of being
    shared with the main thread. The file lock makes a pass skip itself when
    another process on the same host is already syncing.
    """
    try:
        lock_fd = acquire_lock()
    except RuntimeError as exc:
        print(exc)
        return
    job_conn = None
    try:
        job_conn = init_db()
        MultiShopOrderSync(APP_KEY, APP_SECRET, job_conn).sync_all(30)
    finally:
        if job_conn is not None:
            job_conn.close()
        release_lock(lock_fd)


sched = BlockingScheduler()
sched.add_job(run_sync_job, 'cron', minute='*/5', id='tb_multi_shop_sync')
sched.start()
五、避坑清单
| 坑 | 现象 | 解决 |
|---|---|---|
| 用同一 token 查不同店铺 | 只能看授权店铺自己订单 | 每个店铺独立 OAuth → 独立 token |
| session 传买家 token | 403 / 空 | 必须是卖家 AccessToken |
| 全量翻页不记断点 | 重启重跑超日额度 | 存 last_sync_time per shop |
| 多节点同时跑 | 重复插入/重复API调用 | Redis NX 锁 |
| 淘宝客应用无订单权限 | 403 | 创建自用型企业应用申请 taobao.trades.sold.get |
六、面试/方案一句话
多店铺淘宝订单统一拉取 = 各店铺分别 OAuth 授权存 Seller AccessToken → 定时增量按 modified 时间窗调 taobao.trades.sold.get + trade.fullinfo.get → 以 (tid, shop_id) 幂等入库 → 分布式 Redis NX 锁防多节点重复触发,签名/QPS 同单店铺,仅 token 按店铺隔离。
需要我补 Redis 分布式锁 Python 示例 或 Token 自动刷新(refresh_token 过期前置换) 吗?