跳到主要内容

Structured Streaming 输出模式

Structured Streaming 是 Apache Spark 提供的一种流处理框架,它允许开发者以批处理的方式处理流数据。在 Structured Streaming 中,输出模式决定了流处理结果如何写入外部存储系统。理解不同的输出模式对于设计高效的流处理应用至关重要。

什么是输出模式?

输出模式定义了流处理结果如何被写入到外部存储系统中。Structured Streaming 提供了三种主要的输出模式:

  1. Complete 模式:每次触发时,输出完整的聚合结果。
  2. Append 模式:仅输出新添加到结果表中的行。
  3. Update 模式:输出自上次触发以来发生更改的行。

每种模式适用于不同的场景,选择正确的输出模式可以显著提高应用的性能和效率。

Complete 模式

Complete 模式 下,每次触发时,系统会输出完整的聚合结果。这意味着每次触发时,所有的聚合数据都会被写入到外部存储系统中。

适用场景

Complete 模式适用于需要输出完整聚合结果的场景,例如计算某个时间窗口内的总销售额。

代码示例

python
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, sum

# 创建 SparkSession
spark = SparkSession.builder.appName("CompleteModeExample").getOrCreate()

# 读取流数据
streamingDF = spark.readStream.format("rate").load()

# 定义窗口聚合
windowedCounts = streamingDF.groupBy(
window(streamingDF.timestamp, "10 minutes")
).agg(sum("value").alias("total_value"))

# 启动流查询,使用 Complete 模式
query = windowedCounts.writeStream \
.outputMode("complete") \
.format("console") \
.start()

query.awaitTermination()

输入与输出

假设输入数据如下:

timestampvalue
2023-10-01 10:00:0010
2023-10-01 10:05:0020
2023-10-01 10:10:0030

输出结果将包含每个时间窗口内的总 value

windowtotal_value
[2023-10-01 10:00:00, 2023-10-01 10:10:00]60

Append 模式

Append 模式 下,仅输出新添加到结果表中的行。这种模式适用于不需要聚合的场景,或者聚合结果不会随时间变化的场景。

适用场景

Append 模式适用于简单的过滤或转换操作,例如过滤掉某些不符合条件的记录。

代码示例

python
from pyspark.sql import SparkSession

# 创建 SparkSession
spark = SparkSession.builder.appName("AppendModeExample").getOrCreate()

# 读取流数据
streamingDF = spark.readStream.format("rate").load()

# 过滤操作
filteredDF = streamingDF.filter(streamingDF.value > 5)

# 启动流查询,使用 Append 模式
query = filteredDF.writeStream \
.outputMode("append") \
.format("console") \
.start()

query.awaitTermination()

输入与输出

假设输入数据如下:

timestampvalue
2023-10-01 10:00:003
2023-10-01 10:05:007
2023-10-01 10:10:0010

输出结果将仅包含 value 大于 5 的记录:

timestampvalue
2023-10-01 10:05:007
2023-10-01 10:10:0010

Update 模式

Update 模式 下,仅输出自上次触发以来发生更改的行。这种模式适用于需要更新先前结果的场景。

适用场景

Update 模式适用于需要动态更新结果的场景,例如实时更新用户的累计消费金额。

代码示例

python
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum

# 创建 SparkSession
spark = SparkSession.builder.appName("UpdateModeExample").getOrCreate()

# 读取流数据
streamingDF = spark.readStream.format("rate").load()

# 定义聚合操作
aggregatedDF = streamingDF.groupBy("value").agg(sum("value").alias("total_value"))

# 启动流查询,使用 Update 模式
query = aggregatedDF.writeStream \
.outputMode("update") \
.format("console") \
.start()

query.awaitTermination()

输入与输出

假设输入数据如下:

timestampvalue
2023-10-01 10:00:0010
2023-10-01 10:05:0020
2023-10-01 10:10:0010

输出结果将仅包含自上次触发以来发生更改的行:

valuetotal_value
1020
2020

实际应用场景

实时仪表盘

在实时仪表盘中,通常需要实时更新数据。使用 Update 模式 可以确保仪表盘中的数据始终保持最新状态。

日志处理

在日志处理中,通常只需要处理新增的日志条目。使用 Append 模式 可以有效地过滤和转换日志数据。

实时分析

在实时分析中,通常需要计算某个时间窗口内的聚合结果。使用 Complete 模式 可以确保每次触发时输出完整的聚合结果。

总结

Structured Streaming 提供了三种输出模式:Complete、Append 和 Update。每种模式适用于不同的场景,选择正确的输出模式可以显著提高应用的性能和效率。

  • Complete 模式:适用于需要输出完整聚合结果的场景。
  • Append 模式:适用于不需要聚合的场景。
  • Update 模式:适用于需要动态更新结果的场景。

附加资源与练习

  • 练习 1:尝试在 Complete 模式下计算某个时间窗口内的平均值。
  • 练习 2:在 Append 模式下实现一个简单的日志过滤器。
  • 练习 3:在 Update 模式下实现一个实时更新的用户消费金额计算器。
提示

建议初学者通过实际代码练习来加深对输出模式的理解。尝试在不同的场景下使用不同的输出模式,并观察输出结果的变化。