📉 从零搭一个淘宝商品价格监控系统:TOP API + 定时任务 + 微信推送(附Python源码)
用淘宝官方
taobao.item.get定时拉商品价格 → 降价/涨价超阈值 → 推企业微信/飞书/钉钉 Webhook。全程合法、无爬虫、可7×24运行。一、监控架构
┌──────────────┐ 每N分钟 ┌────────────────────┐ │ APScheduler │ ──────────────▶ │ TOP API │ │ 定时任务 │ │ taobao.item.get │ └──────┬───────┘ └─────────┬──────────┘ │ 价格变化 Δ > threshold │ ▼ │ ┌──────────────────┐ Webhook │ │ 企业微信/钉钉Bot │ ◀────────────────────┘ └──────────────────┘ (Markdown消息)
二、完整可运行代码
# taobao_price_monitor.py
"""
淘宝商品价格监控系统
- 跟踪关注商品(num_iid)价格
- 变化幅度超 threshold 推企业微信机器人
- SQLite 本地存快照(首次建表自动)
依赖: requests (pip install requests)
"""
import hashlib
import time
import requests
import sqlite3
import logging
from typing import Dict, Optional
from datetime import datetime
# ── 配置区 ──────────────────────────────────────
APP_KEY = "YOUR_TOP_APP_KEY"
APP_SECRET = "YOUR_TOP_APP_SECRET"
SANDBOX = False # 生产=False
CHECK_INTERVAL_SEC = 1800 # 30分钟
PRICE_CHANGE_THRESHOLD = 0.05 # 涨跌超5%触发告警
WECOM_BOT_WEBHOOK = ( # 企业微信群机器人Webhook
"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=YOUR_KEY"
)
WATCH_LIST = [ # 监控商品列表
{"num_iid": "654321098765", "alias": "304不锈钢保温杯 500ml"},
# {"num_iid": "612345678901", "alias": "其它监控商品"},
]
# ────────────────────────────────────────────────
# ============ TOP Client (内联最小化版) ============
class TopClient:
GW_PROD = "https://gw.api.taobao.com/router/rest"
GW_SBOX = "https://gw.api.tbsandbox.com/router/rest"
def __init__(self, ak, ask, sandbox=False):
self.ak, self.ask = ak, ask
self.gw = self.GW_SBOX if sandbox else self.GW_PROD
def _sign(self, p: Dict) -> str:
filt = sorted((k, v) for k, v in p.items()
if v is not None and str(v).strip() != '' and k != 'sign')
qs = ''.join(f"{k}{v}" for k, v in filt)
return hashlib.md5(f"{self.ask}{qs}{self.ask}".encode()).hexdigest().upper()
def call(self, method: str, biz: Dict, session=None) -> Dict:
p = {"method": method, "app_key": self.ak,
"timestamp": str(int(time.time() * 1000)),
"format": "json", "v": "2.0", "sign_method": "md5"}
if session: p["session"] = session
p.update(biz)
p["sign"] = self._sign(p)
r = requests.post(self.gw, data=p, timeout=15)
r.raise_for_status()
d = r.json()
if "error_response" in d:
err = d["error_response"]
raise Exception(f"TOP[{err.get('code')}]: {err.get('msg')} {err.get('sub_msg','')}")
return d.get(list(d.keys() - {"error_response"})[0], {})
def get_price(self, num_iid: str) -> float:
resp = self.call("taobao.item.get", {
"num_iid": num_iid,
"fields": "num_iid,title,price,approve_status"
})
item = resp.get("item", {})
if item.get("approve_status") != "onsale":
raise ValueError(f"商品{num_iid} 已下架/仓库中")
return float(item.get("price") or 0)
# ===================================================
# ============ SQLite 价格快照 ====================
def init_db(db_path="price_monitor.db"):
conn = sqlite3.connect(db_path)
conn.execute("""
CREATE TABLE IF NOT EXISTS price_snapshot(
num_iid TEXT PRIMARY KEY,
last_price REAL,
updated_at TEXT
)
""")
conn.commit()
return conn
def load_last_price(conn, num_iid: str) -> Optional[float]:
cur = conn.execute("SELECT last_price FROM price_snapshot WHERE num_iid=?", (num_iid,))
row = cur.fetchone()
return row[0] if row else None
def save_price(conn, num_iid: str, price: float):
conn.execute("""
INSERT INTO price_snapshot(num_iid,last_price,updated_at)
VALUES(?,?,?)
ON CONFLICT(num_iid) DO UPDATE SET last_price=excluded.last_price,
updated_at=excluded.updated_at
""", (num_iid, price, datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
conn.commit()
# ===================================================
# ============ 企业微信推送 =======================
def push_wecom(title: str, old: float, new: float, alias: str, num_iid: str):
if not WECOM_BOT_WEBHOOK:
return
direction = "📉 降价" if new < old else "📈 涨价"
pct = abs(new - old) / old * 100 if old else 0
content = (
f"## 🛒 淘宝商品价格变动通知\n"
f"**商品**:{alias}\n"
f"**ID**:`{num_iid}`\n"
f"**{direction}**:¥{old:.2f} → **¥{new:.2f}** (±{pct:.1f}%)\n"
f"> 监控时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
)
try:
requests.post(WECOM_BOT_WEBHOOK, json={
"msgtype": "markdown",
"markdown": {"content": content}
}, timeout=10)
except Exception as e:
logging.warning(f"微信推送失败: {e}")
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
# ============ 核心监控逻辑 =======================
def monitor_once(client: TopClient, conn):
for w in WATCH_LIST:
num_iid = str(w["num_iid"])
alias = w.get("alias") or num_iid
try:
cur_price = client.get_price(num_iid)
last_price = load_last_price(conn, num_iid)
if last_price is None:
# 首次记录,不告警
save_price(conn, num_iid, cur_price)
print(f"[INIT] {alias} 初始价格 ¥{cur_price:.2f}")
continue
diff_ratio = abs(cur_price - last_price) / last_price if last_price else 0
if diff_ratio >= PRICE_CHANGE_THRESHOLD:
print(f"[ALERT] {alias} ¥{last_price:.2f}→¥{cur_price:.2f} "
f"(Δ{diff_ratio*100:.1f}%)")
push_wecom(alias, last_price, cur_price, alias, num_iid)
else:
print(f"[OK] {alias} 价格稳定 ¥{cur_price:.2f}")
save_price(conn, num_iid, cur_price)
except ValueError as ve:
print(f"[WARN] {alias}: {ve}")
except Exception as e:
print(f"[ERR] {alias}: {e}")
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
# ============ 入口 ===============================
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s")
client = TopClient(APP_KEY, APP_SECRET, sandbox=SANDBOX)
conn = init_db()
print(f"▶ 淘宝价格监控启动,间隔={CHECK_INTERVAL_SEC}s,阈值={PRICE_CHANGE_THRESHOLD*100}%")
monitor_once(client, conn) # 立即执行一次
# ---- 常驻定时(取消注释启用)----
# import apscheduler
# from apscheduler.schedulers.blocking import BlockingScheduler
# sched = BlockingScheduler()
# sched.add_job(lambda: monitor_once(client, conn),
# 'interval', seconds=CHECK_INTERVAL_SEC, id='price_monitor')
# try:
# sched.start()
# except (KeyboardInterrupt, SystemExit):
# sched.shutdown()三、配置步骤
- 安装依赖
pip install requests apscheduler # apscheduler 仅常驻时使用
- 填写配置
APP_KEY/APP_SECRET:淘宝开放平台应用WECOM_BOT_WEBHOOK:企业微信群 → 添加机器人 → 复制 Webhook URLWATCH_LIST:填入要监控的num_iid(从商品URLid=提取)- 运行
python taobao_price_monitor.py
首次运行记录基准价不发告警,后续价格变动 ±5%(可改
PRICE_CHANGE_THRESHOLD)推企微。- 常驻(取消代码尾部注释)
nohup python taobao_price_monitor.py > monitor.log 2>&1 &
四、避坑
坑 | 现象 | 解决 |
|---|---|---|
取不到 price | 商品下架 approve_status!=onsale | 代码已捕获跳过 |
Invalid Signature | 时间戳秒级 | 确保 int(time.time()*1000) |
企�推送无反应 | Webhook key错 / IP受限 | 企业微信后台查机器人日志 |
QPS触发限流 | 监控商品过多 | 令牌桶限速 或 加大 CHECK_INTERVAL_SEC |
num_iid 错 | 复制完整URL未提取ID | 只保留数字部分 |
五、面试/方案一句话
淘宝价格监控 = 定时任务调taobao.item.get(fields=price)取一口价 → SQLite存上次快照 → 差价超阈值调企业微信Markdown Webhook推送;全程用官方TOP API合法获取,不爬页面。
需要我补 钉钉/飞书 Webhook 消息格式 或 多SKU规格价监控(查 skus[].price) 吗?