快手作为国内领先的短视频和直播平台,其开放平台(快手开放平台)提供了丰富的 API 接口,覆盖内容管理、用户互动、直播运营、商业化等场景,主要服务于企业开发者、MCN 机构和内容创作者。以下从接口体系、认证机制、核心功能展开分析,并提供 Python 调用实现(以用户信息和视频列表接口为例)。
一、快手 API 核心特性分析
1. 接口体系与功能域
2. 认证机制(OAuth 2.0)
3. 核心接口参数与响应示例
二、Python 脚本实现
import requests
import json
import time
import logging
from typing import Dict, Optional, List
from requests.exceptions import RequestException
# 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
class KuaishouAPI:
def __init__(self, client_id: str, client_secret: str, redirect_uri: str):
"""
初始化快手API客户端
:param client_id: 应用client_id(快手开放平台获取)
:param client_secret: 应用client_secret
:param redirect_uri: 授权回调地址(需与开放平台配置一致)
"""
self.client_id = client_id
self.client_secret = client_secret
self.redirect_uri = redirect_uri
self.base_url = "https://open-api.kuaishou.com"
self.access_token = None
self.refresh_token = None
self.expires_at = 0 # token过期时间(时间戳)
self.session = requests.Session()
def get_authorization_url(self, scope: str = "user_info,video.list") -> str:
"""
生成用户授权URL(引导用户授权)
:param scope: 授权范围(多个用逗号分隔)
:return: 授权URL
"""
params = {
"client_id": self.client_id,
"response_type": "code",
"redirect_uri": self.redirect_uri,
"scope": scope,
"state": f"ks_{int(time.time())}" # 随机state,用于防CSRF
}
return f"{self.base_url}/oauth2/authorize?{requests.compat.urlencode(params)}"
def get_access_token(self, code: str) -> Optional[Dict]:
"""
通过授权码获取access_token
:param code: 授权回调返回的code
:return: 令牌信息(含access_token、refresh_token等)
"""
url = f"{self.base_url}/oauth2/access_token"
data = {
"client_id": self.client_id,
"client_secret": self.client_secret,
"grant_type": "authorization_code",
"code": code,
"redirect_uri": self.redirect_uri
}
try:
response = self.session.post(url, data=data, timeout=10)
response.raise_for_status()
result = response.json()
# 处理错误响应
if "error" in result:
logging.error(f"获取token失败:{result['error_description']}(错误码:{result['error']})")
return None
# 保存token信息
self.access_token = result["access_token"]
self.refresh_token = result["refresh_token"]
self.expires_at = time.time() + result["expires_in"] # 计算过期时间
logging.info("access_token获取成功")
return result
except RequestException as e:
logging.error(f"请求失败:{str(e)}")
return None
def refresh_access_token(self) -> Optional[Dict]:
"""刷新access_token(当token过期时)"""
if not self.refresh_token:
logging.error("无refresh_token,无法刷新")
return None
url = f"{self.base_url}/oauth2/access_token"
data = {
"client_id": self.client_id,
"client_secret": self.client_secret,
"grant_type": "refresh_token",
"refresh_token": self.refresh_token
}
try:
response = self.session.post(url, data=data, timeout=10)
result = response.json()
if "error" in result:
logging.error(f"刷新token失败:{result['error_description']}")
return None
# 更新token信息
self.access_token = result["access_token"]
self.refresh_token = result["refresh_token"]
self.expires_at = time.time() + result["expires_in"]
logging.info("access_token刷新成功")
return result
except Exception as e:
logging.error(f"刷新请求失败:{str(e)}")
return None
def _ensure_token_valid(self) -> bool:
"""确保access_token有效(过期则刷新)"""
if not self.access_token:
logging.error("未获取access_token,请先调用get_access_token")
return False
# 检查是否即将过期(提前30秒刷新)
if time.time() + 30 >= self.expires_at:
logging.info("access_token即将过期,尝试刷新")
return self.refresh_access_token() is not None
return True
def call_api(self, path: str, method: str = "GET", params: Dict = None) -> Optional[Dict]:
"""
通用API调用方法
:param path: 接口路径(如/oauth2/userinfo)
:param method: 请求方法(GET/POST)
:param params: 请求参数
:return: 接口响应数据
"""
# 确保token有效
if not self._ensure_token_valid():
return None
# 构建请求头(携带token)
headers = {
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json"
}
url = f"{self.base_url}{path}"
try:
if method.upper() == "GET":
response = self.session.get(url, params=params, headers=headers, timeout=10)
else:
response = self.session.post(url, json=params, headers=headers, timeout=10)
response.raise_for_status()
result = response.json()
# 快手API错误码在根节点(0为成功)
if result.get("error_code") != 0:
logging.error(f"接口错误:{result.get('error_msg')}(错误码:{result.get('error_code')})")
return None
return result
except RequestException as e:
logging.error(f"接口请求失败:{str(e)},路径:{path}")
return None
except json.JSONDecodeError:
logging.error(f"响应解析失败:{response.text[:200]}...")
return None
def get_user_info(self) -> Optional[Dict]:
"""获取当前授权用户的基本信息"""
return self.call_api("/oauth2/userinfo")
def get_video_list(self, user_id: str, count: int = 10, cursor: str = "") -> Optional[List[Dict]]:
"""
获取用户发布的视频列表
:param user_id: 用户ID(可从get_user_info获取)
:param count: 每页数量(最大30)
:param cursor: 分页游标(第一页为空,后续用返回的next_cursor)
:return: 视频列表
"""
params = {
"user_id": user_id,
"count": count,
"cursor": cursor
}
result = self.call_api("/video/list", params=params)
if not result:
return None
# 解析视频列表
return {
"videos": result.get("items", []),
"next_cursor": result.get("cursor"), # 下一页游标
"has_more": result.get("has_more", False)
}
# 示例调用
if __name__ == "__main__":
# 替换为实际参数(从快手开放平台获取)
CLIENT_ID = "your_client_id"
CLIENT_SECRET = "your_client_secret"
REDIRECT_URI = "https://your-redirect-uri.com/callback" # 需与开放平台配置一致
# 初始化客户端
ks_api = KuaishouAPI(CLIENT_ID, CLIENT_SECRET, REDIRECT_URI)
# 1. 生成授权URL(引导用户在浏览器中打开并授权)
auth_url = ks_api.get_authorization_url(scope="user_info,video.list")
print(f"请在浏览器中打开以下链接授权:\n{auth_url}")
# 2. 输入授权后回调返回的code(用户授权后,回调URL会携带code参数)
code = input("请输入授权回调返回的code:").strip()
if not code:
print("code不能为空")
exit(1)
# 3. 获取access_token
token_result = ks_api.get_access_token(code)
if not token_result:
print("获取token失败")
exit(1)
# 4. 获取用户信息
user_info = ks_api.get_user_info()
if user_info:
print("\n用户信息:")
print(f"用户ID:{user_info.get('open_id')}")
print(f"昵称:{user_info.get('nickname')}")
print(f"头像:{user_info.get('avatar')}")
print(f"性别:{'男' if user_info.get('gender') == 1 else '女' if user_info.get('gender') == 2 else '未知'}")
# 5. 获取用户发布的视频列表
if user_info:
user_id = user_info.get('open_id')
video_list = ks_api.get_video_list(user_id, count=5)
if video_list and video_list["videos"]:
print("\n视频列表(前5条):")
for i, video in enumerate(video_list["videos"], 1):
print(f"{i}. 标题:{video.get('title')}")
print(f" 视频ID:{video.get('video_id')}")
print(f" 播放量:{video.get('play_count')} | 时长:{video.get('duration')}秒")
print(f" 发布时间:{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(video.get('create_time')))}")
print("-" * 60)