跳到主要内容

Airflow DAG 定义语法

Apache Airflow 是一个用于编排复杂工作流的开源工具。DAG(有向无环图)是 Airflow 的核心概念,它定义了任务之间的依赖关系和执行顺序。本文将详细介绍如何定义 DAG,并通过实际案例帮助你理解其语法和应用场景。

什么是 DAG?

DAG 是 Directed Acyclic Graph 的缩写,意为“有向无环图”。在 Airflow 中,DAG 用于描述一组任务及其依赖关系。每个任务是一个节点,任务之间的依赖关系用有向边表示。DAG 确保任务按照正确的顺序执行,并且不会出现循环依赖。

DAG 的基本结构

一个典型的 DAG 定义包含以下几个部分:

  1. 导入必要的模块:首先需要导入 Airflow 的核心模块。
  2. 定义 DAG:创建一个 DAG 对象,指定其名称、描述、调度间隔等属性。
  3. 定义任务:使用 Airflow 提供的操作符(Operator)定义具体的任务。
  4. 设置任务依赖关系:通过 >>set_downstream 方法定义任务之间的依赖关系。

示例代码

以下是一个简单的 DAG 定义示例:

python
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

# 定义 DAG
dag = DAG(
'example_dag',
description='一个简单的 DAG 示例',
schedule_interval='@daily',
start_date=datetime(2023, 1, 1),
catchup=False
)

# 定义任务
start_task = DummyOperator(task_id='start_task', dag=dag)
end_task = DummyOperator(task_id='end_task', dag=dag)

def print_hello():
print("Hello, Airflow!")

hello_task = PythonOperator(
task_id='hello_task',
python_callable=print_hello,
dag=dag
)

# 设置任务依赖关系
start_task >> hello_task >> end_task

代码解析

  • 导入模块:我们导入了 DAGDummyOperatorPythonOperator 等必要的模块。
  • 定义 DAGDAG 对象包含了 DAG 的名称、描述、调度间隔和开始日期等属性。
  • 定义任务DummyOperator 用于创建不执行任何操作的任务,PythonOperator 用于执行 Python 函数。
  • 设置依赖关系:通过 >> 操作符,我们定义了任务的执行顺序:start_task -> hello_task -> end_task

DAG 的属性

在定义 DAG 时,可以设置多个属性来控制其行为。以下是一些常用的属性:

  • dag_id:DAG 的唯一标识符。
  • description:DAG 的描述信息。
  • schedule_interval:DAG 的调度间隔,可以是 @daily@weekly 等预定义值,也可以是 cron 表达式。
  • start_date:DAG 的开始日期。
  • catchup:是否启用追赶(catchup),即在 DAG 启动时是否执行过去未执行的任务。

实际案例

假设我们需要创建一个 DAG,每天从数据库中提取数据,进行清洗,然后将结果存储到另一个数据库中。以下是该 DAG 的定义:

python
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def extract_data():
print("Extracting data from source database...")

def transform_data():
print("Transforming data...")

def load_data():
print("Loading data into target database...")

# 定义 DAG
dag = DAG(
'etl_pipeline',
description='一个简单的 ETL 管道',
schedule_interval='@daily',
start_date=datetime(2023, 1, 1),
catchup=False
)

# 定义任务
extract_task = PythonOperator(
task_id='extract_task',
python_callable=extract_data,
dag=dag
)

transform_task = PythonOperator(
task_id='transform_task',
python_callable=transform_data,
dag=dag
)

load_task = PythonOperator(
task_id='load_task',
python_callable=load_data,
dag=dag
)

# 设置任务依赖关系
extract_task >> transform_task >> load_task

在这个案例中,我们定义了一个 ETL(Extract, Transform, Load)管道,每天执行一次。任务之间的依赖关系确保了数据提取、转换和加载的顺序。

总结

通过本文,你了解了如何在 Airflow 中定义 DAG,并掌握了 DAG 的基本结构和常用属性。我们还通过一个实际案例展示了如何创建一个简单的 ETL 管道。希望这些内容能帮助你更好地理解和使用 Airflow。

附加资源

练习

  1. 创建一个 DAG,每天打印当前日期。
  2. 修改上述 ETL 管道,增加一个任务用于验证数据质量。
  3. 尝试使用不同的调度间隔(如每小时一次)来定义 DAG。

通过完成这些练习,你将进一步巩固对 Airflow DAG 定义语法的理解。