Airflow ExternalTaskSensor
介绍
在 Apache Airflow 中,ExternalTaskSensor 是一种特殊的传感器(Sensor),用于监控外部任务(通常位于另一个 DAG 中)的完成状态。它可以帮助你实现跨 DAG 的依赖管理,确保在某个任务开始之前,另一个 DAG 中的特定任务已经成功完成。
ExternalTaskSensor 的核心功能是等待,它会定期检查外部任务的状态,直到该任务成功完成或达到超时时间。这种机制在复杂的任务调度场景中非常有用,尤其是在多个 DAG 之间存在依赖关系时。
基本用法
ExternalTaskSensor 的基本用法非常简单。你需要指定以下参数:
external_dag_id
: 外部任务所在的 DAG ID。external_task_id
: 外部任务的 ID。execution_date_fn
(可选): 用于确定外部任务的执行日期的函数。
以下是一个简单的代码示例:
from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
}
with DAG('example_external_task_sensor', default_args=default_args, schedule_interval='@daily') as dag:
wait_for_external_task = ExternalTaskSensor(
task_id='wait_for_external_task',
external_dag_id='external_dag',
external_task_id='external_task',
mode='reschedule',
)
start_task = DummyOperator(task_id='start_task')
wait_for_external_task >> start_task
在这个示例中,wait_for_external_task
传感器会等待 external_dag
中的 external_task
任务完成。一旦该任务完成,start_task
任务就会开始执行。
参数详解
external_dag_id
这是外部任务所在的 DAG 的 ID。你需要确保这个 DAG 已经存在于 Airflow 中。
external_task_id
这是外部任务的 ID。你需要确保这个任务已经存在于指定的 DAG 中。
execution_date_fn
这是一个可选参数,用于指定一个函数来确定外部任务的执行日期。默认情况下,ExternalTaskSensor 会使用当前任务的执行日期来查找外部任务。
mode
ExternalTaskSensor 有两种模式:
poke
(默认): 传感器会定期检查外部任务的状态,直到任务完成或超时。reschedule
: 传感器会在每次检查后释放资源,并在下一次检查时重新调度。
实际应用场景
假设你有一个数据管道,其中包含两个 DAG:
- DAG A: 负责从外部 API 提取数据并存储到数据库中。
- DAG B: 负责对数据库中的数据进行处理和分析。
你希望确保在 DAG B 开始处理数据之前,DAG A 已经成功完成了数据提取任务。这时,你可以在 DAG B 中使用 ExternalTaskSensor 来监控 DAG A 中的任务。
with DAG('dag_b', default_args=default_args, schedule_interval='@daily') as dag_b:
wait_for_dag_a = ExternalTaskSensor(
task_id='wait_for_dag_a',
external_dag_id='dag_a',
external_task_id='extract_data',
mode='reschedule',
)
process_data = DummyOperator(task_id='process_data')
wait_for_dag_a >> process_data
在这个例子中,wait_for_dag_a
传感器会等待 dag_a
中的 extract_data
任务完成。一旦该任务完成,process_data
任务就会开始执行。
总结
ExternalTaskSensor 是 Apache Airflow 中一个非常有用的工具,可以帮助你实现跨 DAG 的依赖管理。通过监控外部任务的完成状态,你可以确保任务之间的依赖关系得到正确处理,从而提高数据管道的可靠性和稳定性。
附加资源
练习
- 创建一个包含两个 DAG 的 Airflow 项目,其中一个 DAG 使用 ExternalTaskSensor 来监控另一个 DAG 中的任务。
- 尝试使用
execution_date_fn
参数来指定外部任务的执行日期。 - 比较
poke
和reschedule
模式的区别,并分析它们在资源使用上的差异。