跳到主要内容

Airflow Sensors基础

介绍

在 Apache Airflow 中,Sensors 是一种特殊类型的 Operator,用于监控外部系统的状态变化。Sensors 会持续检查某个条件是否满足,直到条件为真或超时。它们通常用于等待某个外部事件的发生,例如文件到达、数据库记录更新或 API 调用完成。

Sensors 是 Airflow DAG 开发中的重要组成部分,特别是在需要与外部系统交互的场景中。通过 Sensors,你可以确保任务只在特定条件满足时执行,从而提高工作流的可靠性和效率。

Sensors 的工作原理

Sensors 的核心功能是轮询。它们会定期检查某个条件是否满足,如果条件满足,则任务成功完成;如果条件未满足,则继续等待。Sensors 的行为可以通过以下参数进行配置:

  • poke_interval: 两次检查之间的时间间隔(默认值为 60 秒)。
  • timeout: 传感器等待条件满足的最大时间(默认值为 7 天)。
  • mode: 传感器的运行模式,可以是 poke(默认)或 reschedule
提示

poke 模式下,传感器会持续占用工作线程,直到条件满足或超时。而在 reschedule 模式下,传感器会在每次检查后释放工作线程,适合长时间等待的场景。

常用 Sensors 类型

Airflow 提供了多种内置 Sensors,以下是一些常见的 Sensors:

  1. FileSensor: 用于监控文件系统中的文件是否存在。
  2. SqlSensor: 用于监控数据库查询结果是否满足特定条件。
  3. HttpSensor: 用于监控 HTTP 请求的响应是否满足特定条件。
  4. ExternalTaskSensor: 用于监控其他 DAG 中的任务是否完成。

示例:使用 FileSensor

以下是一个使用 FileSensor 的示例,该传感器会等待指定文件出现在文件系统中:

python
from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from airflow.operators.dummy import DummyOperator
from datetime import datetime

with DAG(
dag_id="file_sensor_example",
start_date=datetime(2023, 1, 1),
schedule_interval="@daily",
) as dag:

wait_for_file = FileSensor(
task_id="wait_for_file",
filepath="/path/to/your/file.txt",
poke_interval=30, # 每 30 秒检查一次
timeout=3600, # 最多等待 1 小时
mode="poke",
)

process_file = DummyOperator(task_id="process_file")

wait_for_file >> process_file

在这个示例中,FileSensor 会每 30 秒检查一次 /path/to/your/file.txt 文件是否存在。如果文件在 1 小时内出现,任务 process_file 将会执行;否则,任务将失败。

实际应用场景

场景 1:等待数据文件到达

假设你有一个每日运行的 ETL 管道,需要等待上游系统生成的数据文件到达后才能开始处理。你可以使用 FileSensor 来监控数据文件的到达,并在文件到达后触发后续的 ETL 任务。

场景 2:监控数据库记录

假设你需要等待某个数据库表中的记录更新后才能继续执行任务。你可以使用 SqlSensor 来监控数据库表的变化,并在条件满足时触发后续任务。

python
from airflow.sensors.sql import SqlSensor

wait_for_record = SqlSensor(
task_id="wait_for_record",
conn_id="your_db_connection",
sql="SELECT COUNT(*) FROM your_table WHERE status = 'processed'",
poke_interval=60,
timeout=7200,
mode="reschedule",
)

在这个示例中,SqlSensor 会每 60 秒检查一次数据库表 your_table 中状态为 processed 的记录数量。如果记录数量大于 0,任务将成功完成。

总结

Sensors 是 Apache Airflow 中用于监控外部系统状态变化的强大工具。通过合理配置 Sensors,你可以确保任务只在特定条件满足时执行,从而提高工作流的可靠性和效率。本文介绍了 Sensors 的基本概念、常用类型以及实际应用场景,并提供了代码示例帮助你快速上手。

附加资源与练习

  • 练习 1: 创建一个 DAG,使用 HttpSensor 监控某个 API 的响应状态码是否为 200。
  • 练习 2: 修改 FileSensor 示例,使其在 reschedule 模式下运行,并观察任务的行为变化。
备注

更多关于 Sensors 的详细信息,请参考 Apache Airflow 官方文档.