Airflow FileSensor
介绍
在 Apache Airflow 中,Sensor 是一种特殊类型的任务,用于等待某些外部条件满足后再继续执行后续任务。FileSensor 是 Airflow 提供的一种传感器,用于监控文件系统中的文件是否存在。当文件存在时,FileSensor 会触发后续任务的执行。
FileSensor 特别适用于需要等待某个文件生成后再进行处理的场景,例如数据管道中等待输入文件、日志文件生成等。
FileSensor 的基本用法
FileSensor 的核心功能是监控指定路径下的文件是否存在。如果文件存在,任务将成功完成并触发后续任务;如果文件不存在,任务将等待一段时间后再次检查。
安装 FileSensor
FileSensor 是 Airflow 的内置传感器,因此不需要额外安装。只需确保你已经安装了 Apache Airflow。
示例代码
以下是一个简单的 FileSensor 示例,用于监控 /tmp/
目录下的 example.txt
文件是否存在:
from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from airflow.operators.dummy import DummyOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
}
with DAG('file_sensor_example', default_args=default_args, schedule_interval=None) as dag:
wait_for_file = FileSensor(
task_id='wait_for_file',
filepath='/tmp/example.txt',
poke_interval=30, # 每30秒检查一次
timeout=300, # 最多等待300秒
)
process_file = DummyOperator(task_id='process_file')
wait_for_file >> process_file
在这个示例中,FileSensor
任务会每 30 秒检查一次 /tmp/example.txt
文件是否存在。如果文件在 300 秒内出现,任务将成功完成,并触发 process_file
任务。如果文件在 300 秒内未出现,任务将失败。
参数说明
filepath
: 要监控的文件路径。poke_interval
: 每次检查文件的时间间隔(以秒为单位)。timeout
: 任务的最大等待时间(以秒为单位)。如果超过此时间文件仍未出现,任务将失败。
实际应用场景
场景 1: 数据管道中的文件等待
假设你有一个数据管道,需要等待上游系统生成一个 CSV 文件后再进行处理。你可以使用 FileSensor 来监控该文件是否生成,并在文件生成后触发数据处理任务。
from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from airflow.operators.python import PythonOperator
from datetime import datetime
def process_data():
print("Processing data...")
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
}
with DAG('data_pipeline', default_args=default_args, schedule_interval=None) as dag:
wait_for_csv = FileSensor(
task_id='wait_for_csv',
filepath='/data/input.csv',
poke_interval=60,
timeout=600,
)
process_csv = PythonOperator(
task_id='process_csv',
python_callable=process_data,
)
wait_for_csv >> process_csv
在这个场景中,FileSensor 会监控 /data/input.csv
文件是否存在。一旦文件生成,process_csv
任务将被触发,执行数据处理逻辑。
场景 2: 日志文件监控
假设你有一个日志文件生成系统,需要等待日志文件生成后再进行日志分析。你可以使用 FileSensor 来监控日志文件是否生成,并在文件生成后触发日志分析任务。
from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from airflow.operators.bash import BashOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
}
with DAG('log_analysis', default_args=default_args, schedule_interval=None) as dag:
wait_for_log = FileSensor(
task_id='wait_for_log',
filepath='/logs/app.log',
poke_interval=30,
timeout=300,
)
analyze_log = BashOperator(
task_id='analyze_log',
bash_command='python /scripts/analyze_log.py',
)
wait_for_log >> analyze_log
在这个场景中,FileSensor 会监控 /logs/app.log
文件是否存在。一旦日志文件生成,analyze_log
任务将被触发,执行日志分析脚本。
总结
FileSensor 是 Apache Airflow 中一个非常有用的工具,特别适用于需要等待文件生成后再进行处理的场景。通过合理设置 poke_interval
和 timeout
参数,你可以灵活控制任务的等待时间和检查频率。
在实际应用中,FileSensor 可以用于数据管道、日志监控、文件处理等多种场景。希望本文能帮助你理解并掌握 FileSensor 的基本用法。
附加资源
练习
- 创建一个 DAG,使用 FileSensor 监控
/var/log/syslog
文件是否存在,并在文件生成后触发一个打印日志内容的 Bash 任务。 - 修改上述示例中的
poke_interval
和timeout
参数,观察任务的行为变化。