跳到主要内容

并行度设置

在 Apache Spark 中,并行度是指任务在集群中并行执行的程度。正确设置并行度可以显著提高作业的执行效率,减少资源浪费。本文将详细介绍并行度的概念、如何调整并行度以及实际应用场景。

什么是并行度?

并行度是指 Spark 作业中同时执行的任务数量。在 Spark 中,任务通常被划分为多个分区(Partitions),每个分区由一个任务处理。并行度越高,意味着同时处理的分区越多,作业的执行速度可能越快。然而,并行度设置过高或过低都会影响性能。

  • 并行度过低:可能导致资源闲置,作业执行时间变长。
  • 并行度过高:可能导致任务调度开销增加,甚至引发资源竞争。

如何设置并行度?

在 Spark 中,并行度可以通过以下几种方式设置:

1. 通过 spark.default.parallelism 参数设置

spark.default.parallelism 是 Spark 的全局默认并行度参数。它决定了 RDD 转换操作(如 reduceByKeyjoin 等)的默认分区数。

scala
val conf = new SparkConf()
.setAppName("ParallelismExample")
.setMaster("local[*]")
.set("spark.default.parallelism", "100")

val sc = new SparkContext(conf)
提示

对于集群模式,通常建议将 spark.default.parallelism 设置为集群中所有 CPU 核心数的 2-3 倍。

2. 通过 repartitioncoalesce 方法调整分区数

repartitioncoalesce 是 Spark 提供的两种调整分区数的方法。

  • repartition:增加或减少分区数,会触发全量数据洗牌(Shuffle)。
  • coalesce:仅减少分区数,不会触发全量数据洗牌。
scala
val rdd = sc.parallelize(1 to 1000, 10) // 初始分区数为 10
val repartitionedRDD = rdd.repartition(20) // 增加分区数到 20
val coalescedRDD = rdd.coalesce(5) // 减少分区数到 5

3. 通过 spark.sql.shuffle.partitions 设置 SQL 操作的并行度

在 Spark SQL 中,spark.sql.shuffle.partitions 参数决定了 Shuffle 操作的分区数。默认值为 200。

scala
spark.conf.set("spark.sql.shuffle.partitions", "100")
警告

如果数据量较小,设置过高的 spark.sql.shuffle.partitions 可能导致大量小文件,影响性能。

实际案例

假设我们有一个包含 100 万条日志记录的数据集,每条记录包含用户 ID 和访问时间。我们的目标是统计每个用户的访问次数。

1. 初始设置

scala
val logs = sc.textFile("hdfs://path/to/logs")
val userLogs = logs.map(line => (line.split(",")(0), 1))
val userCounts = userLogs.reduceByKey(_ + _)

默认情况下,reduceByKey 的分区数由 spark.default.parallelism 决定。如果未设置,分区数可能过少,导致性能瓶颈。

2. 优化并行度

通过调整 spark.default.parallelismrepartition,我们可以优化作业性能。

scala
val conf = new SparkConf()
.setAppName("UserLogAnalysis")
.setMaster("local[*]")
.set("spark.default.parallelism", "200")

val sc = new SparkContext(conf)

val logs = sc.textFile("hdfs://path/to/logs")
val userLogs = logs.map(line => (line.split(",")(0), 1))
val repartitionedLogs = userLogs.repartition(200)
val userCounts = repartitionedLogs.reduceByKey(_ + _)

通过增加分区数,作业的执行时间显著减少。

总结

正确设置并行度是 Spark 性能优化的关键步骤之一。通过调整 spark.default.parallelismrepartitionspark.sql.shuffle.partitions 等参数,可以显著提高作业的执行效率。在实际应用中,建议根据数据量和集群资源动态调整并行度。

附加资源

练习

  1. 在一个包含 1000 万条记录的数据集上,尝试调整 spark.default.parallelism 并观察作业执行时间的变化。
  2. 使用 repartitioncoalesce 方法,比较它们对分区数和作业性能的影响。