0

0

怎么用Python Celery动态添加定时任务

王林

王林

发布时间:2023-05-13 15:43:06

|

2352人浏览过

|

来源于亿速云

转载

    一、背景

    实际工作中会有一些耗时的异步任务需要使用定时调度,比如发送邮件,拉取数据,执行定时脚本

    通过celery 实现调度主要思想是 通过引入中间人redis,启动 worker 进行任务执行 ,celery-beat进行定时任务数据存储

    二、Celery动态添加定时任务的官方文档

    celery文档:https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#beat-custom-schedulers

    celery 自定义调度类说明: 

    自定义调度器类可以在命令行中指定(--scheduler参数)

    立即学习Python免费学习笔记(深入)”;

    django-celery-beat文档 : https://pypi.org/project/django-celery-beat/

    关于django-celery-beat 插件的说明: 

    此扩展使您能够将定期任务计划存储在数据库中,可以从 Django 管理界面管理周期性任务,您可以在其中创建、编辑和删除周期性任务以及它们应该运行的频率

    三、celery简单实用

    3.1 基础环境配置

    1. 安装最新版本的Django

    pip3 install django #当前我安装的版本是 3.0.6

    2. 创建项目

    django-admin startproject typeidea
    django-admin startapp blog

    3.安装 celery

    pip3 install django-celery
    pip3 install -U Celery 
    pip3 install "celery[librabbitmq,redis,auth,msgpack]" 
    pip3 install django-celery-beat # 用于动态添加定时任务
    pip3 install django-celery-results
    pip3 install redis

    3.2 测试使用Celery应用

    1. 创建blog目录、新建task.py

    首先在Django项目中创建一个blog文件夹,并且在blog文件夹下创建tasks.py模块, 如下:

    怎么用Python Celery动态添加定时任务

     tasks.py代码如下: 

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
     
    """
    #File: tasks.py
    #Time: 2022/3/30 2:26 下午
    #Author: julius
    """
    from celery import Celery
     
    # 使用redis做为broker
    app = Celery('blog.tasks2',broker='redis://127.0.0.1:6379/0')
     
    # 创建任务函数
    @app.task
    def my_task():
        print('任务正在执行...')

    Celery第一个参数是给其设定一个名字, 第二参数我们设定一个中间人broker, 在这里我们使用Redis作为中间人。my_task函数是我们编写的一个任务函数, 通过加上装饰器app.task, 将其注册到broker的队列中。

    2. 启动redis、创建worker

    现在我们在创建一个worker, 等待处理队列中的任务。

    进入项目的根目录,执行命令: celery -A celery_tasks.tasks worker -l info

    怎么用Python Celery动态添加定时任务

     3. 调用任务

    下面来测试一下功能,创建一个任务,加入任务队列中,提供worker执行。

    进入python终端, 执行如下代码:

    $ python manage.py shell
    >>> from blog.tasks import my_task
    >>> my_task.delay()
    <AsyncResult: 83484dfe-f729-417b-8e51-6c7ae32a1377>

    调用一个任务函数,将会返回一个AsyncResult对象,这个对象可以用来检查任务的状态或者获得任务的返回值。

    4. 查看结果

    在worker的终端查看任务执行情况,可以看到已经收到83484dfe-f729-417b-8e51-6c7ae32a1377 任务,并打印了任务执行信息

    怎么用Python Celery动态添加定时任务

    5. 存储并查看任务执行状态

    把任务执行结果赋值给ret,然后调用result() 会产生 DisabledBackend 报错,可见没有配置后端存储的时候并不能保存任务执行的状态信息,下一节我们会讲到如何配置backend保存任务执行结果

    $ python manage.py shell
    >>> from blog.tasks import my_task
    >>> ret=my_task.delay()
    >>> ret.result()

    怎么用Python Celery动态添加定时任务

    四、配置backend存储任务执行结果 

    如果我们想跟踪任务的状态,Celery需要将结果保存到某个地方。有几种保存的方案可选:SQLAlchemy、Django ORM、Memcached、 Redis、RPC (RabbitMQ/AMQP)。

    1. 添加backend参数

    在本例中我们使用Redis作为存储结果的方案,通过Celery的backend参数来设定任务结果存储地址。我们将tasks模块修改如下:

    from celery import Celery
     
    # 使用redis作为broker以及backend
    app = Celery('celery_tasks.tasks',
                 broker='redis://127.0.0.1:6379/8',
                 backend='redis://127.0.0.1:6379/9')
     
    # 创建任务函数
    @app.task
    def my_task(a, b):
        print("任务函数正在执行....")
        return a + b

    给Celery增加了backend参数,指定redis作为结果存储,并将任务函数修改为两个参数,并且有返回值。

    2. 调用任务/查看任务执行结果

    下面再来执行调用一下这个任务看看。

    $ python manage.py shell
    >>> from blog.tasks import my_task
    >>> res=my_task.delay(10,40)
    >>> res.result
    50
    >>> res.failed()
    False

    再来看看worker的执行情况,如下:

    怎么用Python Celery动态添加定时任务

    可以看到celery任务已经执行成功了。

    但是这只是一个开始,下一步要看看如何添加定时的任务。

    四、优化Celery目录结构

    上面直接将Celery的应用创建、配置、tasks任务全部写在了一个文件,这样在后面项目越来越大,也是不方便的。下面来拆分一下,并且添加一些常用的参数。

    基本结构如下

    怎么用Python Celery动态添加定时任务

    $ vim typeidea/celery.py (Celery应用文件)

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
     
    """
    #File: celery.py
    #Time: 2022/3/30 12:25 下午
    #Author: julius
    """
    import os
    from celery import Celery
    from blog import celeryconfig
    project_name='typeidea'
    # set the default django setting module for the 'celery' program
    os.environ.setdefault('DJANGO_SETTINGS_MODULE','typeidea.settings')
    app = Celery(project_name)
     
    app.config_from_object('django.conf:settings')
     
    app.autodiscover_tasks()

    vim blog/celeryconfig.py (配置Celery的参数文件)

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
     
    """
    #File: celeryconfig.py
    #Time: 2022/3/30 2:54 下午
    #Author: julius
    """
    
    # 设置结果存储
    from typeidea import settings
    import os
     
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "typeidea.settings")
    CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
    # 设置代理人broker
    BROKER_URL = 'redis://127.0.0.1:6379/1'
    # celery 的启动工作数量设置
    CELERY_WORKER_CONCURRENCY = 20
    # 任务预取功能,就是每个工作的进程/线程在获取任务的时候,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。
    CELERYD_PREFETCH_MULTIPLIER = 20
    # 非常重要,有些情况下可以防止死锁
    CELERYD_FORCE_EXECV = True
    # celery 的 worker 执行多少个任务后进行重启操作
    CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
    # 禁用所有速度限制,如果网络资源有限,不建议开足马力。
    CELERY_DISABLE_RATE_LIMITS = True
     
    CELERY_ENABLE_UTC = False
    CELERY_TIMEZONE = settings.TIME_ZONE
    DJANGO_CELERY_BEAT_TZ_AWARE = False
    CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

    vim blog/tasks.py (tasks 任务文件)

    import time
    from blog.celery import app
     
    # 创建任务函数
    @app.task
    def my_task(a, b, c):
        print('任务正在执行...')
        print('任务1函数休眠10s')
        time.sleep(10)
        return a + b + c

    五、开始使用django-celery-beat调度器

    使用 django-celery-beat 动态添加定时任务  celery 4.x 版本在 django 框架中是使用 django-celery-beat 进行动态添加定时任务的。前面虽然已经安装了这个库,但是还要再说明一下。

    1. 安装 django-celery-beat

    pip3 install django-celery-beat

    2.在项目的 settings 文件配置 django-celery-beat 

    吐槽大师
    吐槽大师

    吐槽大师(Roast Master) - 终极 AI 吐槽生成器,适用于 Instagram,Facebook,Twitter,Threads 和 Linkedin

    下载
    INSTALLED_APPS = [
        'blog',
        'django_celery_beat',
        ...
    ]
     
    # Django设置时区
    LANGUAGE_CODE = 'zh-hans'  # 使用中国语言
    TIME_ZONE = 'Asia/Shanghai'  # 设置Django使用中国上海时间
    # 如果USE_TZ设置为True时,Django会使用系统默认设置的时区,此时的TIME_ZONE不管有没有设置都不起作用
    # 如果USE_TZ 设置为False,TIME_ZONE = 'Asia/Shanghai', 则使用上海的UTC时间。
    USE_TZ = False

    3. 创建 django-celery-beat 相关表

    执行Django数据库迁移: python manage.py migrate

    怎么用Python Celery动态添加定时任务

    4. 配置Celery使用 django-celery-beat

    配置 celery.py

    import os
     
    from celery import Celery
     
    from blog import celeryconfig
     
    # 为celery 设置环境变量
    os.environ.setdefault("DJANGO_SETTINGS_MODULE","typeidea.settings")
    # 创建celery app
    app = Celery('blog')
    # 从单独的配置模块中加载配置
    app.config_from_object(celeryconfig)
     
    # 设置app自动加载任务
    app.autodiscover_tasks([
        'blog',
    ])

    配置 celeryconfig.py

    # 设置结果存储
    from typeidea import settings
    import os
     
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "typeidea.settings")
    CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
    # 设置代理人broker
    BROKER_URL = 'redis://127.0.0.1:6379/1'
    # celery 的启动工作数量设置
    CELERY_WORKER_CONCURRENCY = 20
    # 任务预取功能,就是每个工作的进程/线程在获取任务的时候,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。
    CELERYD_PREFETCH_MULTIPLIER = 20
    # 非常重要,有些情况下可以防止死锁
    CELERYD_FORCE_EXECV = True
    # celery 的 worker 执行多少个任务后进行重启操作
    CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
    # 禁用所有速度限制,如果网络资源有限,不建议开足马力。
    CELERY_DISABLE_RATE_LIMITS = True
     
    CELERY_ENABLE_UTC = False
    CELERY_TIMEZONE = settings.TIME_ZONE
    DJANGO_CELERY_BEAT_TZ_AWARE = False
    CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

    编写任务 tasks.py

    import time
    from celery import Celery
    from blog.celery import app
     
    # 使用redis做为broker
    # app = Celery('blog.tasks2',broker='redis://127.0.0.1:6379/0',backend='redis://127.0.0.1:6379/1')
     
    # 创建任务函数
    @app.task
    def my_task(a, b, c):
        print('任务正在执行...')
        print('任务1函数休眠10s')
        time.sleep(10)
        return a + b + c
     
    @app.task
    def my_task2():
        print("任务2函数正在执行....")
        print('任务2函数休眠10s')
        time.sleep(10)

    5. 启动定时任务work

    启动定时任务首先需要有一个work执行异步任务,然后再启动一个定时器触发任务。

    启动任务 work

    $ celery -A blog worker -l info

    怎么用Python Celery动态添加定时任务

    启动定时器触发 beat

    celery -A blog beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

    怎么用Python Celery动态添加定时任务

    六、具体操作演练

    6.1 创建基于间隔时间的周期性任务

    1. 初始化周期间隔对象interval 对象

    >>> from django_celery_beat.models import PeriodicTask, IntervalSchedule
    >>> schedule, created = IntervalSchedule.objects.get_or_create( 
    ...       every=10, 
    ...       period=IntervalSchedule.SECONDS, 
    ...  )
    >>> IntervalSchedule.objects.all()
    <QuerySet [<IntervalSchedule: every 10 seconds>]>

    2.创建一个无参数的周期性间隔任务

    >>>PeriodicTask.objects.create(interval=schedule,name='my_task2',task='blog.tasks.my_task2',)
    <PeriodicTask: my_task2: every 10 seconds>

    beat 调度服务日志显示如下:

    怎么用Python Celery动态添加定时任务

     worker 服务日志显示如下:

    怎么用Python Celery动态添加定时任务

    3.创建一个带参数的周期性间隔任务

    >>> PeriodicTask.objects.create(interval=schedule,name='my_task',task='blog.tasks.my_task',args=json.dumps([10,20,30]))
    <PeriodicTask: my_task: every 10 seconds>

    beat 调度服务日志结果:

    怎么用Python Celery动态添加定时任务

     worker 服务日志结果:

    怎么用Python Celery动态添加定时任务

    4.如何高并发执行任务

    需要并行执行任务的时候,就需要设置多个worker来执行任务。 

    6.2 创建一个不带参数的周期性间隔任务

    1.初始化 crontab 的调度对象

    >>> import pytz
    >>> schedule, _ = CrontabSchedule.objects.get_or_create(
    ... minute='*',
    ... hour='*',
    ... day_of_week='*',
    ... day_of_month='*',
    ... timezone=pytz.timezone('Asia/Shanghai')
    ... )

    2. 创建不带参数的定时任务

    PeriodicTask.objects.create(crontab=schedule,name='my_task2_crontab',task='blog.tasks.my_task2',)

    beat 调度服务执行结果 

    怎么用Python Celery动态添加定时任务

     worker 执行服务结果

    怎么用Python Celery动态添加定时任务

    6.3 周期性任务的查询、删除操作

    1. 周期性任务的查询

    >>> PeriodicTask.objects.all()
    <ExtendedQuerySet [<PeriodicTask: celery.backend_cleanup: 0 4 * * * (m/h/dM/MY/d) Asia/Shanghai>, <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>]>
    >>> PeriodicTask.objects.get(name='my_task2_crontab')
    <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>
    >>> for task in PeriodicTask.objects.all():
    ...     print(task.id)
    ... 
    1
    13
    >>> PeriodicTask.objects.get(id=13)
    <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>
    >>> PeriodicTask.objects.get(name='my_task2_crontab')
    <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>

     控制台实际操作记录

    怎么用Python Celery动态添加定时任务

    2.周期性任务的暂停/启动

    2.1 设置my_taks2_crontab 暂停任务

    >>> my_task2_crontab = PeriodicTask.objects.get(id=13)
    >>> my_task2_crontab.enabled
    True
    >>> my_task2_crontab.enabled=False
    >>> my_task2_crontab.save()

    查看worker输出:

    怎么用Python Celery动态添加定时任务

     可以看到worker从19:31以后已经没有输出了,说明已经成功吧my_task2_crontab 任务暂停

    2.2 设置my_task2_crontab 开启任务

    把任务的 enabled 为 True 即可:

    >>> my_task2_crontab.enabled
    False
    >>> my_task2_crontab.enabled=True
    >>> my_task2_crontab.save()

    查看worker输出:

    怎么用Python Celery动态添加定时任务

     可以看到worker从19:36开始有输出,说明已把my_task2_crontab 任务重新启动

    3. 周期性任务的删除

    获取到指定的任务后调用delete(),再次查询指定任务会发现已经不存在了

    PeriodicTask.objects.get(name='my_task2_crontab').delete()
    >>> PeriodicTask.objects.get(name='my_task2_crontab')
    Traceback (most recent call last):
      File "<console>", line 1, in <module>
      File "/Users/julius/PycharmProjects/typeidea/.venv/lib/python3.9/site-packages/django/db/models/manager.py", line 85, in manager_method
        return getattr(self.get_queryset(), name)(*args, **kwargs)
      File "/Users/julius/PycharmProjects/typeidea/.venv/lib/python3.9/site-packages/django/db/models/query.py", line 435, in get
        raise self.model.DoesNotExist(
    django_celery_beat.models.PeriodicTask.DoesNotExist: PeriodicTask matching query does not exist.

    相关文章

    python速学教程(入门到精通)
    python速学教程(入门到精通)

    python怎么学习?python怎么入门?python在哪学?python怎么学才快?不用担心,这里为大家提供了python速学教程(入门到精通),有需要的小伙伴保存下载就能学习啦!

    下载

    本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

    热门AI工具

    更多
    DeepSeek
    DeepSeek

    幻方量化公司旗下的开源大模型平台

    豆包大模型
    豆包大模型

    字节跳动自主研发的一系列大型语言模型

    WorkBuddy
    WorkBuddy

    腾讯云推出的AI原生桌面智能体工作台

    腾讯元宝
    腾讯元宝

    腾讯混元平台推出的AI助手

    文心一言
    文心一言

    文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

    讯飞写作
    讯飞写作

    基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

    即梦AI
    即梦AI

    一站式AI创作平台,免费AI图片和视频生成。

    ChatGPT
    ChatGPT

    最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

    相关专题

    更多
    Python Web 框架 Django 深度开发
    Python Web 框架 Django 深度开发

    本专题系统讲解 Python Django 框架的核心功能与进阶开发技巧,包括 Django 项目结构、数据库模型与迁移、视图与模板渲染、表单与认证管理、RESTful API 开发、Django 中间件与缓存优化、部署与性能调优。通过实战案例,帮助学习者掌握 使用 Django 快速构建功能全面的 Web 应用与全栈开发能力。

    166

    2026.02.04

    rabbitmq和kafka有什么区别
    rabbitmq和kafka有什么区别

    rabbitmq和kafka的区别:1、语言与平台;2、消息传递模型;3、可靠性;4、性能与吞吐量;5、集群与负载均衡;6、消费模型;7、用途与场景;8、社区与生态系统;9、监控与管理;10、其他特性。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

    207

    2024.02.23

    Java 消息队列与异步架构实战
    Java 消息队列与异步架构实战

    本专题系统讲解 Java 在消息队列与异步系统架构中的核心应用,涵盖消息队列基本原理、Kafka 与 RabbitMQ 的使用场景对比、生产者与消费者模型、消息可靠性与顺序性保障、重复消费与幂等处理,以及在高并发系统中的异步解耦设计。通过实战案例,帮助学习者掌握 使用 Java 构建高吞吐、高可靠异步消息系统的完整思路。

    49

    2026.01.28

    数据库Delete用法
    数据库Delete用法

    数据库Delete用法:1、删除单条记录;2、删除多条记录;3、删除所有记录;4、删除特定条件的记录。更多关于数据库Delete的内容,大家可以访问下面的文章。

    287

    2023.11.13

    drop和delete的区别
    drop和delete的区别

    drop和delete的区别:1、功能与用途;2、操作对象;3、可逆性;4、空间释放;5、执行速度与效率;6、与其他命令的交互;7、影响的持久性;8、语法和执行;9、触发器与约束;10、事务处理。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

    222

    2023.12.29

    vim保存退出命令
    vim保存退出命令

    vim是一个非常强大的文本编辑器,常用于Unix和Linux系统。它是从vi发展而来的,相比vi有许多改进和扩展。在vim中,保存并退出的命令是:wq"wq"这个命令是由两个部分组成的。其中,"w"表示写入文件,将所做的更改保存到磁盘;而"q"表示退出vim编辑器。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

    269

    2023.08.01

    常用的数据库软件
    常用的数据库软件

    常用的数据库软件有MySQL、Oracle、SQL Server、PostgreSQL、MongoDB、Redis、Cassandra、Hadoop、Spark和Amazon DynamoDB。更多关于数据库软件的内容详情请看本专题下面的文章。php中文网欢迎大家前来学习。

    1007

    2023.11.02

    内存数据库有哪些
    内存数据库有哪些

    内存数据库有Redis、Memcached、Apache Ignite、VoltDB、TimesTen、H2 Database、Aerospike、Oracle TimesTen In-Memory Database、SAP HANA和ache Cassandra。更多关于内存数据库相关问题,详情请看本专题下面的文章。php中文网欢迎大家前来学习。

    673

    2023.11.14

    TypeScript类型系统进阶与大型前端项目实践
    TypeScript类型系统进阶与大型前端项目实践

    本专题围绕 TypeScript 在大型前端项目中的应用展开,深入讲解类型系统设计与工程化开发方法。内容包括泛型与高级类型、类型推断机制、声明文件编写、模块化结构设计以及代码规范管理。通过真实项目案例分析,帮助开发者构建类型安全、结构清晰、易维护的前端工程体系,提高团队协作效率与代码质量。

    26

    2026.03.13

    热门下载

    更多
    网站特效
    /
    网站源码
    /
    网站素材
    /
    前端模板

    精品课程

    更多
    相关推荐
    /
    热门推荐
    /
    最新课程
    最新Python教程 从入门到精通
    最新Python教程 从入门到精通

    共4课时 | 22.5万人学习

    Django 教程
    Django 教程

    共28课时 | 5万人学习

    SciPy 教程
    SciPy 教程

    共10课时 | 1.9万人学习

    关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
    php中文网:公益在线php培训,帮助PHP学习者快速成长!
    关注服务号 技术交流群
    PHP中文网订阅号
    每天精选资源文章推送

    Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号