Spark2.x中三种ShuffleWriter触发条件分别是什么,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。
一、概述
基于Sort的Shuffle Writer有三种实现:BypassMergeSortShuffleWriter、UnsafeShuffleWriter、SortShuffleWriter,下面我们主要看看在Spark2.x是如何选择使用哪种Shuffler Writer处理的。
二、Shuffle触发条件
1.我们结合源码来看ShuffleManager是如何来决定选择实例化哪一种ShuffleHandle,如果注册的是SerializedShuffleHandle,就获取UnsafeShuffleWriter;如果注册的是BypassMergeSortShuffleHandle,就获取BypassMergeSortShuffleWriter;如果注册的是BaseShuffleHandle,就获取SortShuffleWriter,registerShuffle()函数代码如下:
override def registerShuffle[K, V, C]( shuffleId: Int, numMaps: Int, dependency: ShuffleDependency[K, V, C]): ShuffleHandle = { /*先判断是否满足BypassMergeSortShuffleHandle条件*/ if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) { new BypassMergeSortShuffleHandle[K, V]( shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]]) /*再判断是否满足SerializedShuffleHandle条件*/ } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) { // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient: new SerializedShuffleHandle[K, V]( shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]]) /*最后才是BaseShuffleHandle*/ } else { // Otherwise, buffer map outputs in a deserialized form: new BaseShuffleHandle(shuffleId, numMaps, dependency) } }
2.上面registerShuffle()函数的各个策略的判断条件,这里详细说一下:
1).BypassMergeSortShuffleWriter(对应BypassMergeSortHandle)的触发条件如下:
a.map端不能进行aggregation聚合操作
b.不能指定ordering,即分区内数据不能排序
c.分区的数目必须小于 spark.shuffle.sort.bypassMergeThrshold指定的阀值,默认值是200
2).UnsafeShuffleWriter触发条件(对应SerializedShuffleHandle)的触发条件如下:
a.shuffle依赖不带有聚合操作
b.支持序列化值的重新定位
c.分区数量少于16777216个
3).sortShuffleWriter(对应BaseShuffleHandle)
上面如果不满足,默认是这个策略,用于处理所有其他情况。
关于Spark2.x中三种ShuffleWriter触发条件分别是什么问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注编程之家行业资讯频道了解更多相关知识。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。