Spark Shuffle
shuffle的分区数怎么确定?
Reduce怎么拉取数据?
先上图:
- BlockManager 是传输数据
- MapoutputTracker是通讯
流程:
- MapOutputTrackerWorker将自己的map端输出的文件位置信息通过包装为MapStatus发送给Driver端的MapoutputTrackerMaster
- Reduce端的在拉取数据前,先会在本地找,找不到向MapoutTrackerMaster获取
- 通过BlockManager的BlockTransforService传输数据
每次默认5个线程,拉取48M
两种分发文件的方式(即shuffle的方式)
HashShuffle
顾名思义:
shuffle read的拉取过程是一边拉取一边进行聚合的
产生的文件数
由reduce的任务数 * map端的任务数
缺陷
会产生大量的文件,增加磁盘IO的开销,如果reduce端100个,map 50个,那产生了5000个文件
优化后的HashShuffle
开启合并文件的参数:
spark.shuffle. consolidateFiles
它允许在第一次并行执行的任务写入的文件上加上一个shuffleGroupFile的概念来合并文件,后面的每一次执行的任务都可以复用相同的文件。当然也是要遵循hash的原则
产生的文件数
excutor*excutor core * reduce tasks(即相同的excuto上map端输出的对应的reduce文件会复用)
以上两种方式已经是过去时了,现在的版本不推荐使用,默认是sortbaseshuffle.
如果reduce端100个,map 50个,每个executor单核,10个executor 每个执行 5个任务,
SortShuffle
流程:
- 首先根据算子选择数据结构,比如:聚合使用map,join使用array
- 然后向数据结构中写入数据,当达到一定的阈值的时候,溢写到磁盘
- 排好序后溢写磁盘的过程中使用Java的BufferOutputStream进行缓冲写入文件
- 合并每一次溢写的文件
- 再合并同一个任务的所有文件(这里有没有排序不知道,也就是说可能是不保证全局排序,只是每一个文件的排序而已)
- 并且写一个索引文件告诉reduce从哪里开始是你要拉取数据
文件数
Map端Tasks
如果reduce端100个,map 50个,每个executor单核,10个executor 每个执行 5个任务,
50个
从 5000个 到1000个 再到 50个,整整提高90%的效率
(跟hash shuffle的合并模式比较,主要就是此种模式不受excutor core的影响)
特殊模式(bypass)
以上是普通模式,还有一种省略了中排序的模式,分发文件时使用Hash,最后来合并文件。
文件数也是一样的,但是它不能使用在聚合算子上。
bypass运行机制的触发条件如下:
BlockManager 存储管理
-
BlockManaerMaster对子节点的元数据管理
-
使用BlockTransferService传输数据
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。