
本文详细阐述了如何在Apache Airflow中实现基于特定日期条件的DAG任务条件化执行。通过利用PythonSensor,结合自定义的Python函数来判断例如“是否为月末最后一个周二”等复杂日期逻辑,我们能够精确控制DAG的启动。教程提供了完整的代码示例,展示了如何构建一个PythonSensor来检查条件,并在条件不满足时阻止下游任务运行,从而确保DAG仅在符合业务规则时才被触发。
在数据管道和自动化工作流管理中,Apache Airflow因其强大的调度和编排能力而广受欢迎。然而,许多业务场景要求DAG(有向无环图)中的任务并非每次都运行,而是需要满足特定的条件。例如,某些报告任务可能只在每月的特定一天(如月末最后一个周二)执行。本文将深入探讨如何利用Airflow的PythonSensor来实现这种复杂的条件化执行逻辑,从而在任务实际运行前进行预检查。
Airflow提供了多种实现条件化执行的机制:
对于“如果条件满足则运行所有任务,否则不运行任何任务”的需求,传感器是更直接且推荐的解决方案。特别是当条件涉及复杂的自定义逻辑时,PythonSensor提供了最大的灵活性。
立即学习“Python免费学习笔记(深入)”;
PythonSensor是Airflow中一个非常强大的传感器,它允许用户定义一个Python可调用对象(函数),该对象会周期性地被执行。只要这个可调用对象返回False,传感器就会继续等待(“poke”)。一旦返回True,传感器任务即成功完成,并触发其所有下游任务。
PythonSensor的关键参数包括:
我们的目标是创建一个检查,判断当前的Airflow执行日期(execution_date)是否是当月的最后一个周二。
首先,我们需要一个Python函数来执行这个日期逻辑。这个函数将接收Airflow的context字典作为参数,从中获取execution_date。
from datetime import datetime, timedelta
import calendar
def is_last_tuesday_of_month(**context):
"""
检查Airflow的execution_date是否是当前月份的最后一个周二。
"""
execution_date = context["execution_date"]
current_year = execution_date.year
current_month = execution_date.month
# 获取当前月份的总天数
# calendar.monthrange(year, month) 返回 (该月第一天的星期几, 该月总天数)
_, num_days_in_month = calendar.monthrange(current_year, current_month)
# 构建当前月份的最后一天
last_day_of_month = datetime(current_year, current_month, num_days_in_month).date()
# 从月末最后一天开始往前推,直到找到第一个周二
days_to_subtract = 0
while (last_day_of_month - timedelta(days=days_to_subtract)).weekday() != calendar.TUESDAY:
days_to_subtract += 1
# 安全检查,防止无限循环(虽然通常不会发生,因为每个月至少有一个周二)
if days_to_subtract > 7:
print(f"Error: Could not find Tuesday in {current_year}-{current_month}")
return False
last_tuesday_date = last_day_of_month - timedelta(days=days_to_subtract)
print(f"Execution Date: {execution_date.date()}")
print(f"Last Tuesday of {current_year}-{current_month}: {last_tuesday_date}")
# 比较执行日期与计算出的月末最后一个周二
return execution_date.date() == last_tuesday_date函数说明:
现在,我们将这个检查函数集成到Airflow DAG中。
from airflow import DAG
from airflow.sensors.python import PythonSensor
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
import calendar
# 定义上述的 is_last_tuesday_of_month 函数
def is_last_tuesday_of_month(**context):
"""
检查Airflow的execution_date是否是当前月份的最后一个周二。
"""
execution_date = context["execution_date"]
current_year = execution_date.year
current_month = execution_date.month
_, num_days_in_month = calendar.monthrange(current_year, current_month)
last_day_of_month = datetime(current_year, current_month, num_days_in_month).date()
days_to_subtract = 0
while (last_day_of_month - timedelta(days=days_to_subtract)).weekday() != calendar.TUESDAY:
days_to_subtract += 1
if days_to_subtract > 7:
print(f"Error: Could not find Tuesday in {current_year}-{current_month}")
return False
last_tuesday_date = last_day_of_month - timedelta(days=days_to_subtract)
print(f"Checking date: {execution_date.date()}")
print(f"Calculated last Tuesday: {last_tuesday_date}")
return execution_date.date() == last_tuesday_date
with DAG(
dag_id='conditional_last_tuesday_dag',
start_date=datetime(2023, 1, 1),
schedule_interval='@daily', # 每天运行,但只有在条件满足时才执行下游任务
catchup=False,
tags=['example', 'sensor', 'conditional'],
) as dag:
# 传感器任务:检查是否是月末最后一个周二
check_last_tuesday = PythonSensor(
task_id='check_if_last_tuesday',
python_callable=is_last_tuesday_of_month,
poke_interval=60 * 5, # 每5分钟检查一次 (在实际生产中,对于日期检查可以设置更长的间隔或一次性检查)
timeout=60 * 60 * 24, # 最长等待24小时
soft_fail=True, # 如果条件不满足,任务标记为skipped,下游任务也skipped
mode='poke', # 默认模式,周期性执行callable
)
# 原始任务T1到T5,现在它们将依赖于传感器的成功
T1 = BashOperator(
task_id='delete_gcs_files',
bash_command='echo "Deleting all files from GCS..." && sleep 5',
)
T2 = BashOperator(
task_id='run_sql_query_1',
bash_command='echo "Running SQL query 1 and outputting to BigQuery..." && sleep 5',
)
T3 = BashOperator(
task_id='run_sql_query_2',
bash_command='echo "Running SQL query 2 and outputting to BigQuery..." && sleep 5',
)
T4 = BashOperator(
task_id='run_sql_query_3',
bash_command='echo "Running SQL query 3 and placing CSV in GCS..." && sleep 5',
)
T5 = BashOperator(
task_id='copy_append_history',
bash_command='echo "Copying and appending reference numbers to history table..." && sleep 5',
)
# 定义任务依赖关系
# 只有当check_last_tuesday传感器成功时,T1及后续任务才会运行
check_last_tuesday >> T1 >> T2 >> T3 >> T4 >> T5DAG结构解释:
通过巧妙地结合PythonSensor和自定义的Python日期检查函数,我们能够为Airflow DAG引入强大的条件化执行能力。这种方法不仅实现了“月末最后一个周二”这样的复杂日期逻辑,而且通过soft_fail=True参数,优雅地处理了条件不满足时停止下游任务的需求,避免了不必要的资源消耗和任务执行。掌握这种模式,将大大提升Airflow DAG的智能性和适应性,使其更好地服务于多样化的业务场景。
以上就是Airflow DAG条件化执行:利用PythonSensor实现特定日期检查的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号