Airflow 与GCS交互
Apache Airflow 是一个强大的工作流管理工具,广泛用于数据管道的编排和调度。Google Cloud Storage(GCS)是Google Cloud Platform(GCP)提供的一种对象存储服务,常用于存储和管理大规模数据。将Airflow与GCS集成,可以帮助您自动化数据存储、传输和处理任务。
本文将逐步介绍如何在Airflow中与GCS进行交互,包括如何上传、下载和管理GCS中的文件。我们还将通过实际案例展示这些操作的真实应用场景。
1. 准备工作
在开始之前,您需要完成以下准备工作:
- GCP项目:确保您已经创建了一个GCP项目,并启用了Google Cloud Storage API。
- 服务账户:创建一个GCP服务账户,并为其分配适当的权限(例如
Storage Admin
角色)。 - 密钥文件:下载服务账户的JSON密钥文件,并将其存储在安全的位置。
- Airflow环境:确保您的Airflow环境已经安装并配置好。
2. 安装依赖
为了在Airflow中与GCS交互,您需要安装apache-airflow-providers-google
包。该包提供了与GCP服务集成的操作符和钩子。
pip install apache-airflow-providers-google
3. 配置GCP连接
在Airflow中,您需要配置一个GCP连接,以便与GCS进行交互。您可以通过Airflow的Web UI或直接编辑airflow.cfg
文件来完成此操作。
通过Web UI配置
- 登录Airflow Web UI。
- 导航到
Admin
>Connections
。 - 点击
+
按钮创建一个新连接。 - 填写以下字段:
- Conn Id:
google_cloud_default
- Conn Type:
Google Cloud
- Keyfile Path: 填写服务账户密钥文件的路径(例如
/path/to/your/keyfile.json
)
- Conn Id:
通过配置文件配置
您也可以在airflow.cfg
文件中直接配置GCP连接:
[connections]
google_cloud_default = google-cloud-platform://?extra__google_cloud_platform__key_path=/path/to/your/keyfile.json
4. 使用GCSHook与GCS交互
Airflow提供了GCSHook
,这是一个用于与GCS交互的钩子。您可以使用它来执行各种操作,例如上传、下载和删除文件。
上传文件到GCS
以下是一个使用GCSHook
上传文件到GCS的示例:
from airflow import DAG
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def upload_to_gcs():
hook = GCSHook()
bucket_name = 'your-bucket-name'
object_name = 'your-object-name'
filename = '/path/to/your/local/file.txt'
hook.upload(bucket_name, object_name, filename)
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
}
with DAG('gcs_upload_example', default_args=default_args, schedule_interval=None) as dag:
upload_task = PythonOperator(
task_id='upload_to_gcs',
python_callable=upload_to_gcs,
)
从GCS下载文件
以下是一个使用GCSHook
从GCS下载文件的示例:
def download_from_gcs():
hook = GCSHook()
bucket_name = 'your-bucket-name'
object_name = 'your-object-name'
filename = '/path/to/your/local/destination/file.txt'
hook.download(bucket_name, object_name, filename)
with DAG('gcs_download_example', default_args=default_args, schedule_interval=None) as dag:
download_task = PythonOperator(
task_id='download_from_gcs',
python_callable=download_from_gcs,
)
删除GCS中的文件
以下是一个使用GCSHook
删除GCS中文件的示例:
def delete_from_gcs():
hook = GCSHook()
bucket_name = 'your-bucket-name'
object_name = 'your-object-name'
hook.delete(bucket_name, object_name)
with DAG('gcs_delete_example', default_args=default_args, schedule_interval=None) as dag:
delete_task = PythonOperator(
task_id='delete_from_gcs',
python_callable=delete_from_gcs,
)
5. 实际案例
假设您有一个数据管道,需要每天从本地系统上传日志文件到GCS,并在上传完成后删除本地文件。以下是一个完整的DAG示例:
from airflow import DAG
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
def upload_logs_to_gcs():
hook = GCSHook()
bucket_name = 'your-bucket-name'
object_name = 'logs/logfile_' + datetime.now().strftime('%Y%m%d') + '.txt'
filename = '/path/to/your/local/logfile.txt'
hook.upload(bucket_name, object_name, filename)
def delete_local_logs():
import os
os.remove('/path/to/your/local/logfile.txt')
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
}
with DAG('daily_log_upload', default_args=default_args, schedule_interval='@daily') as dag:
upload_task = PythonOperator(
task_id='upload_logs_to_gcs',
python_callable=upload_logs_to_gcs,
)
delete_task = PythonOperator(
task_id='delete_local_logs',
python_callable=delete_local_logs,
)
upload_task >> delete_task
6. 总结
通过本文,您已经学习了如何在Apache Airflow中与Google Cloud Storage(GCS)进行交互。我们介绍了如何配置GCP连接、使用GCSHook
上传、下载和删除文件,并通过一个实际案例展示了这些操作的应用场景。
7. 附加资源与练习
- 官方文档:阅读Airflow官方文档以了解更多关于Airflow的详细信息。
- GCS文档:查看Google Cloud Storage文档以深入了解GCS的功能和最佳实践。
- 练习:尝试创建一个DAG,每天从GCS下载一个文件,处理后再上传回GCS。
如果您在配置或使用过程中遇到问题,可以参考Airflow和GCS的官方文档,或者在社区论坛中寻求帮助。