×

📉 从零搭一个淘宝商品价格监控系统:TOP API + 定时任务 + 微信推送(附Python源码)

万邦科技Lex 万邦科技Lex 发表于2026-06-28 09:29:40 浏览20 评论0

抢沙发发表评论

📉 从零搭一个淘宝商品价格监控系统:TOP API + 定时任务 + 微信推送(附Python源码)

用淘宝官方 taobao.item.get定时拉商品价格 → 降价/涨价超阈值 → 推企业微信/飞书/钉钉 Webhook。全程合法、无爬虫、可7×24运行。

一、监控架构

┌──────────────┐   每N分钟        ┌────────────────────┐
│ APScheduler  │ ──────────────▶ │ TOP API             │
│ 定时任务     │                  │ taobao.item.get     │
└──────┬───────┘                  └─────────┬──────────┘
       │ 价格变化 Δ > threshold            │
       ▼                                   │
┌──────────────────┐      Webhook         │
│ 企业微信/钉钉Bot │ ◀────────────────────┘
└──────────────────┘  (Markdown消息)

二、完整可运行代码

# taobao_price_monitor.py
"""
淘宝商品价格监控系统
- 跟踪关注商品(num_iid)价格
- 变化幅度超 threshold 推企业微信机器人
- SQLite 本地存快照(首次建表自动)
依赖: requests  (pip install requests)
"""
import hashlib
import time
import requests
import sqlite3
import logging
from typing import Dict, Optional
from datetime import datetime

# ── 配置区 ──────────────────────────────────────
APP_KEY = "YOUR_TOP_APP_KEY"
APP_SECRET = "YOUR_TOP_APP_SECRET"
SANDBOX = False                     # 生产=False
CHECK_INTERVAL_SEC = 1800           # 30分钟
PRICE_CHANGE_THRESHOLD = 0.05       # 涨跌超5%触发告警
WECOM_BOT_WEBHOOK = (              # 企业微信群机器人Webhook
    "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=YOUR_KEY"
)
WATCH_LIST = [                     # 监控商品列表
    {"num_iid": "654321098765", "alias": "304不锈钢保温杯 500ml"},
    # {"num_iid": "612345678901", "alias": "其它监控商品"},
]
# ────────────────────────────────────────────────


# ============ TOP Client (内联最小化版) ============
class TopClient:
    GW_PROD = "https://gw.api.taobao.com/router/rest"
    GW_SBOX = "https://gw.api.tbsandbox.com/router/rest"

    def __init__(self, ak, ask, sandbox=False):
        self.ak, self.ask = ak, ask
        self.gw = self.GW_SBOX if sandbox else self.GW_PROD

    def _sign(self, p: Dict) -> str:
        filt = sorted((k, v) for k, v in p.items()
                       if v is not None and str(v).strip() != '' and k != 'sign')
        qs = ''.join(f"{k}{v}" for k, v in filt)
        return hashlib.md5(f"{self.ask}{qs}{self.ask}".encode()).hexdigest().upper()

    def call(self, method: str, biz: Dict, session=None) -> Dict:
        p = {"method": method, "app_key": self.ak,
             "timestamp": str(int(time.time() * 1000)),
             "format": "json", "v": "2.0", "sign_method": "md5"}
        if session: p["session"] = session
        p.update(biz)
        p["sign"] = self._sign(p)
        r = requests.post(self.gw, data=p, timeout=15)
        r.raise_for_status()
        d = r.json()
        if "error_response" in d:
            err = d["error_response"]
            raise Exception(f"TOP[{err.get('code')}]: {err.get('msg')} {err.get('sub_msg','')}")
        return d.get(list(d.keys() - {"error_response"})[0], {})

    def get_price(self, num_iid: str) -> float:
        resp = self.call("taobao.item.get", {
            "num_iid": num_iid,
            "fields": "num_iid,title,price,approve_status"
        })
        item = resp.get("item", {})
        if item.get("approve_status") != "onsale":
            raise ValueError(f"商品{num_iid} 已下架/仓库中")
        return float(item.get("price") or 0)
# ===================================================


# ============ SQLite 价格快照 ====================
def init_db(db_path="price_monitor.db"):
    conn = sqlite3.connect(db_path)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS price_snapshot(
            num_iid TEXT PRIMARY KEY,
            last_price REAL,
            updated_at TEXT
        )
    """)
    conn.commit()
    return conn


def load_last_price(conn, num_iid: str) -> Optional[float]:
    cur = conn.execute("SELECT last_price FROM price_snapshot WHERE num_iid=?", (num_iid,))
    row = cur.fetchone()
    return row[0] if row else None


def save_price(conn, num_iid: str, price: float):
    conn.execute("""
        INSERT INTO price_snapshot(num_iid,last_price,updated_at)
        VALUES(?,?,?)
        ON CONFLICT(num_iid) DO UPDATE SET last_price=excluded.last_price,
                                            updated_at=excluded.updated_at
    """, (num_iid, price, datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
    conn.commit()
# ===================================================


# ============ 企业微信推送 =======================
def push_wecom(title: str, old: float, new: float, alias: str, num_iid: str):
    if not WECOM_BOT_WEBHOOK:
        return
    direction = "📉 降价" if new < old else "📈 涨价"
    pct = abs(new - old) / old * 100 if old else 0
    content = (
        f"## 🛒 淘宝商品价格变动通知\n"
        f"**商品**:{alias}\n"
        f"**ID**:`{num_iid}`\n"
        f"**{direction}**:¥{old:.2f} → **¥{new:.2f}**  (±{pct:.1f}%)\n"
        f"> 监控时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
    )
    try:
        requests.post(WECOM_BOT_WEBHOOK, json={
            "msgtype": "markdown",
            "markdown": {"content": content}
        }, timeout=10)
    except Exception as e:
        logging.warning(f"微信推送失败: {e}")

# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
# ============ 核心监控逻辑 =======================
def monitor_once(client: TopClient, conn):
    for w in WATCH_LIST:
        num_iid = str(w["num_iid"])
        alias = w.get("alias") or num_iid
        try:
            cur_price = client.get_price(num_iid)
            last_price = load_last_price(conn, num_iid)

            if last_price is None:
                # 首次记录,不告警
                save_price(conn, num_iid, cur_price)
                print(f"[INIT] {alias} 初始价格 ¥{cur_price:.2f}")
                continue

            diff_ratio = abs(cur_price - last_price) / last_price if last_price else 0
            if diff_ratio >= PRICE_CHANGE_THRESHOLD:
                print(f"[ALERT] {alias} ¥{last_price:.2f}→¥{cur_price:.2f} "
                      f"(Δ{diff_ratio*100:.1f}%)")
                push_wecom(alias, last_price, cur_price, alias, num_iid)
            else:
                print(f"[OK] {alias} 价格稳定 ¥{cur_price:.2f}")

            save_price(conn, num_iid, cur_price)

        except ValueError as ve:
            print(f"[WARN] {alias}: {ve}")
        except Exception as e:
            print(f"[ERR] {alias}: {e}")

# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
# ============ 入口 ===============================
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO,
                         format="%(asctime)s %(levelname)s %(message)s")

    client = TopClient(APP_KEY, APP_SECRET, sandbox=SANDBOX)
    conn = init_db()

    print(f"▶ 淘宝价格监控启动,间隔={CHECK_INTERVAL_SEC}s,阈值={PRICE_CHANGE_THRESHOLD*100}%")
    monitor_once(client, conn)      # 立即执行一次

    # ---- 常驻定时(取消注释启用)----
    # import apscheduler
    # from apscheduler.schedulers.blocking import BlockingScheduler
    # sched = BlockingScheduler()
    # sched.add_job(lambda: monitor_once(client, conn),
    #                'interval', seconds=CHECK_INTERVAL_SEC, id='price_monitor')
    # try:
    #     sched.start()
    # except (KeyboardInterrupt, SystemExit):
    #     sched.shutdown()

三、配置步骤

  1. 安装依赖

pip install requests apscheduler   # apscheduler 仅常驻时使用
  1. 填写配置

    • APP_KEY/ APP_SECRET:淘宝开放平台应用

    • WECOM_BOT_WEBHOOK:企业微信群 → 添加机器人 → 复制 Webhook URL

    • WATCH_LIST:填入要监控的 num_iid(从商品URL id=提取)

  2. 运行

python taobao_price_monitor.py
首次运行记录基准价不发告警,后续价格变动 ±5%(可改 PRICE_CHANGE_THRESHOLD)推企微。
  1. 常驻(取消代码尾部注释)

nohup python taobao_price_monitor.py > monitor.log 2>&1 &

四、避坑

现象
解决
取不到 price
商品下架 approve_status!=onsale
代码已捕获跳过
Invalid Signature
时间戳秒级
确保 int(time.time()*1000)
企�推送无反应
Webhook key错 / IP受限
企业微信后台查机器人日志
QPS触发限流
监控商品过多
令牌桶限速 或 加大 CHECK_INTERVAL_SEC
num_iid 错
复制完整URL未提取ID
只保留数字部分

五、面试/方案一句话

淘宝价格监控 = 定时任务调 taobao.item.get(fields=price)取一口价 → SQLite存上次快照 → 差价超阈值调企业微信Markdown Webhook推送;全程用官方TOP API合法获取,不爬页面。
需要我补 钉钉/飞书 Webhook 消息格式多SKU规格价监控(查 skus[].price) 吗?


群贤毕至

访客