澎湃新闻(The Paper)作为国内主流深度资讯平台,其 item_search 热榜数据接口是精准抓取平台核心热榜(时政、社会、财经、科技等)的核心入口。该接口支持按热榜类型、时间范围、热度排序等多维度筛选,快速获取热榜新闻的标题、热度指数、发布时间、传播数据等核心信息,广泛应用于舆情监测、资讯聚合、热点追踪、内容分析等场景,是对接澎湃网热点数据的必备工具。
一、接口核心认知:先明确 “能做什么”“适配什么场景”
1. 接口定位与核心价值
2. 核心参数与返回字段(热榜场景适配版)
(1)请求参数(必填 + 可选,按优先级排序)
(2)返回核心字段(按业务场景分类,热榜重点标注)
3. 接口限制与注意事项
二、对接前准备:3 步搞定前置条件
1. 注册与获取密钥(核心步骤)
2. 技术环境准备
(1)支持语言与协议
(2)必备工具与依赖
3. 业务需求梳理
三、实操步骤:从调试到落地(Python 示例)
步骤 1:理解请求流程
步骤 2:签名生成规则(关键!避免调用失败)
签名示例(参数排序与拼接)
步骤 3:完整代码实现(Python)
(1)依赖安装
(2)完整代码(含签名生成、分页获取、实时抓取、数据保存)
import requests
import hashlib
import time
import json
import pandas as pd
from urllib.parse import urlencode
from typing import Dict, Optional, List
from apscheduler.schedulers.blocking import BlockingScheduler
import logging
# 配置日志(记录接口调用、错误信息)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[logging.FileHandler("thepaper_hot_search.log"), logging.StreamHandler()]
)
# 接口核心配置(替换为自己的密钥和接口地址)
APP_KEY = "你的appkey"
SECRET = "你的secret"
API_URL = "https://open.thepaper.cn/api/item_search/hot" # 澎湃网热榜接口正式地址
SAVE_PATH = "澎湃网热榜数据.xlsx" # 数据保存路径
REALTIME_INTERVAL = 300 # 实时抓取间隔(单位:秒,默认5分钟)
def generate_sign(params: Dict) -> str:
"""生成接口签名(严格按平台规则实现,核心函数)"""
# 1. 按参数名ASCII升序排序(排除sign字段)
sorted_params = sorted(params.items(), key=lambda x: x[0])
# 2. 拼接参数字符串(urlencode自动处理中文、空格等特殊字符)
param_str = urlencode(sorted_params, encoding="utf-8") + f"&secret={SECRET}"
# 3. MD5加密(32位大写)
md5 = hashlib.md5()
md5.update(param_str.encode("utf-8"))
return md5.hexdigest().upper()
def parse_hot_data(item: Dict) -> Dict:
"""解析热榜数据(标准化字段命名,适配数据分析场景)"""
# 时间格式转换(毫秒级时间戳→YYYY-MM-DD HH:MM:SS)
def timestamp_to_str(ts: int) -> str:
try:
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(ts/1000))
except:
return ""
return {
"新闻ID": item.get("item_id", ""),
"热榜排名": item.get("rank", 0),
"热榜类型": item.get("hot_type", "").replace(",", "|"),
"热度指数": item.get("hot_index", 0),
"标题": item.get("title", ""),
"摘要": item.get("summary", ""),
"核心关键词": ",".join(item.get("tags", [])),
"话题分类": item.get("topic_category", ""),
"所属栏目": item.get("column", ""),
"发布时间": timestamp_to_str(item.get("pub_time", 0)),
"上榜时间": timestamp_to_str(item.get("rank_time", 0)),
"来源媒体": item.get("source", "澎湃新闻"),
"作者": item.get("author", ""),
"阅读量": item.get("read_count", 0),
"评论量": item.get("comment_count", 0),
"转发量": item.get("share_count", 0),
"点赞量": item.get("like_count", 0),
"详情页URL": item.get("detail_url", ""),
"封面图URL": item.get("cover_img", ""),
"是否原创": "是" if item.get("is_original", False) else "否",
"是否置顶": "是" if item.get("is_top", False) else "否"
}
def get_hot_list(
keywords: str,
hot_type: Optional[str] = None,
time_range: str = "today",
sort_type: str = "hot",
page_no: int = 1,
page_size: int = 50
) -> Dict:
"""
调用item_search接口获取单页热榜数据
:param keywords: 搜索关键词(热榜类型+主题)
:param hot_type: 热榜类型(精准筛选)
:param time_range: 时间范围
:param sort_type: 排序方式
:param page_no: 页码
:param page_size: 每页条数
:return: 标准化后的单页热榜数据
"""
# 1. 构建基础参数(必填项)
params = {
"appkey": APP_KEY,
"keywords": keywords,
"time_range": time_range,
"sort_type": sort_type,
"page_no": page_no,
"page_size": page_size,
"timestamp": int(time.time() * 1000),
# 按需筛选字段,减少数据传输量
"fields": "item_id,rank,hot_type,hot_index,title,summary,tags,topic_category,column,pub_time,rank_time,source,author,read_count,comment_count,share_count,like_count,detail_url,cover_img,is_original,is_top"
}
# 2. 添加可选参数
if hot_type:
params["hot_type"] = hot_type
# 3. 生成签名
params["sign"] = generate_sign(params)
try:
# 4. 发送POST请求(HTTPS协议,超时10秒)
response = requests.post(
url=API_URL,
data=json.dumps(params),
headers={"Content-Type": "application/json"},
timeout=10,
verify=True
)
response.raise_for_status() # 抛出HTTP请求异常(如404、500)
result = response.json()
# 5. 处理响应
if result.get("code") == 200:
raw_data = result.get("data", {})
hot_list = raw_data.get("list", [])
standard_hot_list = [parse_hot_data(item) for item in hot_list]
return {
"success": True,
"data": standard_hot_list,
"total": raw_data.get("total", 0),
"page_no": page_no,
"page_size": page_size
}
else:
logging.error(f"接口返回错误:code={result.get('code')}, msg={result.get('msg')}")
return {
"success": False,
"error_code": result.get("code"),
"error_msg": result.get("msg", "接口调用失败")
}
except requests.exceptions.RequestException as e:
logging.error(f"网络异常:{str(e)}")
return {
"success": False,
"error_code": -2,
"error_msg": f"网络异常:{str(e)}"
}
except Exception as e:
logging.error(f"数据处理异常:{str(e)}")
return {
"success": False,
"error_code": -3,
"error_msg": f"处理异常:{str(e)}"
}
def batch_get_hot_list(
keywords: str,
hot_type: Optional[str] = None,
time_range: str = "today",
sort_type: str = "hot",
max_page: int = 50
) -> List[Dict]:
"""
批量获取热榜数据(自动分页,获取所有符合条件的数据)
:param keywords: 搜索关键词
:param hot_type: 热榜类型
:param time_range: 时间范围
:param sort_type: 排序方式
:param max_page: 最大页码(默认50)
:return: 所有页的标准化热榜数据
"""
all_hot_data = []
page_no = 1
total_count = 0
logging.info(f"开始批量获取热榜数据,关键词:{keywords},热榜类型:{hot_type},时间范围:{time_range}")
while page_no <= max_page:
result = get_hot_list(
keywords=keywords,
hot_type=hot_type,
time_range=time_range,
sort_type=sort_type,
page_no=page_no,
page_size=50
)
if not result["success"]:
logging.error(f"第{page_no}页获取失败:{result['error_msg']}(错误码:{result['error_code']})")
# 频率超限,暂停1分钟后重试(仅重试1次)
if result["error_code"] == 429:
logging.info("频率超限,暂停60秒后重试...")
time.sleep(60)
continue
break
current_page_data = result["data"]
if not current_page_data:
logging.info(f"第{page_no}页无数据,停止获取")
break
all_hot_data.extend(current_page_data)
total_count = result["total"]
logging.info(f"第{page_no}页获取成功,累计获取{len(all_hot_data)}/{total_count}条数据")
# 已获取全部数据,停止分页
if len(all_hot_data) >= total_count:
logging.info(f"已获取全部热榜数据(共{total_count}条)")
break
# 控制调用频率(个人版30秒/页,企业版1秒/页)
time.sleep(30)
page_no += 1
return all_hot_data
def save_hot_data(hot_data: List[Dict], save_path: str = SAVE_PATH):
"""将热榜数据保存为Excel文件(便于数据分析)"""
if not hot_data:
logging.warning("无热榜数据可保存")
return
# 去重(按新闻ID去重,避免重复抓取)
df = pd.DataFrame(hot_data).drop_duplicates(subset=["新闻ID"], keep="last")
# 筛选常用字段,优化Excel可读性
columns = [
"热榜排名", "热榜类型", "热度指数", "标题", "核心关键词", "话题分类",
"发布时间", "上榜时间", "来源媒体", "阅读量", "评论量", "转发量",
"详情页URL", "是否原创", "是否置顶"
]
df = df[columns]
# 保存Excel(支持增量写入,避免覆盖历史数据)
try:
# 读取历史数据
history_df = pd.read_excel(save_path, engine="openpyxl")
# 合并数据并去重
df = pd.concat([history_df, df]).drop_duplicates(subset=["标题", "发布时间"], keep="last")
except FileNotFoundError:
# 无历史数据,直接保存
pass
df.to_excel(save_path, index=False, engine="openpyxl")
logging.info(f"热榜数据已保存至:{save_path}(共{len(df)}条记录)")
def realtime_crawl_hot_data():
"""实时抓取热榜数据(定时任务)"""
logging.info("="*50)
logging.info("开始实时抓取澎湃网热榜数据...")
# 配置实时抓取条件(可根据业务调整)
SEARCH_KEYWORDS = "全站热榜 时政 科技"
HOT_TYPE = "all,politics,tech"
TIME_RANGE = "today"
SORT_TYPE = "hot"
# 批量获取数据
hot_data = batch_get_hot_list(
keywords=SEARCH_KEYWORDS,
hot_type=HOT_TYPE,
time_range=TIME_RANGE,
sort_type=SORT_TYPE
)
# 保存数据
if hot_data:
save_hot_data(hot_data)
# 打印TOP10热榜(日志输出)
top10_hot = hot_data[:10]
logging.info("今日TOP10热榜:")
for i, item in enumerate(top10_hot, 1):
logging.info(f"{i}. 【{item['热榜类型']}】{item['标题']} - 热度指数:{item['热度指数']} - 阅读量:{item['阅读量']}")
else:
logging.warning("未获取到实时热榜数据")
logging.info("本次实时抓取完成")
logging.info("="*50 + "\n")
# 调用示例(支持两种模式:一次性批量抓取/实时定时抓取)
if __name__ == "__main__":
# 模式1:一次性批量抓取(如采集近7天时政热榜数据)
# batch_hot_data = batch_get_hot_list(
# keywords="时政热榜",
# hot_type="politics",
# time_range="7days",
# sort_type="hot"
# )
# save_hot_data(batch_hot_data, "澎湃网近7天时政热榜.xlsx")
# 模式2:实时定时抓取(每5分钟执行一次)
scheduler = BlockingScheduler()
# 添加定时任务(interval:间隔执行)
scheduler.add_job(realtime_crawl_hot_data, 'interval', seconds=REALTIME_INTERVAL)
logging.info(f"实时抓取任务已启动,间隔{REALTIME_INTERVAL}秒执行一次...")
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
logging.info("实时抓取任务停止")