Airflow Sensors基础
介绍
在 Apache Airflow 中,Sensors 是一种特殊类型的 Operator,用于监控外部系统的状态变化。Sensors 会持续检查某个条件是否满足,直到条件为真或超时。它们通常用于等待某个外部事件的发生,例如文件到达、数据库记录更新或 API 调用完成。
Sensors 是 Airflow DAG 开发中的重要组成部分,特别是在需要与外部系统交互的场景中。通过 Sensors,你可以确保任务只在特定条件满足时执行,从而提高工作流的可靠性和效率。
Sensors 的工作原理
Sensors 的核心功能是轮询。它们会定期检查某个条件是否满足,如果条件满足,则任务成功完成;如果条件未满足,则继续等待。Sensors 的行为可以通过以下参数进行配置:
- poke_interval: 两次检查之间的时间间隔(默认值为 60 秒)。
- timeout: 传感器等待条件满足的最大时间(默认值为 7 天)。
- mode: 传感器的运行模式,可以是
poke
(默认)或reschedule
。
在 poke
模式下,传感器会持续占用工作线程,直到条件满足或超时。而在 reschedule
模式下,传感器会在每次检查后释放工作线程,适合长时间等待的场景。
常用 Sensors 类型
Airflow 提供了多种内置 Sensors,以下是一些常见的 Sensors:
- FileSensor: 用于监控文件系统中的文件是否存在。
- SqlSensor: 用于监控数据库查询结果是否满足特定条件。
- HttpSensor: 用于监控 HTTP 请求的响应是否满足特定条件。
- ExternalTaskSensor: 用于监控其他 DAG 中的任务是否完成。
示例:使用 FileSensor
以下是一个使用 FileSensor
的示例,该传感器会等待指定文件出现在文件系统中:
from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from airflow.operators.dummy import DummyOperator
from datetime import datetime
with DAG(
dag_id="file_sensor_example",
start_date=datetime(2023, 1, 1),
schedule_interval="@daily",
) as dag:
wait_for_file = FileSensor(
task_id="wait_for_file",
filepath="/path/to/your/file.txt",
poke_interval=30, # 每 30 秒检查一次
timeout=3600, # 最多等待 1 小时
mode="poke",
)
process_file = DummyOperator(task_id="process_file")
wait_for_file >> process_file
在这个示例中,FileSensor
会每 30 秒检查一次 /path/to/your/file.txt
文件是否存在。如果文件在 1 小时内出现,任务 process_file
将会执行;否则,任务将失败。
实际应用场景
场景 1:等待数据文件到达
假设你有一个每日运行的 ETL 管道,需要等待上游系统生成的数据文件到达后才能开始处理。你可以使用 FileSensor
来监控数据文件的到达,并在文件到达后触发后续的 ETL 任务。
场景 2:监控数据库记录
假设你需要等待某个数据库表中的记录更新后才能继续执行任务。你可以使用 SqlSensor
来监控数据库表的变化,并在条件满足时触发后续任务。
from airflow.sensors.sql import SqlSensor
wait_for_record = SqlSensor(
task_id="wait_for_record",
conn_id="your_db_connection",
sql="SELECT COUNT(*) FROM your_table WHERE status = 'processed'",
poke_interval=60,
timeout=7200,
mode="reschedule",
)
在这个示例中,SqlSensor
会每 60 秒检查一次数据库表 your_table
中状态为 processed
的记录数量。如果记录数量大于 0,任务将成功完成。
总结
Sensors 是 Apache Airflow 中用于监控外部系统状态变化的强大工具。通过合理配置 Sensors,你可以确保任务只在特定条件满足时执行,从而提高工作流的可靠性和效率。本文介绍了 Sensors 的基本概念、常用类型以及实际应用场景,并提供了代码示例帮助你快速上手。
附加资源与练习
- 练习 1: 创建一个 DAG,使用
HttpSensor
监控某个 API 的响应状态码是否为 200。 - 练习 2: 修改
FileSensor
示例,使其在reschedule
模式下运行,并观察任务的行为变化。
更多关于 Sensors 的详细信息,请参考 Apache Airflow 官方文档.