跳到主要内容

Airflow Sensor模式

什么是 Airflow Sensor 模式?

在 Apache Airflow 中,Sensor 模式是一种特殊的任务类型,用于监控外部系统的状态或条件。Sensor 会持续检查某个条件是否满足,直到条件为真或达到超时时间。一旦条件满足,Sensor 任务就会成功,并允许后续任务继续执行。

Sensor 模式的核心思想是等待。它适用于需要依赖外部系统状态的任务,例如等待文件生成、数据库记录更新或 API 响应完成。

备注

Sensor 是 Airflow 中一种阻塞任务,因为它会持续运行直到条件满足或超时。


Sensor 的工作原理

Sensor 通过以下步骤工作:

  1. 定义条件:Sensor 任务会定义一个需要监控的条件,例如文件是否存在、数据库记录是否更新等。
  2. 轮询检查:Sensor 会定期(通过 poke_interval 参数控制)检查条件是否满足。
  3. 超时处理:如果条件在指定时间内(通过 timeout 参数控制)未满足,Sensor 任务会失败。
  4. 触发后续任务:一旦条件满足,Sensor 任务成功,后续任务可以继续执行。

常用 Sensor 类型

Airflow 提供了多种内置 Sensor,以下是几种常见的 Sensor:

  1. FileSensor:监控文件系统中某个文件是否存在。
  2. SqlSensor:监控数据库中某个查询结果是否满足条件。
  3. ExternalTaskSensor:监控另一个 DAG 中的任务是否完成。
  4. HttpSensor:监控某个 HTTP 端点是否返回预期响应。

代码示例:FileSensor 的使用

以下是一个使用 FileSensor 的示例,它会监控 /tmp/my_file.txt 文件是否存在:

python
from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from airflow.operators.dummy import DummyOperator
from datetime import datetime

default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
}

with DAG('file_sensor_example', default_args=default_args, schedule_interval=None) as dag:
start_task = DummyOperator(task_id='start_task')

# 定义 FileSensor,监控 /tmp/my_file.txt 文件是否存在
file_sensor_task = FileSensor(
task_id='file_sensor_task',
filepath='/tmp/my_file.txt',
poke_interval=30, # 每 30 秒检查一次
timeout=300, # 最多等待 5 分钟
)

end_task = DummyOperator(task_id='end_task')

start_task >> file_sensor_task >> end_task

输入和输出

  • 输入:Sensor 会监控 /tmp/my_file.txt 文件是否存在。
  • 输出
    • 如果文件在 5 分钟内生成,Sensor 任务成功,end_task 会执行。
    • 如果文件未生成,Sensor 任务失败。

实际应用场景

场景 1:等待数据文件生成

假设你有一个数据处理任务,需要等待上游系统生成一个数据文件。你可以使用 FileSensor 监控文件是否生成,并在文件生成后触发后续任务。

场景 2:监控数据库记录

假设你需要等待数据库中的某条记录更新后再执行任务。你可以使用 SqlSensor 监控数据库查询结果,例如:

python
from airflow.sensors.sql import SqlSensor

sql_sensor_task = SqlSensor(
task_id='sql_sensor_task',
conn_id='my_db_connection',
sql="SELECT COUNT(*) FROM my_table WHERE status = 'completed'",
poke_interval=60,
timeout=600,
)

总结

  • Sensor 模式是 Airflow 中用于监控外部系统状态的任务类型。
  • 常用的 Sensor 包括 FileSensorSqlSensorExternalTaskSensorHttpSensor
  • Sensor 会持续检查条件是否满足,直到条件为真或超时。
  • 通过合理使用 Sensor,可以实现任务之间的依赖控制和外部系统状态的监控。

附加资源与练习

资源

练习

  1. 创建一个 DAG,使用 HttpSensor 监控某个 API 端点是否返回 200 状态码。
  2. 修改 FileSensor 示例,使其监控多个文件的存在。
  3. 使用 ExternalTaskSensor 监控另一个 DAG 中的任务是否完成。

通过实践这些练习,你将更深入地理解 Sensor 模式的应用场景和实现方式。