如何使用 Python 调用小红书笔记评论 API 时进行并发控制?

张开发
2026/7/1 14:11:43 15 分钟阅读
如何使用 Python 调用小红书笔记评论 API 时进行并发控制?
我给你最简单、最实用、直接能运行的并发控制方案专门适配小红书 API防止限流 / 429 / 封号。一、小红书 API 并发规则必须知道官方限制QPS ≤ 5每秒最多 5 次请求安全阈值1 秒 1 次最稳并发超限返回429限流、封禁 IP、token 失效并发控制核心控制请求频率 队列排队 延时等待二、最简单并发控制方案直接复制方法 1延时控制最稳、新手首选每次请求强制等待 1 秒永不触发限流python运行import time import requests def get_comment(note_id, cursor): url fhttps://api.xiaohongshu.com/v2/notes/{note_id}/comments headers {Authorization: Bearer YOUR_TOKEN} params {cursor: cursor, page_size: 20} res requests.get(url, headersheaders, paramsparams) time.sleep(1) # 强制延时1s → 并发安全 return res.json()方法 2线程池并发高级、速度快适合批量爬取多篇笔记评论python运行from concurrent.futures import ThreadPoolExecutor import time import requests # 全局控制每秒最多允许2个请求安全值 RATE_LIMIT 2 last_request_time [] def safe_get(note_id, cursor): # 并发控制核心 global last_request_time while len(last_request_time) RATE_LIMIT: now time.time() last_request_time [t for t in last_request_time if now - t 1] if last_request_time: time.sleep(0.2) last_request_time.append(time.time()) # 正常请求 headers {Authorization: Bearer YOUR_TOKEN} url fhttps://api.xiaohongshu.com/v2/notes/{note_id}/comments res requests.get(url, headersheaders, params{cursor: cursor, page_size: 20}) return res.json() # 多线程并发最多2线程 with ThreadPoolExecutor(max_workers2) as executor: executor.map(safe_get, [笔记1, 笔记2, 笔记3])方法 3令牌桶算法企业级最标准的 API 并发控制自动限速python运行import time class TokenBucket: def __init__(self, rate, capacity): self.rate rate # 每秒生成token数 self.capacity capacity self.tokens capacity self.last_time time.time() def take(self): now time.time() add (now - self.last_time) * self.rate self.tokens min(self.capacity, self.tokens add) self.last_time now while self.tokens 1: time.sleep(0.1) now time.time() add (now - self.last_time) * self.rate self.tokens min(self.capacity, self.tokens add) self.last_time now self.tokens - 1 # 使用每秒最多2个请求 bucket TokenBucket(rate2, capacity2) def api_call(): bucket.take() # 限流 # 发起API请求...三、并发控制最佳实践直接照做每秒不超过 2 次请求批量任务用 2 个线程以内出现 429 错误立即停止 3~5 秒不要使用多进程高并发cursor 翻页必须串行不能并发四、429 限流自动重试代码必备python运行def get_with_retry(note_id, cursor, retry3): for i in range(retry): res safe_get(note_id, cursor) if res.get(code) 429: print(触发限流等待5秒...) time.sleep(5) continue return res return None

更多文章