跳到主要内容

Airflow FileSensor

介绍

在 Apache Airflow 中,Sensor 是一种特殊类型的任务,用于等待某些外部条件满足后再继续执行后续任务。FileSensor 是 Airflow 提供的一种传感器,用于监控文件系统中的文件是否存在。当文件存在时,FileSensor 会触发后续任务的执行。

FileSensor 特别适用于需要等待某个文件生成后再进行处理的场景,例如数据管道中等待输入文件、日志文件生成等。

FileSensor 的基本用法

FileSensor 的核心功能是监控指定路径下的文件是否存在。如果文件存在,任务将成功完成并触发后续任务;如果文件不存在,任务将等待一段时间后再次检查。

安装 FileSensor

FileSensor 是 Airflow 的内置传感器,因此不需要额外安装。只需确保你已经安装了 Apache Airflow。

示例代码

以下是一个简单的 FileSensor 示例,用于监控 /tmp/ 目录下的 example.txt 文件是否存在:

python
from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from airflow.operators.dummy import DummyOperator
from datetime import datetime

default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
}

with DAG('file_sensor_example', default_args=default_args, schedule_interval=None) as dag:
wait_for_file = FileSensor(
task_id='wait_for_file',
filepath='/tmp/example.txt',
poke_interval=30, # 每30秒检查一次
timeout=300, # 最多等待300秒
)

process_file = DummyOperator(task_id='process_file')

wait_for_file >> process_file

在这个示例中,FileSensor 任务会每 30 秒检查一次 /tmp/example.txt 文件是否存在。如果文件在 300 秒内出现,任务将成功完成,并触发 process_file 任务。如果文件在 300 秒内未出现,任务将失败。

参数说明

  • filepath: 要监控的文件路径。
  • poke_interval: 每次检查文件的时间间隔(以秒为单位)。
  • timeout: 任务的最大等待时间(以秒为单位)。如果超过此时间文件仍未出现,任务将失败。

实际应用场景

场景 1: 数据管道中的文件等待

假设你有一个数据管道,需要等待上游系统生成一个 CSV 文件后再进行处理。你可以使用 FileSensor 来监控该文件是否生成,并在文件生成后触发数据处理任务。

python
from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from airflow.operators.python import PythonOperator
from datetime import datetime

def process_data():
print("Processing data...")

default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
}

with DAG('data_pipeline', default_args=default_args, schedule_interval=None) as dag:
wait_for_csv = FileSensor(
task_id='wait_for_csv',
filepath='/data/input.csv',
poke_interval=60,
timeout=600,
)

process_csv = PythonOperator(
task_id='process_csv',
python_callable=process_data,
)

wait_for_csv >> process_csv

在这个场景中,FileSensor 会监控 /data/input.csv 文件是否存在。一旦文件生成,process_csv 任务将被触发,执行数据处理逻辑。

场景 2: 日志文件监控

假设你有一个日志文件生成系统,需要等待日志文件生成后再进行日志分析。你可以使用 FileSensor 来监控日志文件是否生成,并在文件生成后触发日志分析任务。

python
from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from airflow.operators.bash import BashOperator
from datetime import datetime

default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
}

with DAG('log_analysis', default_args=default_args, schedule_interval=None) as dag:
wait_for_log = FileSensor(
task_id='wait_for_log',
filepath='/logs/app.log',
poke_interval=30,
timeout=300,
)

analyze_log = BashOperator(
task_id='analyze_log',
bash_command='python /scripts/analyze_log.py',
)

wait_for_log >> analyze_log

在这个场景中,FileSensor 会监控 /logs/app.log 文件是否存在。一旦日志文件生成,analyze_log 任务将被触发,执行日志分析脚本。

总结

FileSensor 是 Apache Airflow 中一个非常有用的工具,特别适用于需要等待文件生成后再进行处理的场景。通过合理设置 poke_intervaltimeout 参数,你可以灵活控制任务的等待时间和检查频率。

在实际应用中,FileSensor 可以用于数据管道、日志监控、文件处理等多种场景。希望本文能帮助你理解并掌握 FileSensor 的基本用法。

附加资源

练习

  1. 创建一个 DAG,使用 FileSensor 监控 /var/log/syslog 文件是否存在,并在文件生成后触发一个打印日志内容的 Bash 任务。
  2. 修改上述示例中的 poke_intervaltimeout 参数,观察任务的行为变化。