跳到主要内容

Airflow DAG 基础

什么是 DAG?

DAG(Directed Acyclic Graph,有向无环图)是 Apache Airflow 的核心概念之一。它用于定义任务之间的依赖关系,并确保任务按照正确的顺序执行。DAG 是一个由任务节点组成的有向图,其中每个节点代表一个任务,边代表任务之间的依赖关系。DAG 必须是无环的,这意味着任务之间不能形成循环依赖。

为什么使用 DAG?

DAG 的主要作用是管理和调度复杂的工作流。通过 DAG,你可以清晰地定义任务的执行顺序、依赖关系以及调度时间。这使得 Airflow 成为处理 ETL(Extract, Transform, Load)任务、数据管道和批处理作业的理想工具。

如何定义一个 DAG?

在 Airflow 中,DAG 是通过 Python 脚本定义的。以下是一个简单的 DAG 定义示例:

python
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime

# 定义 DAG
dag = DAG(
'my_first_dag', # DAG 的唯一标识符
description='A simple tutorial DAG',
schedule_interval='@daily', # 调度间隔
start_date=datetime(2023, 1, 1), # 开始日期
catchup=False, # 是否补跑过去的任务
)

# 定义任务
start_task = DummyOperator(task_id='start_task', dag=dag)
end_task = DummyOperator(task_id='end_task', dag=dag)

# 定义任务依赖关系
start_task >> end_task

在这个示例中,我们定义了一个名为 my_first_dag 的 DAG,它包含两个任务:start_taskend_taskstart_task 是 DAG 的起点,end_task 是 DAG 的终点。通过 >> 操作符,我们定义了 start_task 必须在 end_task 之前执行。

DAG 的调度

DAG 的调度是通过 schedule_interval 参数来控制的。你可以使用 cron 表达式或 Airflow 提供的预定义调度间隔(如 @daily@hourly 等)来定义 DAG 的执行频率。

提示

如果你不希望 DAG 自动调度,可以将 schedule_interval 设置为 None。这样,DAG 只能通过手动触发来执行。

任务依赖关系

在 Airflow 中,任务之间的依赖关系是通过 >><< 操作符来定义的。>> 表示“先执行左边的任务,再执行右边的任务”,而 << 则表示相反的顺序。

python
task1 >> task2  # task1 必须在 task2 之前执行
task2 << task1 # 同上

你还可以定义多个任务之间的复杂依赖关系:

python
task1 >> task2 >> task3
task1 >> task4 >> task3

在这个例子中,task1 必须在 task2task4 之前执行,而 task2task4 又必须在 task3 之前执行。

实际案例:ETL 管道

假设我们需要构建一个简单的 ETL 管道,从数据库中提取数据,进行转换,然后加载到另一个数据库中。我们可以使用 Airflow 来定义这个管道:

python
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def extract():
print("Extracting data from source database...")

def transform():
print("Transforming data...")

def load():
print("Loading data into target database...")

# 定义 DAG
dag = DAG(
'etl_pipeline',
description='A simple ETL pipeline',
schedule_interval='@daily',
start_date=datetime(2023, 1, 1),
catchup=False,
)

# 定义任务
extract_task = PythonOperator(task_id='extract_task', python_callable=extract, dag=dag)
transform_task = PythonOperator(task_id='transform_task', python_callable=transform, dag=dag)
load_task = PythonOperator(task_id='load_task', python_callable=load, dag=dag)

# 定义任务依赖关系
extract_task >> transform_task >> load_task

在这个例子中,我们定义了三个任务:extract_tasktransform_taskload_taskextract_task 负责从源数据库中提取数据,transform_task 负责对数据进行转换,load_task 负责将转换后的数据加载到目标数据库中。通过 >> 操作符,我们确保了任务按照正确的顺序执行。

总结

DAG 是 Apache Airflow 中用于定义和管理任务流的核心概念。通过 DAG,你可以清晰地定义任务的执行顺序、依赖关系以及调度时间。本文介绍了如何定义 DAG、设置调度、定义任务依赖关系,并通过一个实际的 ETL 管道案例展示了 DAG 的应用。

附加资源

练习

  1. 创建一个包含三个任务的 DAG,任务分别为 task_atask_btask_c,并确保 task_atask_b 之前执行,task_btask_c 之前执行。
  2. 修改上述 ETL 管道示例,添加一个 validate_task,用于在加载数据之前验证数据的完整性。