跳到主要内容

Airflow 与GCS交互

Apache Airflow 是一个强大的工作流管理工具,广泛用于数据管道的编排和调度。Google Cloud Storage(GCS)是Google Cloud Platform(GCP)提供的一种对象存储服务,常用于存储和管理大规模数据。将Airflow与GCS集成,可以帮助您自动化数据存储、传输和处理任务。

本文将逐步介绍如何在Airflow中与GCS进行交互,包括如何上传、下载和管理GCS中的文件。我们还将通过实际案例展示这些操作的真实应用场景。

1. 准备工作

在开始之前,您需要完成以下准备工作:

  1. GCP项目:确保您已经创建了一个GCP项目,并启用了Google Cloud Storage API。
  2. 服务账户:创建一个GCP服务账户,并为其分配适当的权限(例如Storage Admin角色)。
  3. 密钥文件:下载服务账户的JSON密钥文件,并将其存储在安全的位置。
  4. Airflow环境:确保您的Airflow环境已经安装并配置好。

2. 安装依赖

为了在Airflow中与GCS交互,您需要安装apache-airflow-providers-google包。该包提供了与GCP服务集成的操作符和钩子。

bash
pip install apache-airflow-providers-google

3. 配置GCP连接

在Airflow中,您需要配置一个GCP连接,以便与GCS进行交互。您可以通过Airflow的Web UI或直接编辑airflow.cfg文件来完成此操作。

通过Web UI配置

  1. 登录Airflow Web UI。
  2. 导航到Admin > Connections
  3. 点击+按钮创建一个新连接。
  4. 填写以下字段:
    • Conn Id: google_cloud_default
    • Conn Type: Google Cloud
    • Keyfile Path: 填写服务账户密钥文件的路径(例如/path/to/your/keyfile.json

通过配置文件配置

您也可以在airflow.cfg文件中直接配置GCP连接:

ini
[connections]
google_cloud_default = google-cloud-platform://?extra__google_cloud_platform__key_path=/path/to/your/keyfile.json

4. 使用GCSHook与GCS交互

Airflow提供了GCSHook,这是一个用于与GCS交互的钩子。您可以使用它来执行各种操作,例如上传、下载和删除文件。

上传文件到GCS

以下是一个使用GCSHook上传文件到GCS的示例:

python
from airflow import DAG
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def upload_to_gcs():
hook = GCSHook()
bucket_name = 'your-bucket-name'
object_name = 'your-object-name'
filename = '/path/to/your/local/file.txt'
hook.upload(bucket_name, object_name, filename)

default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
}

with DAG('gcs_upload_example', default_args=default_args, schedule_interval=None) as dag:
upload_task = PythonOperator(
task_id='upload_to_gcs',
python_callable=upload_to_gcs,
)

从GCS下载文件

以下是一个使用GCSHook从GCS下载文件的示例:

python
def download_from_gcs():
hook = GCSHook()
bucket_name = 'your-bucket-name'
object_name = 'your-object-name'
filename = '/path/to/your/local/destination/file.txt'
hook.download(bucket_name, object_name, filename)

with DAG('gcs_download_example', default_args=default_args, schedule_interval=None) as dag:
download_task = PythonOperator(
task_id='download_from_gcs',
python_callable=download_from_gcs,
)

删除GCS中的文件

以下是一个使用GCSHook删除GCS中文件的示例:

python
def delete_from_gcs():
hook = GCSHook()
bucket_name = 'your-bucket-name'
object_name = 'your-object-name'
hook.delete(bucket_name, object_name)

with DAG('gcs_delete_example', default_args=default_args, schedule_interval=None) as dag:
delete_task = PythonOperator(
task_id='delete_from_gcs',
python_callable=delete_from_gcs,
)

5. 实际案例

假设您有一个数据管道,需要每天从本地系统上传日志文件到GCS,并在上传完成后删除本地文件。以下是一个完整的DAG示例:

python
from airflow import DAG
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

def upload_logs_to_gcs():
hook = GCSHook()
bucket_name = 'your-bucket-name'
object_name = 'logs/logfile_' + datetime.now().strftime('%Y%m%d') + '.txt'
filename = '/path/to/your/local/logfile.txt'
hook.upload(bucket_name, object_name, filename)

def delete_local_logs():
import os
os.remove('/path/to/your/local/logfile.txt')

default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
}

with DAG('daily_log_upload', default_args=default_args, schedule_interval='@daily') as dag:
upload_task = PythonOperator(
task_id='upload_logs_to_gcs',
python_callable=upload_logs_to_gcs,
)

delete_task = PythonOperator(
task_id='delete_local_logs',
python_callable=delete_local_logs,
)

upload_task >> delete_task

6. 总结

通过本文,您已经学习了如何在Apache Airflow中与Google Cloud Storage(GCS)进行交互。我们介绍了如何配置GCP连接、使用GCSHook上传、下载和删除文件,并通过一个实际案例展示了这些操作的应用场景。

7. 附加资源与练习

  • 官方文档:阅读Airflow官方文档以了解更多关于Airflow的详细信息。
  • GCS文档:查看Google Cloud Storage文档以深入了解GCS的功能和最佳实践。
  • 练习:尝试创建一个DAG,每天从GCS下载一个文件,处理后再上传回GCS。
提示

如果您在配置或使用过程中遇到问题,可以参考Airflow和GCS的官方文档,或者在社区论坛中寻求帮助。