微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Spark Shuffle

Spark Shuffle

shuffle的分区数怎么确定?

  • Map端由初始读取的数据源和算子计算后决定,比如:kafka的分区数
  • Reduce端由spark.default.parallelism决定,如果没有配置,由上一个map的分区数一致

Reduce怎么拉取数据?

先上图:

在这里插入图片描述

  • BlockManager 是传输数据
  • MapoutputTracker是通讯

流程:

  • MapOutputTrackerWorker将自己的map端输出文件位置信息通过包装为MapStatus发送给Driver端的MapoutputTrackerMaster
  • Reduce端的在拉取数据前,先会在本地找,找不到向MapoutTrackerMaster获取
  • 通过BlockManager的BlockTransforService传输数据

每次认5个线程,拉取48M

两种分发文件的方式(即shuffle的方式)

HashShuffle

在这里插入图片描述

顾名思义:

它是利用hash算法将相同的key的数据分发到同一个文件

shuffle read的拉取过程是一边拉取一边进行聚合的

产生的文件

由reduce的任务数 * map端的任务数

缺陷

会产生大量的文件增加磁盘IO的开销,如果reduce端100个,map 50个,那产生了5000个文件

优化后的HashShuffle

在这里插入图片描述

开启合并文件的参数:

spark.shuffle. consolidateFiles

它允许在第一次并行执行的任务写入的文件加上一个shuffleGroupFile的概念来合并文件,后面的每一次执行的任务都可以复用相同的文件。当然也是要遵循hash的原则

产生的文件

excutor*excutor core * reduce tasks(即相同的excuto上map端输出的对应的reduce文件会复用)

以上两种方式已经是过去时了,现在的版本不推荐使用,认是sortbaseshuffle.

如果reduce端100个,map 50个,每个executor单核,10个executor 每个执行 5个任务,

产生的文件数1000个文件

SortShuffle

在这里插入图片描述

流程:

  • 首先根据算子选择数据结构,比如:聚合使用map,join使用array
  • 然后向数据结构中写入数据,当达到一定的阈值的时候,溢写到磁盘
  • 排好序后溢写磁盘的过程中使用Java的BufferOutputStream进行缓冲写入文件
  • 合并每一次溢写的文件
  • 再合并同一个任务的所有文件这里有没有排序不知道,也就是说可能是不保证全局排序,只是每一个文件的排序而已
  • 并且写一个索引文件告诉reduce从哪里开始是你要拉取数据

文件

Map端Tasks

所以这种模式下就一个任务一个文件,即就是任务数。

如果reduce端100个,map 50个,每个executor单核,10个executor 每个执行 5个任务,

50个

从 5000个 到1000个 再到 50个,整整提高90%的效率

跟hash shuffle的合并模式比较,主要就是此种模式不受excutor core的影响

特殊模式(bypass)

以上是普通模式,还有一种省略了中排序的模式,分发文件时使用Hash,最后来合并文件

文件数也是一样的,但是它不能使用在聚合算子上。

bypass运行机制的触发条件如下:

  • shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。

  • 不是聚合类的shuffle算子。

BlockManager 存储管理

在这里插入图片描述

  • BlockManaerMaster对子节点的元数据管理

  • BlockManagerWorker发生了数据的增删改就会通知BlockmangerMaster

  • 使用BlockTransferService传输数据

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐