Airflow S3KeySensor
什么是 S3KeySensor?
在 Apache Airflow 中,S3KeySensor 是一种传感器(Sensor),用于监控 Amazon S3 存储桶中是否存在指定的文件或键(Key)。传感器是 Airflow 中的一种特殊任务,它们会持续检查某个条件是否满足,直到条件为真或超时为止。S3KeySensor 特别适用于需要等待某个文件在 S3 中生成后再继续执行后续任务的场景。
提示
传感器通常用于工作流中需要等待外部事件触发的场景,例如等待文件上传、数据库更新或 API 响应。
S3KeySensor 的工作原理
S3KeySensor 会定期检查 S3 存储桶中是否存在指定的键(文件路径)。如果文件存在,传感器任务将成功完成,并允许后续任务继续执行。如果文件不存在,传感器会继续等待,直到文件出现或达到超时时间。
主要参数
- bucket_name: 要监控的 S3 存储桶名称。
- bucket_key: 要监控的文件键(路径)。
- aws_conn_id: 用于连接 AWS 的 Airflow 连接 ID。
- timeout: 传感器等待文件的最大时间(以秒为单位)。
- poke_interval: 每次检查文件的时间间隔(以秒为单位)。
如何使用 S3KeySensor
以下是一个简单的示例,展示如何在 Airflow DAG 中使用 S3KeySensor。
示例代码
python
from airflow import DAG
from airflow.providers.amazon.aws.sensors.s3_key import S3KeySensor
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
}
with DAG(
's3_key_sensor_example',
default_args=default_args,
schedule_interval=None,
) as dag:
start_task = DummyOperator(task_id='start_task')
# 监控 S3 存储桶中的文件
s3_sensor = S3KeySensor(
task_id='s3_key_sensor',
bucket_name='my-s3-bucket',
bucket_key='path/to/my-file.txt',
aws_conn_id='aws_default',
timeout=600, # 10 分钟超时
poke_interval=60, # 每分钟检查一次
)
end_task = DummyOperator(task_id='end_task')
start_task >> s3_sensor >> end_task
代码解释
- DAG 定义: 创建了一个名为
s3_key_sensor_example
的 DAG,并设置了默认参数。 - S3KeySensor: 定义了一个 S3KeySensor 任务,监控 S3 存储桶
my-s3-bucket
中的文件path/to/my-file.txt
。 - 超时和检查间隔: 设置传感器最多等待 10 分钟(
timeout=600
),并每分钟检查一次(poke_interval=60
)。 - 任务依赖: 使用
>>
运算符定义任务之间的依赖关系。
实际应用场景
假设你有一个数据管道,需要等待上游系统将数据文件上传到 S3 存储桶中,然后才能开始处理数据。你可以使用 S3KeySensor 来监控文件是否已上传,并在文件出现后触发后续的数据处理任务。
场景示例
- 文件上传: 上游系统将文件
data.csv
上传到 S3 存储桶my-data-bucket
的uploads/
目录。 - 监控文件: 使用 S3KeySensor 监控
uploads/data.csv
文件。 - 触发任务: 文件上传后,传感器任务成功,触发后续的数据处理任务。
总结
S3KeySensor 是 Apache Airflow 中一个强大的工具,用于监控 Amazon S3 存储桶中的文件。它可以帮助你构建依赖外部事件的工作流,例如等待文件上传或数据生成。通过合理设置超时和检查间隔,你可以确保工作流在文件出现后及时触发后续任务。
备注
如果你需要监控多个文件或更复杂的条件,可以考虑使用 Airflow 的其他传感器或自定义传感器。
附加资源与练习
- 练习: 尝试修改示例代码,监控 S3 存储桶中的多个文件,并在所有文件都出现后触发任务。
- 资源: 阅读 Airflow 官方文档 中关于传感器的更多内容。
- 扩展: 探索如何使用 Airflow 的其他传感器,例如
HttpSensor
或SqlSensor
,以满足不同的监控需求。