
在 airflow 2 中,已弃用的 `{{ prev_execution_date }}` 模板变量应通过 `taskinstance.previous_ti.execution_date` 或 `dagrun.get_previous_dagrun()` 获取前序执行时间,而非依赖过时的模板变量。
Airflow 2 对调度语义进行了重构,引入了更精确的逻辑时间(logical date)与数据区间(data interval)概念。原 prev_execution_date 实际表示的是前一次 DagRun 的 execution_date(即该次运行所代表的逻辑时间点),而它在 Airflow 2 中已被明确移除——因为 execution_date 本身也已重命名为 logical_date,且其语义更聚焦于“本次任务逻辑上处理的数据时间”,而非调度触发时间。
因此,不应使用 prev_data_interval_start_success 或 prev_data_interval_end_success 作为直接替代:
- 这两个变量仅在任务成功完成时才被记录,且反映的是前次成功运行所覆盖的数据区间端点(如 2024-01-01T00:00:00 → 2024-01-02T00:00:00),并非等价于旧版 prev_execution_date(即前次 DagRun 的 logical_date);
- 若前次运行失败或跳过,这些变量将为空,导致逻辑不可靠。
✅ 推荐做法:在 PythonOperator 或自定义 Operator 中,通过上下文对象安全获取前序执行时间:
from airflow.models import TaskInstance, DagRun
def my_task(ti: TaskInstance, **kwargs):
# 方式1:通过当前 TaskInstance 查找前一个同名 task instance(同一 DAG、同一 task_id)
prev_ti = ti.previous_ti
if prev_ti:
prev_logical_date = prev_ti.logical_date # Airflow 2+ 推荐:等价于旧版 prev_execution_date
print(f"Previous logical date (via previous_ti): {prev_logical_date}")
# 方式2:通过当前 DagRun 获取前一个 DagRun(更贴近原 prev_execution_date 语义)
prev_dag_run = DagRun.get_previous_dagrun(dag_run=ti.dag_run)
if prev_dag_run:
print(f"Previous logical date (via DagRun): {prev_dag_run.logical_date}")⚠️ 注意事项:
- ti.previous_ti 仅在同一 task_id 下存在历史实例时有效(适用于周期性任务),但不保证前次成功;若需严格依赖“上一次成功运行”,应结合 prev_ti.state == 'success' 判断;
- DagRun.get_previous_dagrun() 返回的是调度顺序上的前一个 DagRun(无论成败),语义最接近原始 prev_execution_date;
- 所有模板变量(如 {{ ds }}, {{ execution_date }})均已迁移到基于 logical_date 和 data_interval 的新体系,建议统一使用 {{ dag_run.logical_date }} 和 {{ dag_run.data_interval_start }} 等标准变量替代旧模板。
总结:迁移时请放弃对 prev_execution_date 的模板依赖,改用 TaskInstance.previous_ti.logical_date 或 DagRun.get_previous_dagrun().logical_date —— 它们语义清晰、稳定可靠,且完全兼容 Airflow 2 的调度模型。










