跳到主要内容

Airflow 与S3交互

Apache Airflow 是一个强大的工作流管理工具,广泛用于数据管道的编排和调度。Amazon S3(Simple Storage Service)是AWS提供的对象存储服务,常用于存储和检索大量数据。将Airflow与S3集成,可以轻松管理数据的上传、下载和处理任务。

本文将详细介绍如何在Airflow中与S3进行交互,包括配置、文件操作和实际应用场景。

1. 配置Airflow与S3的连接

在Airflow中与S3交互的第一步是配置S3连接。Airflow通过S3Hook来管理与S3的连接。首先,你需要在Airflow的Web UI中创建一个S3连接。

1.1 创建S3连接

  1. 打开Airflow的Web UI。
  2. 导航到 Admin > Connections
  3. 点击 Create 按钮。
  4. 填写连接信息:
    • Conn Id: my_s3_conn
    • Conn Type: Amazon S3
    • Extra: {"aws_access_key_id": "YOUR_ACCESS_KEY", "aws_secret_access_key": "YOUR_SECRET_KEY"}
备注

确保将 YOUR_ACCESS_KEYYOUR_SECRET_KEY 替换为你的AWS凭证。

1.2 使用S3Hook

在DAG中使用S3Hook来与S3进行交互。以下是一个简单的示例,展示如何使用S3Hook上传文件到S3:

python
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def upload_to_s3():
hook = S3Hook(aws_conn_id='my_s3_conn')
hook.load_file(
filename='/path/to/local/file.txt',
key='s3://my-bucket/path/to/file.txt',
bucket_name='my-bucket',
replace=True
)
提示

replace=True 参数表示如果S3中已存在同名文件,则替换它。

2. 文件上传与下载

2.1 上传文件到S3

上传文件到S3是常见的操作。以下代码展示了如何将本地文件上传到S3:

python
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from datetime import datetime

def upload_to_s3():
hook = S3Hook(aws_conn_id='my_s3_conn')
hook.load_file(
filename='/path/to/local/file.txt',
key='s3://my-bucket/path/to/file.txt',
bucket_name='my-bucket',
replace=True
)

default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
}

with DAG('s3_upload_dag', default_args=default_args, schedule_interval='@daily') as dag:
upload_task = PythonOperator(
task_id='upload_to_s3',
python_callable=upload_to_s3
)

2.2 从S3下载文件

从S3下载文件同样简单。以下代码展示了如何从S3下载文件到本地:

python
def download_from_s3():
hook = S3Hook(aws_conn_id='my_s3_conn')
hook.download_file(
key='s3://my-bucket/path/to/file.txt',
bucket_name='my-bucket',
local_path='/path/to/local/file.txt'
)

3. 实际应用场景

3.1 数据管道中的S3集成

假设你有一个数据管道,需要从S3读取数据,进行处理后,再将结果写回S3。以下是一个简单的DAG示例:

python
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from datetime import datetime

def process_data():
hook = S3Hook(aws_conn_id='my_s3_conn')
# 从S3下载数据
input_data = hook.read_key(key='s3://my-bucket/input/data.csv', bucket_name='my-bucket')

# 处理数据
processed_data = input_data.upper() # 示例处理

# 将处理后的数据上传到S3
hook.load_string(
string_data=processed_data,
key='s3://my-bucket/output/processed_data.csv',
bucket_name='my-bucket',
replace=True
)

default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
}

with DAG('s3_data_pipeline', default_args=default_args, schedule_interval='@daily') as dag:
process_task = PythonOperator(
task_id='process_data',
python_callable=process_data
)
警告

在实际应用中,确保处理逻辑符合业务需求,并处理可能的异常情况。

4. 总结

通过本文,你学习了如何在Airflow中与S3进行交互,包括配置连接、上传和下载文件,以及在实际数据管道中的应用。Airflow与S3的集成为数据工程师提供了强大的工具,能够轻松管理大规模数据的存储和处理任务。

5. 附加资源与练习

继续探索Airflow与S3的更多功能,提升你的数据管道管理能力!