0

0

flask celery 使用方法

看不見的法師

看不見的法師

发布时间:2025-09-13 08:55:01

|

969人浏览过

|

来源于php中文网

原创

安装celery在windows上的步骤和注意事项如下:

由于Celery 4.0版本不支持Windows操作系统,如果在Windows上安装Celery 4.0,会出现以下错误:

flask celery 使用方法flask_clery

因此,你只能安装Celery 3.1版本:

pip install celery==3.1

接下来,安装py for redis模块:

pip install redis

安装Redis服务时需要注意,许多在线文章对系统环境描述不清,导致误导。Redis官方支持Linux,但不支持Windows。要在Windows上使用Redis服务,请从以下地址下载Redis安装包并完成安装:

https://www.php.cn/link/8ee3592d7d251e3f7fe4da469785592b

如果未安装Redis包,将会出现以下错误:

redis.exceptions.ConnectionError: Error 10061 connecting to localhost:6379.

redis.exceptions.ConnectionError

注意:安装目录不要选择C盘,否则可能会遇到权限依赖问题。

添加Redis环境变量:

D:\\Program Files\\Redis

初始化Redis服务:

进入Redis安装目录,打开cmd并运行命令:

redis-server.exe redis.windows.conf

如果出现错误,可以通过双击目录下的

redis-cli.exe
并在新窗口中输入
shutdown
exit
来解决。

在Flask中集成Celery时,需要在Flask配置中添加以下配置:

# Celery 配置
CELERY_BROKER_URL = 'redis://localhost:6379/0' # broker是一个消息传输的中间件
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1' # 任务执行器

在Flask工程的

__init__
目录下创建Celery实例,注意以下代码必须在Flask app读取完配置文件后编写:

def make_celery(app):
    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'],
                    backend=app.config['CELERY_RESULT_BACKEND'])
    celery.conf.update(app.config)
    TaskBase = celery.Task
    class ContextTask(TaskBase):
        abstract = True
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask
    return celery
celery = make_celery(app)

完整的示例代码如下:

app = Flask(__name__)
app.config.from_object(config['default'])
def make_celery(app):
    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'],
                    backend=app.config['CELERY_RESULT_BACKEND'])
    celery.conf.update(app.config)
    TaskBase = celery.Task
    class ContextTask(TaskBase):
        abstract = True
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask
    return celery
celery = make_celery(app)

常用的配置文件示例如下:

# 注意,celery4版本后,CELERY_BROKER_URL改为BROKER_URL
BROKER_URL = 'amqp://username:passwd@host:port/虚拟主机名'
# 指定结果的接受地址
CELERY_RESULT_BACKEND = 'redis://username:passwd@host:port/db'
# 指定任务序列化方式
CELERY_TASK_SERIALIZER = 'msgpack'
# 指定结果序列化方式
CELERY_RESULT_SERIALIZER = 'msgpack'
# 任务过期时间,celery任务执行结果的超时时间
CELERY_TASK_RESULT_EXPIRES = 60 * 20
# 指定任务接受的序列化类型.
CELERY_ACCEPT_CONTENT = ["msgpack"]
# 任务发送完成是否需要确认,这一项对性能有一点影响
CELERY_ACKS_LATE = True
# 压缩方案选择,可以是zlib, bzip2,默认是发送没有压缩的数据
CELERY_MESSAGE_COMPRESSION = 'zlib'
# 规定完成任务的时间
CELERYD_TASK_TIME_LIMIT = 5
# 在5s内完成任务,否则执行该任务的worker将被杀死,任务移交给父进程
# celery worker的并发数,默认是服务器的内核数目,也是命令行-c参数指定的数目
CELERYD_CONCURRENCY = 4
# celery worker 每次去rabbitmq预取任务的数量
CELERYD_PREFETCH_MULTIPLIER = 4
# 每个worker执行了多少任务就会死掉,默认是无限的
CELERYD_MAX_TASKS_PER_CHILD = 40
# 设置默认的队列名称,如果一个消息不符合其他的队列就会放在默认队列里面,如果什么都不设置的话,数据都会发送到默认的队列中
CELERY_DEFAULT_QUEUE = "default"
# 设置详细的队列
CELERY_QUEUES = {
    "default": { # 这是上面指定的默认队列
        "exchange": "default",
        "exchange_type": "direct",
        "routing_key": "default"
    },
    "topicqueue": { # 这是一个topic队列 凡是topictest开头的routing key都会被放到这个队列
        "routing_key": "topic.#",
        "exchange": "topic_exchange",
        "exchange_type": "topic",
    },
    "task_eeg": { # 设置扇形交换机
        "exchange": "tasks",
        "exchange_type": "fanout",
        "binding_key": "tasks",
    },
}

在cmd中启动Celery服务:

celery -A your_application.celery worker --loglevel=info

其中,

your_application
为你的工程名称,在这里为
get_tieba_film

调用Celery任务的示例代码如下:

@app.route('/')
@app.route('/index')
def index():
    print("耗时的任务")
    # 任务已经交给异步处理了
    result = get_film_content.apply_async(args=[1])
    # 如果需要等待返回值,可以使用get()或wait()方法
    # result.wait()
    return '耗时的任务已经交给了celery'

@celery.task() def get_film_content(a): util = SpiderRunUtil.SpiderRun(TieBaSpider.FilmSpider()) util.start()

绑定任务的示例:

@task(bind=True)
def add(self, x, y):
logger.info(self.request.id)

任务继承的示例:

import celery
class MyTask(celery.Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
print('{0!r} failed: {1!r}'.format(task_id, exc))

@task(base=MyTask) def add(x, y): raise KeyError()

任务名称的设置:

每个任务必须有不同的名称。如果没有显示提供名称,任务装饰器将会自动产生一个,产生的名称会基于这些信息:1)任务定义所在的模块,2)任务函数的名称。

显示设置任务名称的例子:

>>> @app.task(name='sum-of-two-numbers')
>>> def add(x, y):
...     return x + y
>>> add.name
'sum-of-two-numbers'

最佳实践是使用模块名称作为命名空间,这样的话如果有一个同名任务函数定义在其他模块也不会产生冲突。

>>> @app.task(name='tasks.add')
>>> def add(x, y):
...     return x + y

安装Flower来监控任务和worker的状态:

起航点卡销售系统
起航点卡销售系统

欢迎使用“起航点卡销售系统”销售程序:一、系统优势 1、售卡系统采取了会员与非会员相结合的销售方法,客户无需注册即可购卡,亦可注册会员购卡。 2、购卡速度快,整个购卡或过程只需二步即可取卡,让客户感受超快的取卡方式! 3、批量加卡功能。 4、取卡方式:网上支付,即时取卡 ,30秒可完成交易。 5、加密方式:MD5 32位不可倒推加密 6、防止跨站

下载
pip install flower

启动Flower(默认会启动一个webserver,端口为5555):

celery flower --address=127.0.0.1 --port=5555

进入

https://www.php.cn/link/070ffad3c0f12da66ca3b5d0c2d23069
即可查看。

常见错误及其解决方案:

ERROR/MainProcess] consumer: Cannot connect to redis://localhost:6379/0:

原因是:Redis-server没有启动。

解决方案:到Redis安装目录下执行

redis-server.exe redis.windows.conf

检查Redis是否启动:

redis-cli ping

line 442, in on_task_received

解决:

Did you remember to import the module containing this task?Or maybe you are using relative imports?Please see https://www.php.cn/link/dc14e10c0ecf8029a86d27e74d140539 for more information.The full contents of the message body was:{'timelimit': (None, None), 'utc': True, 'chord': None, 'args': [4, 4], 'retries': 0, 'expires': None, 'task': 'main.add', 'callbacks': None,'errbacks': None, 'taskset': None, 'kwargs': {}, 'eta': None, 'id': '97000322-93be-47e9-a082-4620e123dc5e'} (210b)Traceback (most recent call last):  File "d:\vm_env\flask_card\lib\site-packages\celery\worker\consumer.py", line 442, in on_task_received    strategies[name](message, body,KeyError: 'main.add'

原因:任务没有注册或注册不成功,只有在启动的时候提示有任务的时候,才能使用该任务。

flask celery 使用方法flask_celery

解决:

你在那个类中使用Celery就在哪个类中执行

celery -A 包名.类名.celery worker -l info
。根据上一部提示的任务列表给任务设置对应的名称,如在Test中:

from main import app, celery
@celery.task(name="main.Test.add")
def add(x, y):
print "ddddsws"
return x + y

目录结构:

+ Card  # 工程

  • main
    • admin
      • Task.py
    • init.py
    • Test.py

则应该启动的命令为:

celery -A main.Test.celery worker -l info

同时,如果你的Task.py也有任务,那么你还应该重新创建一个cmd窗口执行:

celery -A main.admin.Task.celery worker -l info

Celery的工作进程可以创建多个。

flask celery 使用方法flask_celery

flask celery 使用方法flask_celery

参考:

https://www.php.cn/link/d61f3a760c9bcbc9bb75228deddd9379

https://www.php.cn/link/381dc6cd0e6bfa5feb1f70484171a7a9

Celery用户指南,强烈推荐看Redis安装Celery使用https://redis.io/topics/quickstart

https://www.php.cn/link/c031d32c88833d1f9a2144071eaf34d9 最佳实践

https://www.php.cn/link/f6c744ece7e1a36892eba3a5d2938110

https://www.php.cn/link/337751565e513506b6400ca2ad6ff5df

https://www.php.cn/link/a2667dd894062c9ca2a4602cb4718f52 Celery 和 redis 完成任务队列

相关专题

更多
Python Flask框架
Python Flask框架

本专题专注于 Python 轻量级 Web 框架 Flask 的学习与实战,内容涵盖路由与视图、模板渲染、表单处理、数据库集成、用户认证以及RESTful API 开发。通过博客系统、任务管理工具与微服务接口等项目实战,帮助学员掌握 Flask 在快速构建小型到中型 Web 应用中的核心技能。

85

2025.08.25

Python Flask Web框架与API开发
Python Flask Web框架与API开发

本专题系统介绍 Python Flask Web框架的基础与进阶应用,包括Flask路由、请求与响应、模板渲染、表单处理、安全性加固、数据库集成(SQLAlchemy)、以及使用Flask构建 RESTful API 服务。通过多个实战项目,帮助学习者掌握使用 Flask 开发高效、可扩展的 Web 应用与 API。

71

2025.12.15

windows查看端口占用情况
windows查看端口占用情况

Windows端口可以认为是计算机与外界通讯交流的出入口。逻辑意义上的端口一般是指TCP/IP协议中的端口,端口号的范围从0到65535,比如用于浏览网页服务的80端口,用于FTP服务的21端口等等。怎么查看windows端口占用情况呢?php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

600

2023.07.26

查看端口占用情况windows
查看端口占用情况windows

端口占用是指与端口关联的软件占用端口而使得其他应用程序无法使用这些端口,端口占用问题是计算机系统编程领域的一个常见问题,端口占用的根本原因可能是操作系统的一些错误,服务器也可能会出现端口占用问题。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

1104

2023.07.27

windows照片无法显示
windows照片无法显示

当我们尝试打开一张图片时,可能会出现一个错误提示,提示说"Windows照片查看器无法显示此图片,因为计算机上的可用内存不足",本专题为大家提供windows照片无法显示相关的文章,帮助大家解决该问题。

792

2023.08.01

windows查看端口被占用的情况
windows查看端口被占用的情况

windows查看端口被占用的情况的方法:1、使用Windows自带的资源监视器;2、使用命令提示符查看端口信息;3、使用任务管理器查看占用端口的进程。本专题为大家提供windows查看端口被占用的情况的相关的文章、下载、课程内容,供大家免费下载体验。

452

2023.08.02

windows无法访问共享电脑
windows无法访问共享电脑

在现代社会中,共享电脑是办公室和家庭的重要组成部分。然而,有时我们可能会遇到Windows无法访问共享电脑的问题。这个问题可能会导致数据无法共享,影响工作和生活的正常进行。php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

2349

2023.08.08

windows自动更新
windows自动更新

Windows操作系统的自动更新功能可以确保系统及时获取最新的补丁和安全更新,以提高系统的稳定性和安全性。然而,有时候我们可能希望暂时或永久地关闭Windows的自动更新功能。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

780

2023.08.10

PHP WebSocket 实时通信开发
PHP WebSocket 实时通信开发

本专题系统讲解 PHP 在实时通信与长连接场景中的应用实践,涵盖 WebSocket 协议原理、服务端连接管理、消息推送机制、心跳检测、断线重连以及与前端的实时交互实现。通过聊天系统、实时通知等案例,帮助开发者掌握 使用 PHP 构建实时通信与推送服务的完整开发流程,适用于即时消息与高互动性应用场景。

11

2026.01.19

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
PostgreSQL 教程
PostgreSQL 教程

共48课时 | 7.4万人学习

Git 教程
Git 教程

共21课时 | 2.8万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号