Airflow DAG 基础
什么是 DAG?
DAG(Directed Acyclic Graph,有向无环图)是 Apache Airflow 的核心概念之一。它用于定义任务之间的依赖关系,并确保任务按照正确的顺序执行。DAG 是一个由任务节点组成的有向图,其中每个节点代表一个任务,边代表任务之间的依赖关系。DAG 必须是无环的,这意味着任务之间不能形成循环依赖。
为什么使用 DAG?
DAG 的主要作用是管理和调度复杂的工作流。通过 DAG,你可以清晰地定义任务的执行顺序、依赖关系以及调度时间。这使得 Airflow 成为处理 ETL(Extract, Transform, Load)任务、数据管道和批处理作业的理想工具。
如何定义一个 DAG?
在 Airflow 中,DAG 是通过 Python 脚本定义的。以下是一个简单的 DAG 定义示例:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime
# 定义 DAG
dag = DAG(
'my_first_dag', # DAG 的唯一标识符
description='A simple tutorial DAG',
schedule_interval='@daily', # 调度间隔
start_date=datetime(2023, 1, 1), # 开始日期
catchup=False, # 是否补跑过去的任务
)
# 定义任务
start_task = DummyOperator(task_id='start_task', dag=dag)
end_task = DummyOperator(task_id='end_task', dag=dag)
# 定义任务依赖关系
start_task >> end_task
在这个示例中,我们定义了一个名为 my_first_dag
的 DAG,它包含两个任务:start_task
和 end_task
。start_task
是 DAG 的起点,end_task
是 DAG 的终点。通过 >>
操作符,我们定义了 start_task
必须在 end_task
之前执行。
DAG 的调度
DAG 的调度是通过 schedule_interval
参数来控制的。你可以使用 cron 表达式或 Airflow 提供的预定义调度间隔(如 @daily
、@hourly
等)来定义 DAG 的执行频率。
如果你不希望 DAG 自动调度,可以将 schedule_interval
设置为 None
。这样,DAG 只能通过手动触发来执行。
任务依赖关系
在 Airflow 中,任务之间的依赖关系是通过 >>
和 <<
操作符来定义的。>>
表示“先执行左边的任务,再执行右边的任务”,而 <<
则表示相反的顺序。
task1 >> task2 # task1 必须在 task2 之前执行
task2 << task1 # 同上
你还可以定义多个任务之间的复杂依赖关系:
task1 >> task2 >> task3
task1 >> task4 >> task3
在这个例子中,task1
必须在 task2
和 task4
之前执行,而 task2
和 task4
又必须在 task3
之前执行。
实际案例:ETL 管道
假设我们需要构建一个简单的 ETL 管道,从数据库中提取数据,进行转换,然后加载到另一个数据库中。我们可以使用 Airflow 来定义这个管道:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def extract():
print("Extracting data from source database...")
def transform():
print("Transforming data...")
def load():
print("Loading data into target database...")
# 定义 DAG
dag = DAG(
'etl_pipeline',
description='A simple ETL pipeline',
schedule_interval='@daily',
start_date=datetime(2023, 1, 1),
catchup=False,
)
# 定义任务
extract_task = PythonOperator(task_id='extract_task', python_callable=extract, dag=dag)
transform_task = PythonOperator(task_id='transform_task', python_callable=transform, dag=dag)
load_task = PythonOperator(task_id='load_task', python_callable=load, dag=dag)
# 定义任务依赖关系
extract_task >> transform_task >> load_task
在这个例子中,我们定义了三个任务:extract_task
、transform_task
和 load_task
。extract_task
负责从源数据库中提取数据,transform_task
负责对数据进行转换,load_task
负责将转换后的数据加载到目标数据库中。通过 >>
操作符,我们确保了任务按照正确的顺序执行。
总结
DAG 是 Apache Airflow 中用于定义和管理任务流的核心概念。通过 DAG,你可以清晰地定义任务的执行顺序、依赖关系以及调度时间。本文介绍了如何定义 DAG、设置调度、定义任务依赖关系,并通过一个实际的 ETL 管道案例展示了 DAG 的应用。
附加资源
练习
- 创建一个包含三个任务的 DAG,任务分别为
task_a
、task_b
和task_c
,并确保task_a
在task_b
之前执行,task_b
在task_c
之前执行。 - 修改上述 ETL 管道示例,添加一个
validate_task
,用于在加载数据之前验证数据的完整性。