跳到主要内容

Airflow API扩展

Apache Airflow 是一个强大的工作流编排工具,广泛用于数据管道的调度和监控。除了通过 Web UI 和命令行工具管理任务外,Airflow 还提供了丰富的 API 接口,允许开发者通过编程方式与 Airflow 交互。本文将详细介绍 Airflow API 扩展功能,帮助初学者掌握如何通过 API 实现自动化任务管理和监控。

什么是 Airflow API?

Airflow API 是一组 RESTful 接口,允许开发者通过 HTTP 请求与 Airflow 进行交互。通过 API,您可以执行以下操作:

  • 触发 DAG 运行
  • 获取任务状态
  • 管理变量和连接
  • 查询任务日志
  • 动态创建或更新 DAG

API 扩展功能使得 Airflow 更加灵活,能够与其他系统无缝集成,实现自动化工作流管理。

启用 Airflow API

在默认情况下,Airflow 的 API 是启用的。您可以通过以下步骤确认 API 是否可用:

  1. 确保 Airflow 已安装并运行。
  2. 访问 Airflow Web UI,导航到 Admin -> API 页面,查看 API 文档。

如果 API 未启用,您可以通过修改 airflow.cfg 文件中的 [api] 部分来启用它:

ini
[api]
auth_backend = airflow.api.auth.backend.default

使用 Airflow API

1. 触发 DAG 运行

通过 API 触发 DAG 运行是最常见的用例之一。以下是一个使用 curl 命令触发 DAG 的示例:

bash
curl -X POST "http://localhost:8080/api/v1/dags/<dag_id>/dagRuns" \
-H "Content-Type: application/json" \
-d '{"conf": {}}'
  • <dag_id> 是您要触发的 DAG 的 ID。
  • conf 是传递给 DAG 的配置参数(可选)。

2. 获取任务状态

您可以通过 API 获取任务的当前状态。以下是一个获取任务状态的示例:

bash
curl -X GET "http://localhost:8080/api/v1/dags/<dag_id>/tasks/<task_id>/taskInstances/<execution_date>" \
-H "Content-Type: application/json"
  • <dag_id> 是 DAG 的 ID。
  • <task_id> 是任务的 ID。
  • <execution_date> 是任务的执行日期。

3. 管理变量和连接

Airflow API 还允许您管理变量和连接。以下是一个创建变量的示例:

bash
curl -X POST "http://localhost:8080/api/v1/variables" \
-H "Content-Type: application/json" \
-d '{"key": "my_var", "value": "my_value"}'

4. 查询任务日志

通过 API,您可以查询任务的日志。以下是一个获取任务日志的示例:

bash
curl -X GET "http://localhost:8080/api/v1/dags/<dag_id>/dagRuns/<dag_run_id>/taskInstances/<task_id>/logs/<task_try_number>" \
-H "Content-Type: application/json"
  • <dag_run_id> 是 DAG 运行的 ID。
  • <task_try_number> 是任务尝试的次数。

实际案例:自动化 DAG 触发

假设您有一个 DAG,每天需要根据外部系统的状态动态触发。您可以通过编写一个脚本,定期检查外部系统状态,并通过 Airflow API 触发 DAG 运行。

以下是一个 Python 脚本示例:

python
import requests

def trigger_dag(dag_id, conf=None):
url = f"http://localhost:8080/api/v1/dags/{dag_id}/dagRuns"
headers = {"Content-Type": "application/json"}
data = {"conf": conf} if conf else {}
response = requests.post(url, headers=headers, json=data)
return response.json()

# 触发 DAG
dag_id = "my_dag"
conf = {"param1": "value1", "param2": "value2"}
result = trigger_dag(dag_id, conf)
print(result)

总结

Airflow API 扩展功能为开发者提供了强大的工具,能够通过编程方式与 Airflow 交互,实现自动化任务管理和监控。通过本文的介绍,您应该已经掌握了如何使用 Airflow API 触发 DAG、获取任务状态、管理变量和连接,以及查询任务日志。

附加资源

练习

  1. 使用 Airflow API 触发一个 DAG,并获取其运行状态。
  2. 编写一个脚本,定期检查外部系统状态,并根据状态动态触发 DAG。
  3. 通过 API 创建一个变量,并在 DAG 中使用该变量。

通过完成这些练习,您将更深入地理解 Airflow API 的使用方法,并能够在实际项目中应用这些知识。