前言: spark/mr作业在执行过程中,数据重排的过程,主要发生在mr的话,就在map输出和reduce输入的过程,如果在spark就发生在窄依赖阶段和宽依赖阶段。
shuffle操作是分布式计算不可避免的一个过程,同时也是分布式计算最消耗性能的一个部分。
一、spark shuffle发展和执行过程
在spark中由于不同的ShuffleManager的的配置,会造成shuffle执行的流程不一样,spark发展至今,shuffle经历了如下三个阶段:
1)未经优化hashShuffleManager(1.2之前版本默认)
2)经过优化的hashShuffleManager—>shuffleGroup 0.8出现
3)SortShuffleManager(1.2以后版本成为默认)
到底应该使用哪一个shuffleManager来执行shuffle呢,其实可以通过spark参数在sparkconf或者spark-sbumit脚本进行控制:spark.shuffle.manager=hash/sort(sort为默认)
1、未经优化HashShuffleManager
2、经过优化的HashShuffleManager——shuffleGroup
3、SortShuffleManager
3.1普通的SortShuffleManager
其中最后的多个磁盘文件会合并成一个磁盘文件,这样会减少网络IO的次数,提高执行效率,为了表示磁盘文件中的数据属于哪一个resultTask,那么在写入的时候将对应的元数据信息写入到索引文件中,主要包含的就是一个Tuple3(partition, offset, length),其中partition就指定了这个segment数据片段属于哪一个下游的resultTask,offset和length决定这个segment数据数据内容是哪些。
3.2基于ByPass机制的SortShuffleManager
上述普通的SortShuffleManager虽然效率很高,但是相比较于HashShufflemanager多了排序阶段,如果我们的shuffle操作不需要进行排序,反而这个操作会降低执行的效率。
所谓为了有选择的在shuffle时候进行排序或者不进行排序,那么我们就可以开启bypass机制,来控制在shuffle的时候不进行排序。执行过程如下图:
如何去开启bypass机制:
spark.shuffle.sort.bypassMergeThreshold,默认值200来指定是否进行bypass机制。
也就是说如果并行度是200以下的话不会进行排序,spark.default.parallelism控制全局并行度。所以如果我们不想在shuffle中进行排序的话,那么我们应该尽可能将spark.shuffle.sort.bypassMergeThreshold值调大一点。
二、Spark shuffle常见的优化参数
优化参数 | 默认值 | 优化参数用法 |
---|---|---|
spark.reducer.maxSizeInFlight | 48M | reduce read的时候每次从map端读取的最大的数据量,也就是reduce端shuffle read的缓冲区的大小。所以如果shufflemapTask生成的数据量很多,reduce的内存又有保障,通过调整这个参数,比如98M,可以减少网络拉取的次数,这样在一定程度上能提高效率。 |
spark.shuffle.compress | true | 在shuffle过程中往磁盘写数据的过程中开启压缩操作,较少数据体积。对应的压缩算法通过参数spark.io.compression.codec来执行,默认为lz4 |
spark.shuffle.file.buffer | 32K | shuffle write过程中往磁盘文件中写的那个bufferedoutputstream的缓冲区大小,默认32k,我们可以调大这个参数,比如64k,96k等等。 |
spark.shuffle.io.maxRetries | 3 | 表示的是shuffle read如果失败,从shuffle write的文件拉取的最大重试次数。如果网络不稳定,我们为了避免失败,可以调大这个参数的值,比如10次。 |
spark.shuffle.io.retryWait | 5s | 在两次重试之间,不是立马执行,需要一定的等待时间,这个retryWait配置这个间隔时间。所以网络不稳定,我们可以调大这个参数的值,让程序正常执行,比如30s, 60s。 |
spark.shuffle.sort.bypassMergeThreshold | 200 | 是否在shuffle过程中进行排序,如果不希望进行排序,建议调大该值。 |
spark.shuffle.memoryFraction | 0.2 | 用于reduceTask存储拉取过来的数据,进行聚合操作的executor的内存比例。如果持久化操作较少,而shuffle操作较多,可以调大该比例,比如0.3. |
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。