Airflow DAG 文件结构
Apache Airflow 是一个用于编排和调度工作流的强大工具。DAG(有向无环图)是 Airflow 的核心概念,它定义了任务之间的依赖关系和执行顺序。本文将详细介绍 DAG 文件的结构,帮助初学者理解如何编写和组织 DAG 文件。
什么是 DAG 文件?
DAG 文件是一个 Python 脚本,用于定义 Airflow 中的工作流。它包含任务的逻辑、任务之间的依赖关系以及调度信息。DAG 文件通常存储在 Airflow 的 dags
目录中,Airflow 会定期扫描该目录以加载新的或更新的 DAG。
DAG 文件的基本结构
一个典型的 DAG 文件包含以下几个部分:
- 导入模块:导入所需的 Airflow 模块和其他 Python 库。
- 定义 DAG:创建 DAG 对象,设置 DAG 的基本属性。
- 定义任务:创建任务对象,定义任务的逻辑。
- 设置任务依赖关系:指定任务之间的执行顺序。
1. 导入模块
首先,我们需要导入 Airflow 的核心模块以及其他必要的 Python 库。以下是一个常见的导入列表:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
2. 定义 DAG
接下来,我们创建一个 DAG 对象,并设置其基本属性,如 DAG 的名称、描述、调度间隔等。
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'my_first_dag',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
)
start_date
是 DAG 开始执行的时间点,schedule_interval
定义了 DAG 的执行频率。
3. 定义任务
在 DAG 中,任务是通过操作符(Operator)来定义的。以下是一个使用 PythonOperator
的示例:
def print_hello():
print("Hello, Airflow!")
task1 = PythonOperator(
task_id='print_hello',
python_callable=print_hello,
dag=dag,
)
4. 设置任务依赖关系
最后,我们需要指定任务之间的依赖关系。以下是一个简单的依赖关系示例:
task1 >> task2
这表示 task1
必须在 task2
之前执行。
实际案例
假设我们需要创建一个 DAG,每天执行一次,包含两个任务:第一个任务打印 "Hello, Airflow!",第二个任务打印 "Goodbye, Airflow!"。以下是完整的 DAG 文件:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
def print_hello():
print("Hello, Airflow!")
def print_goodbye():
print("Goodbye, Airflow!")
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'hello_goodbye_dag',
default_args=default_args,
description='A simple DAG that prints hello and goodbye',
schedule_interval=timedelta(days=1),
)
task1 = PythonOperator(
task_id='print_hello',
python_callable=print_hello,
dag=dag,
)
task2 = PythonOperator(
task_id='print_goodbye',
python_callable=print_goodbye,
dag=dag,
)
task1 >> task2
总结
通过本文,我们了解了 Airflow DAG 文件的基本结构,包括如何导入模块、定义 DAG、创建任务以及设置任务依赖关系。掌握这些基础知识后,您可以开始构建自己的 DAG,并逐步探索 Airflow 的更多高级功能。
附加资源
练习
- 创建一个新的 DAG,包含三个任务:第一个任务打印 "Task 1",第二个任务打印 "Task 2",第三个任务打印 "Task 3"。设置任务依赖关系,使得任务按顺序执行。
- 修改上述 DAG,使其每 5 分钟执行一次。
- 尝试使用不同的操作符(如
BashOperator
)来定义任务。
通过完成这些练习,您将更深入地理解 Airflow DAG 文件的结构和工作原理。