微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Spark 源码系列 - 获取分区数

目录

版本

本分析基于Spark version 3.1.2

结论

local模式下,认最小分区数不会超过2

  • 如果对spark.default.parallelism属性赋值 && 值>=2,则分区数为2.
  • 其他情形分区数为1.

代码入口

val conf = new SparkConf()
conf.setAppName("my-spark-01")
conf.setMaster("local")
// 取消如下注释,分区数为2;否则分区数为1
// conf.set("spark.default.parallelism", "3")

val sc = new SparkContext(conf)
val lines = sc.textFile("./data/words")

SparkContext -> textFile

def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {...}

SparkContext -> defaultMinPartitions

// 认最小分区数不会超过2
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)

SparkContext -> defaultParallelism

def defaultParallelism: Int = {
  ...
  taskScheduler.defaultParallelism
}

TaskSchedulerImpl

override def defaultParallelism(): Int = backend.defaultParallelism()

LocalSchedulerBackend

// totalCores在系统初始化时赋值为1
// 如果在程序启动时没有设置spark.default.parallelism的值,那么该方法返回值是1
override def defaultParallelism(): Int =
  scheduler.conf.getInt("spark.default.parallelism", totalCores)

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐