跳到主要内容

Airflow 与RDS交互

介绍

Apache Airflow 是一个强大的工作流管理工具,广泛用于数据管道的编排和调度。Amazon RDS(Relational Database Service)是一种托管的关系型数据库服务,支持多种数据库引擎,如 MySQL、PostgreSQL 和 SQL Server。将 Airflow 与 RDS 集成,可以帮助你自动化数据库任务,例如数据提取、转换和加载(ETL)操作。

在本教程中,我们将逐步介绍如何在 Airflow 中与 RDS 进行交互,包括如何建立连接、执行查询以及管理数据库任务。

前提条件

在开始之前,请确保你已经具备以下条件:

  • 一个运行中的 Airflow 实例。
  • 一个 Amazon RDS 实例,并已配置好数据库引擎(如 MySQL 或 PostgreSQL)。
  • 安装了 Airflow 的 apache-airflow-providers-amazon 包。

1. 配置 Airflow 连接

首先,我们需要在 Airflow 中配置一个连接,以便与 RDS 实例进行通信。

  1. 打开 Airflow Web UI,导航到 Admin > Connections
  2. 点击 Create 按钮,创建一个新的连接。
  3. 填写以下字段:
    • Conn Id: my_rds_connection
    • Conn Type: 选择你的数据库类型(如 MySQL 或 PostgreSQL)。
    • Host: 你的 RDS 实例的端点。
    • Schema: 数据库名称。
    • Login: 数据库用户名。
    • Password: 数据库密码。
    • Port: 数据库端口(默认为 3306 或 5432)。
提示

你可以使用 Airflow 的 Variables 功能来安全地存储敏感信息,如数据库密码。

2. 创建 DAG 以与 RDS 交互

接下来,我们将创建一个 DAG(有向无环图),用于与 RDS 进行交互。以下是一个简单的 DAG 示例,它连接到 RDS 并执行一个查询。

python
from airflow import DAG
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.utils.dates import days_ago

default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
}

with DAG(
'rds_interaction_dag',
default_args=default_args,
schedule_interval='@daily',
) as dag:

query_task = MySqlOperator(
task_id='run_query',
mysql_conn_id='my_rds_connection',
sql='SELECT * FROM my_table LIMIT 10;',
)

query_task

在这个示例中,我们使用了 MySqlOperator 来执行一个简单的 SQL 查询。如果你使用的是 PostgreSQL,可以使用 PostgresOperator

3. 执行查询并处理结果

除了执行查询,你还可以在 Airflow 中处理查询结果。以下是一个示例,展示了如何将查询结果存储到 XCom 中,以便后续任务使用。

python
from airflow import DAG
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
import logging

default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
}

def process_results(**kwargs):
ti = kwargs['ti']
results = ti.xcom_pull(task_ids='run_query')
logging.info(f"Query results: {results}")

with DAG(
'rds_interaction_dag',
default_args=default_args,
schedule_interval='@daily',
) as dag:

query_task = MySqlOperator(
task_id='run_query',
mysql_conn_id='my_rds_connection',
sql='SELECT * FROM my_table LIMIT 10;',
)

process_task = PythonOperator(
task_id='process_results',
python_callable=process_results,
provide_context=True,
)

query_task >> process_task

在这个示例中,run_query 任务执行查询并将结果存储在 XCom 中。process_results 任务从 XCom 中提取结果并进行处理。

4. 实际应用场景

假设你有一个电商网站,每天需要从 RDS 中提取销售数据并生成报告。你可以使用 Airflow 自动执行以下任务:

  1. 从 RDS 中提取当天的销售数据。
  2. 将数据转换为适合报告的格式。
  3. 将报告存储到 S3 或发送给相关人员。

以下是一个简化的 DAG 示例:

python
from airflow import DAG
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.providers.amazon.aws.operators.s3 import S3FileTransformOperator
from airflow.utils.dates import days_ago

default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
}

with DAG(
'daily_sales_report',
default_args=default_args,
schedule_interval='@daily',
) as dag:

extract_data = MySqlOperator(
task_id='extract_sales_data',
mysql_conn_id='my_rds_connection',
sql='SELECT * FROM sales WHERE sale_date = CURDATE();',
)

transform_data = S3FileTransformOperator(
task_id='transform_data',
source_s3_key='s3://my-bucket/raw-data/',
dest_s3_key='s3://my-bucket/transformed-data/',
transform_script='/path/to/transform_script.py',
)

extract_data >> transform_data

总结

通过本教程,你已经学会了如何在 Airflow 中与 Amazon RDS 进行交互。我们介绍了如何配置连接、执行查询、处理结果以及在实际应用场景中使用这些技术。

附加资源

练习

  1. 创建一个 DAG,从 RDS 中提取数据并将其存储到 S3。
  2. 修改 DAG,使其在数据提取后发送电子邮件通知。
  3. 尝试使用不同的数据库引擎(如 PostgreSQL)并调整 DAG 以适应这些变化。

希望本教程对你有所帮助,祝你在 Airflow 和 RDS 的集成之旅中取得成功!