跳到主要内容

Airflow 与GCP集成

介绍

Apache Airflow 是一个开源的工作流管理平台,用于编排和调度复杂的数据管道。Google Cloud Platform(GCP)提供了一系列强大的云服务,如BigQuery、Cloud Storage、Cloud Composer等。将Airflow与GCP集成,可以帮助你自动化和管理基于GCP的工作流,从而更高效地处理数据。

在本教程中,我们将逐步介绍如何将Airflow与GCP集成,并通过实际案例展示其应用场景。

前提条件

在开始之前,请确保你已经具备以下条件:

  1. 一个GCP账户,并已启用所需的API(如BigQuery、Cloud Storage等)。
  2. 已安装并配置好Apache Airflow。
  3. 已安装 apache-airflow-providers-google 包,该包提供了与GCP集成的功能。

安装和配置

首先,你需要安装 apache-airflow-providers-google 包。你可以使用以下命令进行安装:

bash
pip install apache-airflow-providers-google

安装完成后,你需要在Airflow中配置GCP的连接。你可以通过Airflow的Web UI或直接编辑 airflow.cfg 文件来完成配置。

通过Web UI配置GCP连接

  1. 打开Airflow的Web UI。
  2. 导航到 Admin > Connections
  3. 点击 Create 按钮。
  4. Conn Id 字段中输入 google_cloud_default
  5. Conn Type 字段中选择 Google Cloud Platform
  6. Keyfile JSON 字段中粘贴你的GCP服务账户密钥文件内容。
  7. 点击 Save 按钮。

通过 airflow.cfg 配置GCP连接

你可以在 airflow.cfg 文件中添加以下配置:

ini
[google]
key_path = /path/to/your/service-account-key.json

使用Airflow与GCP集成

1. 使用BigQuery

BigQuery是GCP提供的一个强大的数据仓库服务。你可以使用Airflow的 BigQueryOperator 来执行SQL查询并将结果存储到BigQuery中。

以下是一个简单的DAG示例,展示了如何使用 BigQueryOperator

python
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryOperator
from airflow.utils.dates import days_ago

default_args = {
'start_date': days_ago(1),
}

with DAG('bigquery_example', default_args=default_args, schedule_interval='@daily') as dag:
run_query = BigQueryOperator(
task_id='run_query',
sql='SELECT * FROM `your_project.your_dataset.your_table`',
use_legacy_sql=False,
destination_dataset_table='your_project.your_dataset.your_result_table',
write_disposition='WRITE_TRUNCATE',
)

2. 使用Cloud Storage

Cloud Storage是GCP提供的对象存储服务。你可以使用Airflow的 GoogleCloudStorageToBigQueryOperator 将数据从Cloud Storage加载到BigQuery中。

以下是一个简单的DAG示例,展示了如何使用 GoogleCloudStorageToBigQueryOperator

python
from airflow import DAG
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GoogleCloudStorageToBigQueryOperator
from airflow.utils.dates import days_ago

default_args = {
'start_date': days_ago(1),
}

with DAG('gcs_to_bigquery_example', default_args=default_args, schedule_interval='@daily') as dag:
load_data = GoogleCloudStorageToBigQueryOperator(
task_id='load_data',
bucket='your_bucket',
source_objects=['your_file.csv'],
destination_project_dataset_table='your_project.your_dataset.your_table',
schema_fields=[
{'name': 'column1', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'column2', 'type': 'INTEGER', 'mode': 'NULLABLE'},
],
write_disposition='WRITE_TRUNCATE',
)

实际案例

假设你有一个每天生成的数据文件,存储在Cloud Storage中。你需要将这些数据加载到BigQuery中,并执行一些分析查询。你可以使用Airflow来自动化这个过程。

  1. 使用 GoogleCloudStorageToBigQueryOperator 将数据从Cloud Storage加载到BigQuery。
  2. 使用 BigQueryOperator 执行分析查询并将结果存储到另一个BigQuery表中。

总结

通过将Airflow与GCP集成,你可以轻松地自动化和管理基于GCP的工作流。无论是将数据从Cloud Storage加载到BigQuery,还是执行复杂的SQL查询,Airflow都提供了强大的工具来简化这些任务。

附加资源

练习

  1. 创建一个DAG,将数据从Cloud Storage加载到BigQuery,并执行一个SQL查询。
  2. 尝试使用 BigQueryOperator 创建一个新的BigQuery表,并将查询结果存储在其中。
提示

如果你在配置过程中遇到问题,可以参考Airflow和GCP的官方文档,或者查看相关的社区论坛。