
本文探讨了在celery中处理动态创建子任务并确保其完成同步等待的挑战。针对celery链(chain)和弦(chord)无法在运行时动态添加依赖的局限性,文章提出并详细阐述了一种手动实现策略。该策略通过在父任务中收集动态子任务id,并使用循环轮询这些子任务的状态,直至所有子任务成功完成,从而实现精确的流程控制和数据完整性保障。
在构建复杂的分布式任务流时,我们经常遇到需要顺序执行一系列主任务,但在某个主任务内部,为了提高效率,又希望并行处理一些子任务的场景。例如,一个主任务可能需要通过API分批获取数据页,每获取一页数据后,就立即触发一个子任务来处理和写入数据库。由于数据库写入操作耗时较长且数量庞大,将其异步化为子任务可以显著减少主任务的整体墙钟时间。然而,关键挑战在于,下一个主任务必须等待所有这些动态创建的数据库写入子任务完成后才能继续执行,以确保数据完整性。
Celery提供了强大的任务编排工具,如chain、chord和group,用于定义任务之间的依赖关系和执行顺序。然而,这些工具的核心设计理念是基于预先定义的任务签名(signatures)。这意味着,在创建chain或chord时,所有参与的任务及其依赖关系都必须是已知的。
对于我们上述的场景,子任务是在主任务执行过程中动态生成的,其数量和具体签名无法在主任务启动前确定。在这种情况下,传统的Celery编排工具便显得力不从心:
简而言之,Celery的编排机制无法在任务被调度到Worker后,动态地修改其依赖关系或为其添加新的、运行时产生的子任务。任何阻塞等待逻辑都必须由任务本身显式地实现。
鉴于Celery原生编排工具的局限性,解决动态子任务同步等待问题的有效方法是手动实现一个轮询(polling)机制。这种策略的核心思想是:父任务在创建所有动态子任务后,收集这些子任务的ID,然后进入一个循环,周期性地检查每个子任务的状态,直到所有子任务都成功完成。
以下是实现这一策略的详细步骤和示例代码:
在父任务中,当需要创建子任务时,使用apply_async()方法调度它们,并务必将返回的AsyncResult对象的id属性收集到一个列表中。这个列表将用于后续的轮询。
import time
from typing import List
from celery import Celery, Task, AsyncResult
from celery.signals import task_postrun
# 假设的Celery应用实例
app = Celery('my_app', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
# 模拟的JobMaster和常量,用于日志记录
class JobMaster:
def get_job(self, job_id, job_title):
print(f"[JobMaster] Getting job {job_id} - {job_title}")
return self, job_id
def log_message(self, log_message, status=None, job_score=None):
print(f"[JobMaster] Log: {log_message} (Status: {status}, Score: {job_score})")
class Consts:
IN_PROGRESS = "IN_PROGRESS"
COMPLETED = "COMPLETED"
ERRORS_FOUND = "ERRORS_FOUND"
# 模拟的子任务
@app.task
def task_dummy_subtask(parent_task_name: str, job_id: int = None):
job, _ = JobMaster().get_job(job_id, job_title="dummy subtask")
job.log_message(log_message=f"Subtask {parent_task_name} started.")
time.sleep(2) # 模拟耗时操作
job.log_message(log_message=f"Subtask {parent_task_name} finished successfully.")
return f"Result from {parent_task_name}"
# 模拟的中间函数,用于创建子任务
def intermediary_dummy_subtask_function(parent_task_name: str, job_id: int = None) -> AsyncResult:
job, _ = JobMaster().get_job(job_id, job_title="dummy task")
job.log_message(log_message=f"Intermediary function for {parent_task_name} has been reached, will now make a task")
r = task_dummy_subtask.apply_async(kwargs={"parent_task_name": parent_task_name, "job_id": job_id},
add_to_parent=True)
return r
@app.task(bind=True)
def task_dummy_task1(self: Task, part_number: int, job_id: int = None):
job, job_id = JobMaster().get_job(job_id, job_title="dummy task")
sleeping_duration = 1
subtask_ids = []
job.log_message(log_message=f"Entered dummy task 1 with sleeping duration of {sleeping_duration}")
# 直接创建子任务
job.log_message(log_message="In dummy task1, creating subtask a")
subtask = task_dummy_subtask.apply_async(kwargs={"parent_task_name": "task1_a", "job_id": job_id},
add_to_parent=True)
subtask_ids.append(subtask.id)
job.log_message(log_message="In dummy task1, creating subtask b")
subtask = task_dummy_subtask.apply_async(kwargs={"parent_task_name": "task1_b", "job_id": job_id},
add_to_parent=True)
subtask_ids.append(subtask.id)
# 通过中间函数创建子任务
job.log_message(log_message="In dummy task1, creating intermediary subtask c")
subtask = intermediary_dummy_subtask_function(parent_task_name="task1_c", job_id=job_id)
subtask_ids.append(subtask.id)
time.sleep(sleeping_duration) # 模拟主任务的其他操作
# 等待所有子任务完成
wait_for_tasks_to_complete(async_ids=subtask_ids, job_id=job_id,
msg="Waiting in dummy task1 for subtasks to complete")
job.log_message(log_message="Finished dummy task1 main body")
return part_number创建一个辅助函数,如wait_for_tasks_to_complete,它接收子任务ID列表、日志ID和可选的超时时间。该函数将循环检查每个子任务的状态,直到所有子任务都完成或达到超时。
def wait_for_tasks_to_complete(async_ids: List[str], job_id: int = None, msg: str = None, timeout: int = 300):
job, _ = JobMaster().get_job(job_id, job_title="waiting for refresh data")
# 复制一份ID列表,因为在循环中会移除已完成的任务
remaining_async_ids = list(async_ids)
job.log_message(log_message=f"Waiting for {len(remaining_async_ids)} tasks to complete, {msg}", status=Consts.IN_PROGRESS, job_score=0)
job.log_message(log_message=f"tasks: {remaining_async_ids}", status=Consts.IN_PROGRESS, job_score=0)
count_down = timeout
while count_down > 0:
# 遍历剩余任务,检查其状态
tasks_to_check = list(remaining_async_ids) # 避免在迭代时修改列表
all_succeeded_in_this_check = True
for async_id in tasks_to_check:
result = app.AsyncResult(async_id) # 获取任务结果对象
status = result.status
if status == "SUCCESS":
returned_value = result.result
job.log_message(log_message=f"Task {async_id} confirmed status SUCCESS with {returned_value=}")
remaining_async_ids.remove(async_id) # 从待检查列表中移除
elif status in ["FAILURE", "REVOKED", "RETRY"]: # 考虑失败或撤销状态
job.log_message(log_message=f"Task {async_id} failed or revoked with status {status}. Aborting wait.", status=Consts.ERRORS_FOUND)
# 根据业务需求,可以选择在此处抛出异常或返回失败
return False
else:
all_succeeded_in_this_check = False # 仍有任务未完成或未成功
# 如果所有任务都已完成
if not remaining_async_ids:
job.log_message(log_message="Finished waiting for refresh data, all tasks succeeded",
status=Consts.COMPLETED, job_score=100)
return True # 所有任务成功完成
count_down -= 1
job.log_message(log_message=f"There are {len(remaining_async_ids)} tasks remaining. Waiting...", status=Consts.IN_PROGRESS)
time.sleep(1) # 避免忙等,每秒检查一次
# 超时退出
job.log_message(log_message=f"After waiting for {timeout=} seconds, some tasks did not complete on time. Remaining tasks: {remaining_async_ids}",
status=Consts.ERRORS_FOUND, job_score=100)
return False # 超时,未所有任务完成在父任务中,创建完所有动态子任务并进行其他必要操作后,调用上述wait_for_tasks_to_complete函数。父任务会在此处阻塞,直到所有子任务完成或超时。
# task_dummy_task1 的最后部分
# ...
time.sleep(sleeping_duration)
# 等待所有子任务完成
if wait_for_tasks_to_complete(async_ids=subtask_ids, job_id=job_id,
msg="Waiting in dummy task1 for subtasks to complete"):
job.log_message(log_message="Finished dummy task1 main body after all subtasks completed.")
else:
job.log_message(log_message="Dummy task1 finished with issues: subtasks did not complete on time or failed.", status=Consts.ERRORS_FOUND)
return part_number尽管Celery的内置编排工具在处理预定义任务流时表现出色,但面对运行时动态创建的子任务,它们存在固有的局限性。通过手动收集动态子任务ID并实现一个轮询等待循环,我们可以有效地解决这一挑战,确保父任务在所有相关子任务完成后才继续执行。这种手动策略虽然增加了代码的复杂性,但为需要精确流程控制和数据完整性的复杂异步任务场景提供了必要的灵活性和可靠性。在实施时,务必关注错误处理、超时管理和合理的轮询频率,以构建健壮的分布式系统。
以上就是Celery动态子任务的同步等待机制:突破链式限制的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号