Airflow Sensor模式
什么是 Airflow Sensor 模式?
在 Apache Airflow 中,Sensor 模式是一种特殊的任务类型,用于监控外部系统的状态或条件。Sensor 会持续检查某个条件是否满足,直到条件为真或达到超时时间。一旦条件满足,Sensor 任务就会成功,并允许后续任务继续执行。
Sensor 模式的核心思想是等待。它适用于需要依赖外部系统状态的任务,例如等待文件生成、数据库记录更新或 API 响应完成。
备注
Sensor 是 Airflow 中一种阻塞任务,因为它会持续运行直到条件满足或超时。
Sensor 的工作原理
Sensor 通过以下步骤工作:
- 定义条件:Sensor 任务会定义一个需要监控的条件,例如文件是否存在、数据库记录是否更新等。
- 轮询检查:Sensor 会定期(通过
poke_interval
参数控制)检查条件是否满足。 - 超时处理:如果条件在指定时间内(通过
timeout
参数控制)未满足,Sensor 任务会失败。 - 触发后续任务:一旦条件满足,Sensor 任务成功,后续任务可以继续执行。
常用 Sensor 类型
Airflow 提供了多种内置 Sensor,以下是几种常见的 Sensor:
- FileSensor:监控文件系统中某个文件是否存在。
- SqlSensor:监控数据库中某个查询结果是否满足条件。
- ExternalTaskSensor:监控另一个 DAG 中的任务是否完成。
- HttpSensor:监控某个 HTTP 端点是否返回预期响应。
代码示例:FileSensor 的使用
以下是一个使用 FileSensor
的示例,它会监控 /tmp/my_file.txt
文件是否存在:
python
from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from airflow.operators.dummy import DummyOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
}
with DAG('file_sensor_example', default_args=default_args, schedule_interval=None) as dag:
start_task = DummyOperator(task_id='start_task')
# 定义 FileSensor,监控 /tmp/my_file.txt 文件是否存在
file_sensor_task = FileSensor(
task_id='file_sensor_task',
filepath='/tmp/my_file.txt',
poke_interval=30, # 每 30 秒检查一次
timeout=300, # 最多等待 5 分钟
)
end_task = DummyOperator(task_id='end_task')
start_task >> file_sensor_task >> end_task
输入和输出
- 输入:Sensor 会监控
/tmp/my_file.txt
文件是否存在。 - 输出:
- 如果文件在 5 分钟内生成,Sensor 任务成功,
end_task
会执行。 - 如果文件未生成,Sensor 任务失败。
- 如果文件在 5 分钟内生成,Sensor 任务成功,
实际应用场景
场景 1:等待数据文件生成
假设你有一个数据处理任务,需要等待上游系统生成一个数据文件。你可以使用 FileSensor
监控文件是否生成,并在文件生成后触发后续任务。
场景 2:监控数据库记录
假设你需要等待数据库中的某条记录更新后再执行任务。你可以使用 SqlSensor
监控数据库查询结果,例如:
python
from airflow.sensors.sql import SqlSensor
sql_sensor_task = SqlSensor(
task_id='sql_sensor_task',
conn_id='my_db_connection',
sql="SELECT COUNT(*) FROM my_table WHERE status = 'completed'",
poke_interval=60,
timeout=600,
)
总结
- Sensor 模式是 Airflow 中用于监控外部系统状态的任务类型。
- 常用的 Sensor 包括
FileSensor
、SqlSensor
、ExternalTaskSensor
和HttpSensor
。 - Sensor 会持续检查条件是否满足,直到条件为真或超时。
- 通过合理使用 Sensor,可以实现任务之间的依赖控制和外部系统状态的监控。
附加资源与练习
资源
练习
- 创建一个 DAG,使用
HttpSensor
监控某个 API 端点是否返回200
状态码。 - 修改
FileSensor
示例,使其监控多个文件的存在。 - 使用
ExternalTaskSensor
监控另一个 DAG 中的任务是否完成。
通过实践这些练习,你将更深入地理解 Sensor 模式的应用场景和实现方式。