跳到主要内容

Airflow 与Azure Blob交互

介绍

Apache Airflow 是一个强大的工作流管理工具,广泛用于数据管道的编排和调度。Azure Blob Storage 是微软提供的云存储服务,常用于存储大量非结构化数据,如图片、视频、日志文件等。将 Airflow 与 Azure Blob 集成,可以帮助你自动化数据的上传、下载和管理任务。

本文将逐步介绍如何在 Airflow 中与 Azure Blob Storage 进行交互,包括如何配置连接、上传和下载文件,以及如何在实际场景中应用这些功能。

配置 Azure Blob 连接

在 Airflow 中与 Azure Blob 交互的第一步是配置连接。Airflow 提供了 AzureBlobStorageHook,这是一个用于与 Azure Blob Storage 交互的钩子。

首先,你需要在 Airflow 的 Web UI 中配置 Azure Blob 的连接信息。进入 Admin > Connections,然后点击 Create 按钮。填写以下信息:

  • Conn Id: azure_blob_default
  • Conn Type: Azure Blob Storage
  • Login: 你的 Azure 存储账户名称
  • Password: 你的 Azure 存储账户密钥

保存后,你就可以在 DAG 中使用这个连接了。

上传文件到 Azure Blob

接下来,我们将演示如何将本地文件上传到 Azure Blob Storage。以下是一个简单的 DAG 示例:

python
from airflow import DAG
from airflow.providers.microsoft.azure.hooks.azure_blob import AzureBlobStorageHook
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def upload_to_blob():
hook = AzureBlobStorageHook(conn_id='azure_blob_default')
hook.upload_file(
container_name='my-container',
blob_name='my-blob',
file_path='/path/to/local/file.txt'
)

default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
}

with DAG('upload_to_azure_blob', default_args=default_args, schedule_interval='@once') as dag:
upload_task = PythonOperator(
task_id='upload_file',
python_callable=upload_to_blob
)

upload_task

在这个示例中,我们定义了一个 upload_to_blob 函数,它使用 AzureBlobStorageHook 将本地文件上传到 Azure Blob Storage。upload_file 方法需要指定容器名称、Blob 名称以及本地文件的路径。

从 Azure Blob 下载文件

与上传文件类似,你也可以从 Azure Blob Storage 下载文件。以下是一个下载文件的 DAG 示例:

python
from airflow import DAG
from airflow.providers.microsoft.azure.hooks.azure_blob import AzureBlobStorageHook
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def download_from_blob():
hook = AzureBlobStorageHook(conn_id='azure_blob_default')
hook.download_file(
container_name='my-container',
blob_name='my-blob',
file_path='/path/to/local/destination/file.txt'
)

default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
}

with DAG('download_from_azure_blob', default_args=default_args, schedule_interval='@once') as dag:
download_task = PythonOperator(
task_id='download_file',
python_callable=download_from_blob
)

download_task

在这个示例中,我们定义了一个 download_from_blob 函数,它使用 AzureBlobStorageHook 从 Azure Blob Storage 下载文件到本地。

实际应用场景

假设你有一个每天生成日志文件的数据处理任务,你需要将这些日志文件上传到 Azure Blob Storage 进行长期存储。你可以使用 Airflow 来自动化这个过程。以下是一个实际应用场景的 DAG 示例:

python
from airflow import DAG
from airflow.providers.microsoft.azure.hooks.azure_blob import AzureBlobStorageHook
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

def upload_logs_to_blob():
hook = AzureBlobStorageHook(conn_id='azure_blob_default')
log_file_path = f'/path/to/logs/{datetime.now().strftime("%Y-%m-%d")}.log'
hook.upload_file(
container_name='log-container',
blob_name=f'logs/{datetime.now().strftime("%Y-%m-%d")}.log',
file_path=log_file_path
)

default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}

with DAG('daily_log_upload', default_args=default_args, schedule_interval='@daily') as dag:
upload_logs_task = PythonOperator(
task_id='upload_logs',
python_callable=upload_logs_to_blob
)

upload_logs_task

在这个示例中,我们每天都会将生成的日志文件上传到 Azure Blob Storage 中,文件名以日期命名。

总结

通过本文,你学习了如何在 Airflow 中与 Azure Blob Storage 进行交互,包括配置连接、上传和下载文件。我们还展示了一个实际应用场景,帮助你理解如何将这些功能应用到实际工作中。

提示

如果你想进一步学习 Azure Blob Storage 的高级功能,可以参考 Azure Blob Storage 官方文档.

附加资源

练习

  1. 修改上传文件的 DAG,使其能够上传多个文件到不同的 Blob 容器中。
  2. 创建一个 DAG,每天从 Azure Blob Storage 下载最新的日志文件,并将其存储到本地目录中。