Airflow DAG 定义语法
Apache Airflow 是一个用于编排复杂工作流的开源工具。DAG(有向无环图)是 Airflow 的核心概念,它定义了任务之间的依赖关系和执行顺序。本文将详细介绍如何定义 DAG,并通过实际案例帮助你理解其语法和应用场景。
什么是 DAG?
DAG 是 Directed Acyclic Graph 的缩写,意为“有向无环图”。在 Airflow 中,DAG 用于描述一组任务及其依赖关系。每个任务是一个节点,任务之间的依赖关系用有向边表示。DAG 确保任务按照正确的顺序执行,并且不会出现循环依赖。
DAG 的基本结构
一个典型的 DAG 定义包含以下几个部分:
- 导入必要的模块:首先需要导入 Airflow 的核心模块。
- 定义 DAG:创建一个 DAG 对象,指定其名称、描述、调度间隔等属性。
- 定义任务:使用 Airflow 提供的操作符(Operator)定义具体的任务。
- 设置任务依赖关系:通过
>>
或set_downstream
方法定义任务之间的依赖关系。
示例代码
以下是一个简单的 DAG 定义示例:
python
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
# 定义 DAG
dag = DAG(
'example_dag',
description='一个简单的 DAG 示例',
schedule_interval='@daily',
start_date=datetime(2023, 1, 1),
catchup=False
)
# 定义任务
start_task = DummyOperator(task_id='start_task', dag=dag)
end_task = DummyOperator(task_id='end_task', dag=dag)
def print_hello():
print("Hello, Airflow!")
hello_task = PythonOperator(
task_id='hello_task',
python_callable=print_hello,
dag=dag
)
# 设置任务依赖关系
start_task >> hello_task >> end_task
代码解析
- 导入模块:我们导入了
DAG
、DummyOperator
和PythonOperator
等必要的模块。 - 定义 DAG:
DAG
对象包含了 DAG 的名称、描述、调度间隔和开始日期等属性。 - 定义任务:
DummyOperator
用于创建不执行任何操作的任务,PythonOperator
用于执行 Python 函数。 - 设置依赖关系:通过
>>
操作符,我们定义了任务的执行顺序:start_task
->hello_task
->end_task
。
DAG 的属性
在定义 DAG 时,可以设置多个属性来控制其行为。以下是一些常用的属性:
dag_id
:DAG 的唯一标识符。description
:DAG 的描述信息。schedule_interval
:DAG 的调度间隔,可以是@daily
、@weekly
等预定义值,也可以是 cron 表达式。start_date
:DAG 的开始日期。catchup
:是否启用追赶(catchup),即在 DAG 启动时是否执行过去未执行的任务。
实际案例
假设我们需要创建一个 DAG,每天从数据库中提取数据,进行清洗,然后将结果存储到另一个数据库中。以下是该 DAG 的定义:
python
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def extract_data():
print("Extracting data from source database...")
def transform_data():
print("Transforming data...")
def load_data():
print("Loading data into target database...")
# 定义 DAG
dag = DAG(
'etl_pipeline',
description='一个简单的 ETL 管道',
schedule_interval='@daily',
start_date=datetime(2023, 1, 1),
catchup=False
)
# 定义任务
extract_task = PythonOperator(
task_id='extract_task',
python_callable=extract_data,
dag=dag
)
transform_task = PythonOperator(
task_id='transform_task',
python_callable=transform_data,
dag=dag
)
load_task = PythonOperator(
task_id='load_task',
python_callable=load_data,
dag=dag
)
# 设置任务依赖关系
extract_task >> transform_task >> load_task
在这个案例中,我们定义了一个 ETL(Extract, Transform, Load)管道,每天执行一次。任务之间的依赖关系确保了数据提取、转换和加载的顺序。
总结
通过本文,你了解了如何在 Airflow 中定义 DAG,并掌握了 DAG 的基本结构和常用属性。我们还通过一个实际案例展示了如何创建一个简单的 ETL 管道。希望这些内容能帮助你更好地理解和使用 Airflow。
附加资源
练习
- 创建一个 DAG,每天打印当前日期。
- 修改上述 ETL 管道,增加一个任务用于验证数据质量。
- 尝试使用不同的调度间隔(如每小时一次)来定义 DAG。
通过完成这些练习,你将进一步巩固对 Airflow DAG 定义语法的理解。