跳到主要内容

Airflow S3KeySensor

什么是 S3KeySensor?

在 Apache Airflow 中,S3KeySensor 是一种传感器(Sensor),用于监控 Amazon S3 存储桶中是否存在指定的文件或键(Key)。传感器是 Airflow 中的一种特殊任务,它们会持续检查某个条件是否满足,直到条件为真或超时为止。S3KeySensor 特别适用于需要等待某个文件在 S3 中生成后再继续执行后续任务的场景。

提示

传感器通常用于工作流中需要等待外部事件触发的场景,例如等待文件上传、数据库更新或 API 响应。

S3KeySensor 的工作原理

S3KeySensor 会定期检查 S3 存储桶中是否存在指定的键(文件路径)。如果文件存在,传感器任务将成功完成,并允许后续任务继续执行。如果文件不存在,传感器会继续等待,直到文件出现或达到超时时间。

主要参数

  • bucket_name: 要监控的 S3 存储桶名称。
  • bucket_key: 要监控的文件键(路径)。
  • aws_conn_id: 用于连接 AWS 的 Airflow 连接 ID。
  • timeout: 传感器等待文件的最大时间(以秒为单位)。
  • poke_interval: 每次检查文件的时间间隔(以秒为单位)。

如何使用 S3KeySensor

以下是一个简单的示例,展示如何在 Airflow DAG 中使用 S3KeySensor。

示例代码

python
from airflow import DAG
from airflow.providers.amazon.aws.sensors.s3_key import S3KeySensor
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
}

with DAG(
's3_key_sensor_example',
default_args=default_args,
schedule_interval=None,
) as dag:

start_task = DummyOperator(task_id='start_task')

# 监控 S3 存储桶中的文件
s3_sensor = S3KeySensor(
task_id='s3_key_sensor',
bucket_name='my-s3-bucket',
bucket_key='path/to/my-file.txt',
aws_conn_id='aws_default',
timeout=600, # 10 分钟超时
poke_interval=60, # 每分钟检查一次
)

end_task = DummyOperator(task_id='end_task')

start_task >> s3_sensor >> end_task

代码解释

  1. DAG 定义: 创建了一个名为 s3_key_sensor_example 的 DAG,并设置了默认参数。
  2. S3KeySensor: 定义了一个 S3KeySensor 任务,监控 S3 存储桶 my-s3-bucket 中的文件 path/to/my-file.txt
  3. 超时和检查间隔: 设置传感器最多等待 10 分钟(timeout=600),并每分钟检查一次(poke_interval=60)。
  4. 任务依赖: 使用 >> 运算符定义任务之间的依赖关系。

实际应用场景

假设你有一个数据管道,需要等待上游系统将数据文件上传到 S3 存储桶中,然后才能开始处理数据。你可以使用 S3KeySensor 来监控文件是否已上传,并在文件出现后触发后续的数据处理任务。

场景示例

  1. 文件上传: 上游系统将文件 data.csv 上传到 S3 存储桶 my-data-bucketuploads/ 目录。
  2. 监控文件: 使用 S3KeySensor 监控 uploads/data.csv 文件。
  3. 触发任务: 文件上传后,传感器任务成功,触发后续的数据处理任务。

总结

S3KeySensor 是 Apache Airflow 中一个强大的工具,用于监控 Amazon S3 存储桶中的文件。它可以帮助你构建依赖外部事件的工作流,例如等待文件上传或数据生成。通过合理设置超时和检查间隔,你可以确保工作流在文件出现后及时触发后续任务。

备注

如果你需要监控多个文件或更复杂的条件,可以考虑使用 Airflow 的其他传感器或自定义传感器。

附加资源与练习

  • 练习: 尝试修改示例代码,监控 S3 存储桶中的多个文件,并在所有文件都出现后触发任务。
  • 资源: 阅读 Airflow 官方文档 中关于传感器的更多内容。
  • 扩展: 探索如何使用 Airflow 的其他传感器,例如 HttpSensorSqlSensor,以满足不同的监控需求。