微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

spark性能调优四之shuffle

前言: spark/mr作业在执行过程中,数据重排的过程,主要发生在mr的话,就在map输出和reduce输入的过程,如果在spark就发生在窄依赖阶段和宽依赖阶段。
shuffle操作是分布式计算不可避免的一个过程,同时也是分布式计算最消耗性能一个部分。

一、spark shuffle发展和执行过程

在spark中由于不同的ShuffleManager的的配置,会造成shuffle执行的流程不一样,spark发展至今,shuffle经历了如下三个阶段:

1)未经优化hashShuffleManager(1.2之前版本认)

2)经过优化的hashShuffleManager—>shuffleGroup 0.8出现

3)SortShuffleManager(1.2以后版本成为认)

到底应该使用哪一个shuffleManager来执行shuffle呢,其实可以通过spark参数在sparkconf或者spark-sbumit脚本进行控制:spark.shuffle.manager=hash/sort(sort为认)

1、未经优化HashShuffleManager

在这里插入图片描述

2、经过优化的HashShuffleManager——shuffleGroup

在这里插入图片描述

3、SortShuffleManager

3.1普通的SortShuffleManager

在这里插入图片描述


其中最后的多个磁盘文件会合并成一个磁盘文件,这样会减少网络IO的次数,提高执行效率,为了表示磁盘文件中的数据属于哪一个resultTask,那么在写入的时候将对应的元数据信息写入到索引文件中,主要包含的就是一个Tuple3(partition, offset, length),其中partition就指定了这个segment数据片段属于哪一个下游的resultTask,offset和length决定这个segment数据数据内容是哪些。

3.2基于ByPass机制的SortShuffleManager

上述普通的SortShuffleManager虽然效率很高,但是相比较于HashShufflemanager多了排序阶段,如果我们的shuffle操作不需要进行排序,反而这个操作会降低执行的效率。

​所谓为了有选择的在shuffle时候进行排序或者不进行排序,那么我们就可以开启bypass机制,来控制在shuffle的时候不进行排序。执行过程如下图:

在这里插入图片描述


如何去开启bypass机制:
spark.shuffle.sort.bypassMergeThreshold,认值200来指定是否进行bypass机制。

​也就是说如果并行度是200以下的话不会进行排序,spark.default.parallelism控制全局并行度。所以如果我们不想在shuffle中进行排序的话,那么我们应该尽可能将spark.shuffle.sort.bypassMergeThreshold值调大一点。

二、Spark shuffle常见的优化参数

优化参数 认值 优化参数用法
spark.reducer.maxSizeInFlight 48M reduce read的时候每次从map端读取的最大的数据量,也就是reduce端shuffle read的缓冲区的大小。所以如果shufflemapTask生成的数据量很多,reduce的内存又有保障,通过调整这个参数,比如98M,可以减少网络拉取的次数,这样在一定程度上能提高效率。
spark.shuffle.compress true 在shuffle过程中往磁盘写数据的过程中开启压缩操作,较少数据体积。对应的压缩算法通过参数spark.io.compression.codec来执行,认为lz4
spark.shuffle.file.buffer 32K shuffle write过程中往磁盘文件中写的那个bufferedoutputstream的缓冲区大小,认32k,我们可以调大这个参数,比如64k,96k等等。
spark.shuffle.io.maxRetries 3 表示的是shuffle read如果失败,从shuffle write的文件拉取的最大重试次数。如果网络不稳定,我们为了避免失败,可以调大这个参数的值,比如10次。
spark.shuffle.io.retryWait 5s 在两次重试之间,不是立马执行,需要一定的等待时间,这个retryWait配置这个间隔时间。所以网络不稳定,我们可以调大这个参数的值,让程序正常执行,比如30s, 60s。
spark.shuffle.sort.bypassMergeThreshold 200 是否在shuffle过程中进行排序,如果不希望进行排序,建议调大该值。
spark.shuffle.memoryFraction 0.2 用于reduceTask存储拉取过来的数据,进行聚合操作的executor的内存比例。如果持久化操作较少,而shuffle操作较多,可以调大该比例,比如0.3.

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐