0

0

确保芹菜的公平加工 - 第二部分

聖光之護

聖光之護

发布时间:2024-12-10 22:27:01

|

981人浏览过

|

来源于dev.to

转载

本文在上一篇有关公平处理的文章的基础上探讨了 celery 中的任务优先级。任务优先级提供了一种通过根据自定义标准为任务分配不同优先级来增强后台处理的公平性和效率的方法。

为什么任务级优先级?

任务级优先级提供对任务执行的细粒度控制,无需复杂的实现。通过将所有任务提交到具有指定优先级值的单个队列,工作人员可以根据任务的紧急程度来处理任务。这确保了公平处理,无论提交时间如何。

例如,如果一个租户提交了 100 个任务,而另一个租户不久后提交了 5 个任务,则任务级别优先级会阻止第二个租户等待所有 100 个任务完成。

这种方法根据租户的任务计数动态分配优先级。 每个租户的第一个任务以高优先级开始,但每有 10 个并发任务,优先级就会降低。这可确保任务较少的租户不会遇到不必要的延误。

实施任务优先级

首先,安装 celery 和 redis

pip install celery redis

配置 celery 使用 redis 作为代理并启用基于优先级的任务处理:

from celery import celery

app = celery(
    "tasks",
    broker="redis://localhost:6379/0",
    broker_connection_retry_on_startup=true,
)

app.conf.broker_transport_options = {
    "priority_steps": list(range(10)),
    "sep": ":",
    "queue_order_strategy": "priority",
}

定义一个方法来计算动态优先级,使用redis来缓存每个租户的任务计数:

import redis

redis_client = redis.strictredis(host="localhost", port=6379, db=1)

def calculate_priority(tenant_id):
    """
    calculate task priority based on the number of tasks for the tenant.
    """
    key = f"tenant:{tenant_id}:task_count"
    task_count = int(redis_client.get(key) or 0)
    return min(10, task_count // 10)

创建自定义任务类以在成功完成后减少任务计数:

ColorMagic
ColorMagic

AI调色板生成工具

下载
from celery import task

class tenantawaretask(task):
    def on_success(self, retval, task_id, args, kwargs):
        tenant_id = kwargs.get("tenant_id")

        if tenant_id:
            key = f"tenant:{tenant_id}:task_count"
            redis_client.decr(key, 1)

        return super().on_success(retval, task_id, args, kwargs)

@app.task(name="tasks.send_email", base=tenantawaretask)
def send_email(tenant_id, task_data):
    """
    simulate sending an email.
    """
    sleep(1)
    key = f"tenant:{tenant_id}:task_count"
    task_count = int(redis_client.get(key) or 0)
    logger.info("tenant %s tasks: %s", tenant_id, task_count)

为不同租户触发任务,确保tenant_id包含在任务的关键字参数中:

if __name__ == "__main__":
    tenant_id = 1
    for _ in range(100):
        priority = calculate_priority(tenant_id)
        key = f"tenant:{tenant_id}:task_count"
        redis_client.incr(key, 1)
        send_email.apply_async(
            kwargs={"tenant_id": tenant_id, "task_data": {}}, priority=priority
        )


    tenant_id = 2
    for _ in range(10):
        priority = calculate_priority(tenant_id)
        key = f"tenant:{tenant_id}:task_count"
        redis_client.incr(key, 1)
        send_email.apply_async(
            kwargs={"tenant_id": tenant_id, "task_data": {}}, priority=priority
        )
您可以在此处查看完整代码。

启动 celery worker 并触发任务:

# Run the worker
celery -A tasks worker --loglevel=info

# Trigger the tasks
python tasks.py

此设置演示了 celery 的优先级队列如何与 redis 相结合,通过根据租户活动动态调整优先级来确保公平的任务处理。让我们看看工作人员的简化输出:

确保芹菜的公平加工 - 第二部分

结论

celery 和 redis 的任务级优先级为确保多租户系统中的公平处理提供了强大的解决方案。通过动态分配优先级并利用单个队列,您可以在满足业务需求的同时保持简单性。

实现任务级优先级的方法有很多,例如使用 rabbitmq 效率更高,因为它的核心支持优先级,但由于我们还使用 redis 进行任务计数,因此它简化了我们的整体架构。

希望您觉得这篇文章很有用,并请参阅下一篇!

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
rabbitmq和kafka有什么区别
rabbitmq和kafka有什么区别

rabbitmq和kafka的区别:1、语言与平台;2、消息传递模型;3、可靠性;4、性能与吞吐量;5、集群与负载均衡;6、消费模型;7、用途与场景;8、社区与生态系统;9、监控与管理;10、其他特性。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

207

2024.02.23

Java 消息队列与异步架构实战
Java 消息队列与异步架构实战

本专题系统讲解 Java 在消息队列与异步系统架构中的核心应用,涵盖消息队列基本原理、Kafka 与 RabbitMQ 的使用场景对比、生产者与消费者模型、消息可靠性与顺序性保障、重复消费与幂等处理,以及在高并发系统中的异步解耦设计。通过实战案例,帮助学习者掌握 使用 Java 构建高吞吐、高可靠异步消息系统的完整思路。

48

2026.01.28

常用的数据库软件
常用的数据库软件

常用的数据库软件有MySQL、Oracle、SQL Server、PostgreSQL、MongoDB、Redis、Cassandra、Hadoop、Spark和Amazon DynamoDB。更多关于数据库软件的内容详情请看本专题下面的文章。php中文网欢迎大家前来学习。

1006

2023.11.02

内存数据库有哪些
内存数据库有哪些

内存数据库有Redis、Memcached、Apache Ignite、VoltDB、TimesTen、H2 Database、Aerospike、Oracle TimesTen In-Memory Database、SAP HANA和ache Cassandra。更多关于内存数据库相关问题,详情请看本专题下面的文章。php中文网欢迎大家前来学习。

671

2023.11.14

mongodb和redis哪个读取速度快
mongodb和redis哪个读取速度快

redis 的读取速度比 mongodb 更快。原因包括:1. redis 使用简单的键值存储,而 mongodb 存储 json 格式的数据,需要解析和反序列化。2. redis 使用哈希表快速查找数据,而 mongodb 使用 b-tree 索引。因此,redis 在需要高性能读取操作的应用程序中是一个更好的选择。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

501

2024.04.02

redis怎么做缓存服务器
redis怎么做缓存服务器

redis 作为缓存服务器的答案:redis 是一款开源、高性能、分布式的键值存储,可作为缓存服务器使用。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

413

2024.04.07

redis怎么解决数据一致性
redis怎么解决数据一致性

redis 提供了两种一致性模型,以维护副本数据一致性:强一致性 (sync) 确保写操作仅在复制到所有从节点后才完成;最终一致性 (async) 则在主节点上写操作后认为已完成,牺牲一致性换取性能。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

408

2024.04.07

mysql和redis怎么保证双写一致性
mysql和redis怎么保证双写一致性

确保 mysql 和 redis 双写一致性的技术包括:1、事务性更新:同时更新 mysql 和 redis,保证一致性;2、主从复制:mysql 主服务器更改同步到 redis 从服务器;3、基于事件的更新:mysql 记录更改并发送到 redis等等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

483

2024.04.07

C# ASP.NET Core微服务架构与API网关实践
C# ASP.NET Core微服务架构与API网关实践

本专题围绕 C# 在现代后端架构中的微服务实践展开,系统讲解基于 ASP.NET Core 构建可扩展服务体系的核心方法。内容涵盖服务拆分策略、RESTful API 设计、服务间通信、API 网关统一入口管理以及服务治理机制。通过真实项目案例,帮助开发者掌握构建高可用微服务系统的关键技术,提高系统的可扩展性与维护效率。

3

2026.03.11

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
最新Python教程 从入门到精通
最新Python教程 从入门到精通

共4课时 | 22.5万人学习

Django 教程
Django 教程

共28课时 | 4.9万人学习

SciPy 教程
SciPy 教程

共10课时 | 1.9万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号