
本文介绍如何在 Apache Airflow 中设计两个独立、定时触发的 DAG,分别在高峰起始与结束时刻精确执行一次带宽调整操作,并通过幂等性设计确保任务失败可重试、重复调度不重复生效。
本文介绍如何在 apache airflow 中设计两个独立、定时触发的 dag,分别在高峰起始与结束时刻精确执行一次带宽调整操作,并通过幂等性设计确保任务失败可重试、重复调度不重复生效。
在实际网络运维场景中(如 ISP 带宽动态调控),我们常需在特定时间窗口(例如每日 09:00–13:00)仅执行一次策略变更动作:高峰开始时统一限速,高峰结束时恢复原速。关键挑战在于:
- ✅ 动作必须严格按时触发(非轮询判断);
- ✅ 同一时刻只执行一次,避免重复调用导致配置冲突;
- ✅ 支持失败自动重试,但重试不应破坏业务语义(即“恢复带宽”不可因重试而误降速);
- ❌ 不应依赖 BranchDateTimeOperator 等运行时条件分支——它无法解决“已执行过”的状态记忆问题,且每日仅调度一次的 DAG 中,分支逻辑易造成误判或漏判。
最佳实践是:将“进入高峰”和“退出高峰”拆分为两个完全解耦、独立调度的 DAG。 每个 DAG 只含一个幂等任务,通过精准的 schedule 触发,天然满足“单次执行 + 可重试”要求。
✅ 推荐方案:双 DAG 架构(推荐 Airflow 2.6+)
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from datetime import time
import pendulum
# 共享策略获取逻辑(建议抽取为工具函数)
def fetch_active_policy():
"""从数据库/配置中心获取当前生效的带宽策略"""
# 示例:实际应查询 DB 或 API,返回如 {'id': 1, 'start_time': '09:00', 'end_time': '13:00'}
return {'id': 1, 'start_time': '09:00', 'end_time': '13:00'}
# 幂等的带宽降低函数(关键!)
def decrease_bandwidth(**context):
policy = fetch_active_policy()
policy_id = policy['id']
# 【关键】添加幂等检查:写入标记或查询当前带宽状态
# 此处以伪代码示意,生产环境建议使用 Airflow Variable / DB 记录 last_executed_ts 或 status
from airflow.models import Variable
last_decrease = Variable.get(f"bandwidth_decrease_{policy_id}", default_var=None)
if last_decrease:
# 若今日已执行,跳过(或校验是否为今天)
from datetime import datetime
if pendulum.parse(last_decrease).date() == pendulum.today().date():
print(f"[SKIP] Bandwidth decrease already executed today for policy {policy_id}")
return
# 执行真实操作(如调用网管 API)
print(f"[EXEC] Decreasing bandwidth for policy {policy_id} at {pendulum.now()}")
# your_network_api.set_bandwidth_limit(policy_id, "20Mbps", "5Mbps")
# 记录执行时间(保障幂等)
Variable.set(f"bandwidth_decrease_{policy_id}", pendulum.now().isoformat())
# 幂等的带宽恢复函数
def return_to_normal_bandwidth(**context):
policy = fetch_active_policy()
policy_id = policy['id']
from airflow.models import Variable
last_restore = Variable.get(f"bandwidth_restore_{policy_id}", default_var=None)
if last_restore:
if pendulum.parse(last_restore).date() == pendulum.today().date():
print(f"[SKIP] Bandwidth restore already executed today for policy {policy_id}")
return
print(f"[EXEC] Restoring bandwidth for policy {policy_id} at {pendulum.now()}")
# your_network_api.set_bandwidth_limit(policy_id, "20Mbps", "20Mbps")
Variable.set(f"bandwidth_restore_{policy_id}", pendulum.now().isoformat())
# === DAG 1:高峰开始时执行限速 ===
dag_decrease = DAG(
dag_id="bandwidth_decrease_on_peak_start",
schedule="0 9 * * *", # 每日 09:00 UTC(请按实际时区调整)
start_date=days_ago(1),
catchup=False,
tags=["network", "bandwidth", "isp"],
timezone="Europe/Istanbul", # ⚠️ 必须显式设置时区!
)
decrease_task = PythonOperator(
task_id="decrease_bandwidth",
python_callable=decrease_bandwidth,
dag=dag_decrease,
)
# === DAG 2:高峰结束时恢复带宽 ===
dag_restore = DAG(
dag_id="bandwidth_restore_on_peak_end",
schedule="0 13 * * *", # 每日 13:00 UTC
start_date=days_ago(1),
catchup=False,
tags=["network", "bandwidth", "isp"],
timezone="Europe/Istanbul",
)
restore_task = PythonOperator(
task_id="restore_bandwidth",
python_callable=return_to_normal_bandwidth,
dag=dag_restore,
)? 关键设计说明
- 精准调度替代运行时判断:schedule="0 9 * * *" 确保任务在每天 09:00(指定时区)准时触发一次,无需轮询或条件分支,语义清晰、资源开销低。
- 幂等性保障:通过 Airflow Variable 记录当日执行时间戳,每次运行前校验,避免重复操作。也可替换为数据库状态表、Redis 锁等更健壮方案。
- 时区安全:务必在 DAG 级别显式声明 timezone(如 "Europe/Istanbul"),避免因 Airflow 默认 UTC 导致调度偏差。
- 失败可重试:默认 retries=1,若首次失败,Airflow 将在 retry_delay 后重试,且幂等逻辑保证重试不会引发副作用。
- 解耦清晰:两个 DAG 完全独立,便于单独启停、监控、调试,也支持未来扩展多时段策略(如午休高峰、晚间高峰)。
⚠️ 注意事项
- Airflow Variable 适用于轻量状态记录,高并发或强一致性场景建议使用外部数据库(如 PostgreSQL 表 bandwidth_policy_log)并加唯一约束(policy_id + date)。
- 避免在 PythonOperator 中执行长时间阻塞操作;如带宽调整 API 响应慢,应封装为异步任务或增加超时与重试策略。
- 生产环境请启用 email_on_failure 并接入告警系统,确保策略变更失败时及时人工介入。
- 若策略动态变化(如高峰时段每日不同),可将 schedule 改为 @hourly,并在任务内增加「是否到达今日高峰起点/终点」的实时判断(仍需配合幂等存储)。
通过该双 DAG 设计,您将获得一个简洁、可靠、可维护的带宽策略调度系统——既符合 Airflow 的声明式哲学,又完美契合网络运维对时效性与准确性的严苛要求。










