Structured Streaming 输出模式
Structured Streaming 是 Apache Spark 提供的一种流处理框架,它允许开发者以批处理的方式处理流数据。在 Structured Streaming 中,输出模式决定了流处理结果如何写入外部存储系统。理解不同的输出模式对于设计高效的流处理应用至关重要。
什么是输出模式?
输出模式定义了流处理结果如何被写入到外部存储系统中。Structured Streaming 提供了三种主要的输出模式:
- Complete 模式:每次触发时,输出完整的聚合结果。
- Append 模式:仅输出新添加到结果表中的行。
- Update 模式:输出自上次触发以来发生更改的行。
每种模式适用于不同的场景,选择正确的输出模式可以显著提高应用的性能和效率。
Complete 模式
在 Complete 模式 下,每次触发时,系统会输出完整的聚合结果。这意味着每次触发时,所有的聚合数据都会被写入到外部存储系统中。
适用场景
Complete 模式适用于需要输出完整聚合结果的场景,例如计算某个时间窗口内的总销售额。
代码示例
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, sum
# 创建 SparkSession
spark = SparkSession.builder.appName("CompleteModeExample").getOrCreate()
# 读取流数据
streamingDF = spark.readStream.format("rate").load()
# 定义窗口聚合
windowedCounts = streamingDF.groupBy(
window(streamingDF.timestamp, "10 minutes")
).agg(sum("value").alias("total_value"))
# 启动流查询,使用 Complete 模式
query = windowedCounts.writeStream \
.outputMode("complete") \
.format("console") \
.start()
query.awaitTermination()
输入与输出
假设输入数据如下:
timestamp | value |
---|---|
2023-10-01 10:00:00 | 10 |
2023-10-01 10:05:00 | 20 |
2023-10-01 10:10:00 | 30 |
输出结果将包含每个时间窗口内的总 value
:
window | total_value |
---|---|
[2023-10-01 10:00:00, 2023-10-01 10:10:00] | 60 |
Append 模式
在 Append 模式 下,仅输出新添加到结果表中的行。这种模式适用于不需要聚合的场景,或者聚合结果不会随时间变化的场景。
适用场景
Append 模式适用于简单的过滤或转换操作,例如过滤掉某些不符合条件的记录。
代码示例
from pyspark.sql import SparkSession
# 创建 SparkSession
spark = SparkSession.builder.appName("AppendModeExample").getOrCreate()
# 读取流数据
streamingDF = spark.readStream.format("rate").load()
# 过滤操作
filteredDF = streamingDF.filter(streamingDF.value > 5)
# 启动流查询,使用 Append 模式
query = filteredDF.writeStream \
.outputMode("append") \
.format("console") \
.start()
query.awaitTermination()
输入与输出
假设输入数据如下:
timestamp | value |
---|---|
2023-10-01 10:00:00 | 3 |
2023-10-01 10:05:00 | 7 |
2023-10-01 10:10:00 | 10 |
输出结果将仅包含 value
大于 5 的记录:
timestamp | value |
---|---|
2023-10-01 10:05:00 | 7 |
2023-10-01 10:10:00 | 10 |
Update 模式
在 Update 模式 下,仅输出自上次触发以来发生更改的行。这种模式适用于需要更新先前结果的场景。
适用场景
Update 模式适用于需要动态更新结果的场景,例如实时更新用户的累计消费金额。
代码示例
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum
# 创建 SparkSession
spark = SparkSession.builder.appName("UpdateModeExample").getOrCreate()
# 读取流数据
streamingDF = spark.readStream.format("rate").load()
# 定义聚合操作
aggregatedDF = streamingDF.groupBy("value").agg(sum("value").alias("total_value"))
# 启动流查询,使用 Update 模式
query = aggregatedDF.writeStream \
.outputMode("update") \
.format("console") \
.start()
query.awaitTermination()
输入与输出
假设输入数据如下:
timestamp | value |
---|---|
2023-10-01 10:00:00 | 10 |
2023-10-01 10:05:00 | 20 |
2023-10-01 10:10:00 | 10 |
输出结果将仅包含自上次触发以来发生更改的行:
value | total_value |
---|---|
10 | 20 |
20 | 20 |
实际应用场景
实时仪表盘
在实时仪表盘中,通常需要实时更新数据。使用 Update 模式 可以确保仪表盘中的数据始终保持最新状态。
日志处理
在日志处理中,通常只需要处理新增的日志条目。使用 Append 模式 可以有效地过滤和转换日志数据。
实时分析
在实时分析中,通常需要计算某个时间窗口内的聚合结果。使用 Complete 模式 可以确保每次触发时输出完整的聚合结果。
总结
Structured Streaming 提供了三种输出模式:Complete、Append 和 Update。每种模式适用于不同的场景,选择正确的输出模式可以显著提高应用的性能和效率。
- Complete 模式:适用于需要输出完整聚合结果的场景。
- Append 模式:适用于不需要聚合的场景。
- Update 模式:适用于需要动态更新结果的场景。
附加资源与练习
- 练习 1:尝试在 Complete 模式下计算某个时间窗口内的平均值。
- 练习 2:在 Append 模式下实现一个简单的日志过滤器。
- 练习 3:在 Update 模式下实现一个实时更新的用户消费金额计算器。
建议初学者通过实际代码练习来加深对输出模式的理解。尝试在不同的场景下使用不同的输出模式,并观察输出结果的变化。