跳到主要内容

Airflow HttpSensor

在 Apache Airflow 中,Sensor 是一种特殊类型的 Operator,用于等待某些外部条件满足后再继续执行后续任务。HttpSensor 是其中一种常用的 Sensor,用于监控 HTTP 端点,直到其返回特定的响应状态码或内容。

什么是 HttpSensor?

HttpSensor 会定期向指定的 HTTP 端点发送请求,并检查响应是否符合预期条件。如果条件满足,Sensor 会标记为成功,任务流程将继续执行;否则,Sensor 会继续等待,直到条件满足或超时。

HttpSensor 非常适合用于以下场景:

  • 等待某个 API 服务启动并返回预期的响应。
  • 监控某个外部系统的状态,直到其准备好接收数据。
  • 确保某个依赖服务在任务执行前已正常运行。

HttpSensor 的基本用法

下面是一个简单的 HttpSensor 示例,展示如何监控一个 HTTP 端点,直到其返回状态码 200。

python
from airflow import DAG
from airflow.sensors.http_sensor import HttpSensor
from airflow.utils.dates import days_ago

default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
}

with DAG('http_sensor_example', default_args=default_args, schedule_interval=None) as dag:
wait_for_api = HttpSensor(
task_id='wait_for_api',
http_conn_id='my_http_connection',
endpoint='api/status',
request_params={},
response_check=lambda response: response.status_code == 200,
poke_interval=5, # 每 5 秒检查一次
timeout=300, # 最多等待 300 秒
)

代码解析

  1. http_conn_id: 这是 Airflow 中配置的 HTTP 连接 ID,用于指定目标服务的连接信息(如 URL、认证等)。
  2. endpoint: 这是要监控的 API 端点路径。
  3. response_check: 这是一个回调函数,用于检查 HTTP 响应是否符合预期。在这个例子中,我们检查状态码是否为 200。
  4. poke_interval: 这是 Sensor 检查条件的间隔时间(以秒为单位)。
  5. timeout: 这是 Sensor 的最大等待时间(以秒为单位)。如果超时,任务将失败。
提示

你可以通过 Airflow 的 Web UI 或 CLI 配置 HTTP 连接(http_conn_id),以便在多个任务中复用。

实际应用场景

假设你有一个数据处理任务,需要等待某个外部 API 服务启动并返回状态码 200 后才能继续执行。你可以使用 HttpSensor 来监控该 API 的状态,确保任务在正确的时间执行。

python
from airflow.operators.python_operator import PythonOperator

def process_data():
print("Processing data...")

with DAG('http_sensor_workflow', default_args=default_args, schedule_interval=None) as dag:
wait_for_api = HttpSensor(
task_id='wait_for_api',
http_conn_id='my_http_connection',
endpoint='api/status',
response_check=lambda response: response.status_code == 200,
poke_interval=5,
timeout=300,
)

process_task = PythonOperator(
task_id='process_data',
python_callable=process_data,
)

wait_for_api >> process_task

在这个例子中,wait_for_api 任务会持续监控 API 的状态,直到其返回状态码 200。一旦条件满足,process_data 任务将被触发。

总结

HttpSensor 是 Apache Airflow 中一个强大的工具,用于监控 HTTP 端点并确保任务在特定条件下执行。通过合理配置 response_checkpoke_intervaltimeout,你可以灵活地控制任务的执行流程。

备注

如果你需要监控更复杂的条件(如响应内容),可以在 response_check 函数中实现自定义逻辑。

附加资源与练习

  1. 练习: 尝试创建一个 DAG,使用 HttpSensor 监控一个公开的 API(如 https://httpbin.org/status/200),并在条件满足后打印一条消息。
  2. 深入学习: 阅读 Airflow 官方文档 中关于 Sensors 的更多内容,了解其他类型的 Sensor(如 FileSensor、SqlSensor 等)。
  3. 扩展: 尝试将 HttpSensor 与其他 Operator 结合使用,构建一个完整的工作流。

通过掌握 HttpSensor,你将能够更好地管理依赖外部服务的任务流程,提升工作流的可靠性和灵活性。