0

0

Python爬虫请求调度设计_任务队列实现思路【教程】

冷漠man

冷漠man

发布时间:2025-12-25 21:44:02

|

483人浏览过

|

来源于php中文网

原创

直接用 queue.queue 易卡死,因其 get() 默认无限阻塞且无超时/异常穿透机制;asyncio.queue 需配 timeout 和 task_done;redis 用 zset + bzpopmin 支持优先级与持久化;须通过 full() 或 zcard 实现反压控制。

python爬虫请求调度设计_任务队列实现思路【教程】

为什么直接用 queue.Queue爬虫里容易卡死

多线程爬虫中,如果直接用标准库queue.Queue 做任务分发,常出现消费者线程全部阻塞在 get()、生产者却因异常退出而不再放新任务——队列既没满也没空,但整个调度就僵住了。根本原因是它默认的阻塞行为缺乏超时兜底和异常穿透机制。

  • get(block=True) 会无限等待,一旦上游断流,线程就挂起不响应中断
  • 没有内置重试计数或失败归档逻辑,单个坏 URL 可能导致任务永久滞留
  • 无法跨进程共享,后续想加分布式调度就得重写整套队列层

asyncio.Queue 实现轻量异步调度的关键配置

对中小规模 HTTP 爬取(比如每秒 10–50 请求),asyncio.Queue 比线程队列更省资源,但必须显式控制生命周期,否则协程会泄漏。

import asyncio
<p>async def worker(queue: asyncio.Queue, session):
while True:
try:
url = await asyncio.wait_for(queue.get(), timeout=3.0)  # 必须设超时
async with session.get(url) as resp:</p><h1>处理响应...</h1><pre class='brush:python;toolbar:false;'>        queue.task_done()  # 必须调用,否则 join() 不返回
    except asyncio.TimeoutError:
        break  # 超时即退出,避免死循环
    except Exception as e:
        print(f"Worker error on {url}: {e}")
        queue.task_done()  # 错误也要标记完成,否则队列卡住
  • asyncio.wait_for(..., timeout=...) 是刚需,不能依赖 get_nowait() —— 它抛 queue.Empty 异常,但协程里没地方 catch
  • 每个 get() 后必须配对 task_done(),哪怕出错也要调,否则 queue.join() 永远不结束
  • 不要在 worker 里用 await queue.put(...) 回填重试任务——容易引发循环等待,应由独立的 retry manager 处理

需要持久化或扩缩容?绕过内存队列直连 Redis 的最小可行方案

当爬虫要跑几天、或需横向加机器时,内存队列不可靠。用 redis-pylpop/rpush 组合比引入 Celery 更轻,且天然支持失败重入队。

import redis
import json
<p>r = redis.Redis()</p><div class="aritcle_card flexRow">
                                                        <div class="artcardd flexRow">
                                                                <a class="aritcle_card_img" href="/ai/2158" title="拍我AI"><img
                                                                                src="https://img.php.cn/upload/ai_manual/000/000/000/175680310874179.png" alt="拍我AI"  onerror="this.onerror='';this.src='/static/lhimages/moren/morentu.png'" ></a>
                                                                <div class="aritcle_card_info flexColumn">
                                                                        <a href="/ai/2158" title="拍我AI">拍我AI</a>
                                                                        <p>AI视频生成平台PixVerse的国内版本</p>
                                                                </div>
                                                                <a href="/ai/2158" title="拍我AI" class="aritcle_card_btn flexRow flexcenter"><b></b><span>下载</span> </a>
                                                        </div>
                                                </div><p><span>立即学习</span>“<a href="https://pan.quark.cn/s/00968c3c2c15" style="text-decoration: underline !important; color: blue; font-weight: bolder;" rel="nofollow" target="_blank">Python免费学习笔记(深入)</a>”;</p><p>def add_task(url: str, priority: int = 0):
payload = json.dumps({"url": url, "retry": 0})
r.zadd("pending_tasks", {payload: priority})  # 用有序集合支持优先级</p><p>def get_task(timeout=1) -> dict | None:</p><h1>阻塞式取一个,超时返回 None</h1><pre class='brush:python;toolbar:false;'>result = r.bzpopmin("pending_tasks", timeout=timeout)
if result:
    return json.loads(result[1])
return None
  • 别用 list 类型的 lpop —— 无法去重、不支持优先级、无超时原语;zsetstream 更稳妥
  • bzpopmin 是原子操作,避免“取到但崩溃未处理”导致任务丢失
  • 任务体里必须带 retry 字段,失败时 r.zadd("pending_tasks", {payload: time.time() + 60}) 实现指数退避

调度器里最容易被忽略的「反压」信号:如何让生产者感知下游拥堵

很多爬虫把 URL 批量塞进队列就不管了,结果内存暴涨 OOM。真正的调度必须让生产者知道“慢点来”。

  • queue.qsize() 做阈值判断不可靠(多线程下非原子),改用 queue.full() + time.sleep() 组合
  • 异步场景下,在 put() 前加 if queue.qsize() > MAX_SIZE: await asyncio.sleep(0.1)
  • Redis 方案中,用 r.zcard("pending_tasks") 监控积压量,超过阈值则暂停解析新页面链接

队列不是管道,是缓冲区;缓冲区满了还硬塞,系统就从调度问题变成运维事故。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
什么是分布式
什么是分布式

分布式是一种计算和数据处理的方式,将计算任务或数据分散到多个计算机或节点中进行处理。本专题为大家提供分布式相关的文章、下载、课程内容,供大家免费下载体验。

406

2023.08.11

分布式和微服务的区别
分布式和微服务的区别

分布式和微服务的区别在定义和概念、设计思想、粒度和复杂性、服务边界和自治性、技术栈和部署方式等。本专题为大家提供分布式和微服务相关的文章、下载、课程内容,供大家免费下载体验。

251

2023.10.07

if什么意思
if什么意思

if的意思是“如果”的条件。它是一个用于引导条件语句的关键词,用于根据特定条件的真假情况来执行不同的代码块。本专题提供if什么意思的相关文章,供大家免费阅读。

846

2023.08.22

线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

765

2023.08.10

Python 多线程与异步编程实战
Python 多线程与异步编程实战

本专题系统讲解 Python 多线程与异步编程的核心概念与实战技巧,包括 threading 模块基础、线程同步机制、GIL 原理、asyncio 异步任务管理、协程与事件循环、任务调度与异常处理。通过实战示例,帮助学习者掌握 如何构建高性能、多任务并发的 Python 应用。

377

2025.12.24

java多线程相关教程合集
java多线程相关教程合集

本专题整合了java多线程相关教程,阅读专题下面的文章了解更多详细内容。

31

2026.01.21

C++多线程相关合集
C++多线程相关合集

本专题整合了C++多线程相关教程,阅读专题下面的的文章了解更多详细内容。

29

2026.01.21

C# 多线程与异步编程
C# 多线程与异步编程

本专题深入讲解 C# 中多线程与异步编程的核心概念与实战技巧,包括线程池管理、Task 类的使用、async/await 异步编程模式、并发控制与线程同步、死锁与竞态条件的解决方案。通过实际项目,帮助开发者掌握 如何在 C# 中构建高并发、低延迟的异步系统,提升应用性能和响应速度。

103

2026.02.06

C# ASP.NET Core微服务架构与API网关实践
C# ASP.NET Core微服务架构与API网关实践

本专题围绕 C# 在现代后端架构中的微服务实践展开,系统讲解基于 ASP.NET Core 构建可扩展服务体系的核心方法。内容涵盖服务拆分策略、RESTful API 设计、服务间通信、API 网关统一入口管理以及服务治理机制。通过真实项目案例,帮助开发者掌握构建高可用微服务系统的关键技术,提高系统的可扩展性与维护效率。

3

2026.03.11

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
最新Python教程 从入门到精通
最新Python教程 从入门到精通

共4课时 | 22.5万人学习

Django 教程
Django 教程

共28课时 | 4.9万人学习

SciPy 教程
SciPy 教程

共10课时 | 1.9万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号