
本文详细介绍了如何在 Apache Airflow 中实现基于特定日期条件的任务流控制。通过利用 Airflow 的 Python Sensor,我们可以灵活地在 DAG 运行前检查自定义条件(例如,是否为每月的最后一个周二),并据此决定是否继续执行后续任务,从而避免不必要的资源消耗,优化工作流效率。
Apache Airflow 作为一个强大的工作流管理平台,通常按照预定义的任务依赖关系顺序执行任务。然而,在实际应用中,我们经常会遇到需要根据特定条件来决定是否执行某个任务流的场景。例如,一个数据同步或报告生成流程可能只在每月的特定日期(如月末、月初或某个特定的工作日)才需要运行,而在其他时间则应暂停或跳过。
在这种情况下,直接让 DAG 运行所有任务,然后在任务内部通过条件判断来跳过执行,不仅会消耗不必要的调度资源,也使得 DAG 运行日志变得复杂。更优雅的解决方案是在任务流的起点引入一个条件检查机制,只有当条件满足时,才触发后续任务的执行。Airflow 提供了多种实现方式,其中 Sensor(传感器)是处理此类前置条件检查的理想工具。
Airflow Sensor 是一种特殊的 Operator,其主要职责是周期性地检查某个外部条件或内部状态,直到条件满足为止。一旦条件满足,Sensor 任务就会成功完成,并触发其下游任务的执行。如果条件在设定的超时时间内未能满足,Sensor 任务可以选择失败,从而阻止下游任务的运行。
在众多 Sensor 类型中,PythonSensor 提供了最大的灵活性。它允许用户定义一个任意的 Python 函数作为条件检查逻辑。该函数应返回 True 表示条件满足,False 表示条件不满足。这使得 PythonSensor 能够处理任何可以通过 Python 代码表达的复杂条件。
为了实现“只有当当前 Airflow 运行的 execution_date 是该月的最后一个周二时才继续执行”的逻辑,我们需要创建一个 Python 函数,并将其集成到 PythonSensor 中。
Serendipity是一个采用PHP实现的智能博客BLOG系统,Serendipity功能丰富,符合标准,基于BSDLicense开源。 Serendipity 2.1.3 更新日志:2018-08-16 *安全性:确保RSS的管理员配置和博客条目限制被解析为SQL查询的整数; *安全性:在“编辑条目”面板中防止XSS可能性; *安全性:禁止向多个人发送评论通知和邮件地址;这可用于批
93
首先,定义一个 Python 函数来判断给定日期是否为该月的最后一个周二。
from datetime import datetime, timedelta
import calendar
def is_last_tuesday_of_month(execution_date: datetime, **context) -> bool:
"""
检查给定的 execution_date 是否是该月的最后一个周二。
"""
# 获取当前月份的最后一天
year = execution_date.year
month = execution_date.month
last_day_of_month = calendar.monthrange(year, month)[1]
# 从最后一天开始向前查找第一个周二
current_day = datetime(year, month, last_day_of_month)
# 遍历直到找到周二 (weekday() 返回 0-6,周一到周日)
# 周二的 weekday() 值为 1
while current_day.weekday() != calendar.TUESDAY:
current_day -= timedelta(days=1)
# 检查找到的周二是否与 execution_date 的日期部分相同
# 考虑到 execution_date 通常是 DAG 运行的开始时间,我们只关心日期部分
if current_day.date() == execution_date.date():
print(f"条件满足:{execution_date.date()} 是 {month} 月的最后一个周二。")
return True
else:
print(f"条件不满足:{execution_date.date()} 不是 {month} 月的最后一个周二。")
return False
函数说明:
接下来,我们将这个条件函数集成到 Airflow DAG 中,使用 PythonSensor 作为前置检查任务。
from airflow import DAG
from airflow.sensors.python import PythonSensor
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
import calendar
# 导入上面定义的条件函数
# from your_module import is_last_tuesday_of_month
# 如果函数定义在同一个文件中,则不需要导入
# DAG 定义
with DAG(
dag_id='conditional_last_tuesday_run',
start_date=datetime(2023, 1, 1),
schedule_interval='0 0 * * *', # 每天运行一次,以便检查条件
catchup=False,
tags=['example', 'sensor', 'condition'],
) as dag:
# 定义 PythonSensor 任务
check_last_tuesday = PythonSensor(
task_id='check_if_last_tuesday_of_month',
python_callable=is_last_tuesday_of_month,
# op_kwargs 允许向 python_callable 传递额外的关键字参数
# 在这里,我们将 execution_date 显式传递,以确保函数使用正确的日期
op_kwargs={'execution_date': '{{ ds }}'}, # {{ ds }} 是 Airflow 模板,解析为执行日期字符串
poke_interval=60 * 5, # 每 5 分钟检查一次条件
timeout=60 * 60 * 24, # 最多等待 24 小时
mode='poke', # 使用 'poke' 模式,周期性检查
)
# 原始任务 T1-T5
# T1 = deletes all files from GCS
t1 = BashOperator(
task_id='delete_gcs_files',
bash_command='echo "Deleting GCS files..."',
)
# T2 = Runs SQL query 1 and outputs to a table within BigQuery
t2 = BashOperator(
task_id='run_sql_query_1',
bash_command='echo "Running SQL Query 1..."',
)
# T3 = Runs SQL query 2 and outputs to a table within BigQuery
t3 = BashOperator(
task_id='run_sql_query_2',
bash_command='echo "Running SQL Query 2..."',
)
# T4 = Runs SQL query 3 and places a copy of this output as csv into the GCS that was emptied in T1
t4 = BashOperator(
task_id='run_sql_query_3_and_upload_to_gcs',
bash_command='echo "Running SQL Query 3 and uploading to GCS..."',
)
# T5= Copies and Appends the reference numbers from the file in T4 to a history table in BigQuery.
t5 = BashOperator(
task_id='append_to_history_table',
bash_command='echo "Appending to history table..."',
)
# 定义任务依赖关系
# 只有当 check_last_tuesday 成功后,后续任务才会被执行
check_last_tuesday >> t1 >> t2 >> t3 >> t4 >> t5
代码说明:
通过 PythonSensor,Airflow 提供了强大的机制来实现基于复杂自定义条件的任务流控制。本文演示了如何利用 PythonSensor 结合 Python 日期处理逻辑,实现“每月最后一个周二”的条件判断。这种方法不仅使 DAG 结构更清晰,避免了不必要的任务执行,而且通过灵活的 python_callable,可以应对几乎所有基于 Python 的条件检查需求,极大地提升了 Airflow 工作流的智能性和资源利用效率。
以上就是在 Airflow 中实现基于日期条件的任务流控制的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号