原文链接:
https://0x0fff.com/spark-architecture-shuffle
如上图所示,橙色箭头表示shuffle阶段,箭头的起始端称为mapper端,箭头结束端称为reducer。
在Spark中,有多种shuffle的实现,它取决于参数spark.shuffle.manager。一共有三个选项:hash, sort, tungsten-sort,其中从Spark 1.2.0开始sort是默认选项。
Hash Shuffle
在Spark 1.2.0前,默认的shuflle实现是hash (spark.shuffle.manager = hash). 不过它有很多缺陷,大部分的理由都是它创建了太多文件,每一个mapper任务都会为每一个reducer创建一个文件,最终会在集群上生成M*R个文件,这里M是mapper的数量以及R是reducer的数量。大量的mapper和reducer会造成大问题,包括:输出缓冲区,文件系统中打开文件的数量,创建和删除文件的速度等。有一个好的例子是来自于Yahoo,他们遇到了前面提及的所有问题: 46k个mapper与46k个reducer导致集群里创建了20亿个文件。
这个shuffle实现的逻辑非常简单: 它将reducer的个数视作reducer端的分区数, 然后每个mapper端会为这些reducer分区创建一个独立的文件,然后轮询它需要输出的record,它会计算每一个record隶属于哪个分区,然后将record写入写入对应的文件中。
这个shuffle实现的工作机制如下图:
这个shuffle实现有一个优化的版本,开关参数为spark.shuffle.consolidateFiles(默认是false)。当这个参数被设置为true时,mapper的输出文件会被合并(打开文件的数量会因此减少)。
如果你有E个executor(“–num-executors” for YARN)以及每个executor拥有C个core(“spark.executor.cores” or “–executor-cores” for YARN),然后每个任务需要T个core(“spark.task.cpus“),那么集群里execution slots的数量为E * C / T,然后shuffle期间创建的文件的数量为E * C / T * R。比如当E为100,C为10,T为1,一共有46k个reducer,集群中创建的文件数量将从20亿下架到4600万,从性能的角度看,这是有了很大的提升。这个特性实现起来是非常简单的:创建一个输出文件池,而不是为每个reducer创建一个新的文件。当map任务开始输出数据时,它会从池中获取R个文件。当它写数据结束后,它会返还这R个文件到池里。因为每个executor能并行执行C/T个任务,它会创建C/T组输出文件,每组文件的数量为R。在第一组C/T的并行map任务结束后,后面的map任务会重用已经存在在池中的文件组。
优化过的shuffle实现的工作机制如下图:
优点:
缺点:
Sort Shuffle
从Spark 1.2.0开始, 它使用的shuffle算法为sort (spark.shuffle.manager = sort). sort算法尝试使用Hadoop MapReduce的shuffle逻辑。基于hash shuffle你为每个“reducers”输出一个文件, 而基于sort shuffle使用了更聪明的办法: 你会输出一个文件,文件中的项会以reducer id,并且会索引这些reducer id,这种方法可以让你轻易地通过索引找到属于reducer x的数据块。
Sort Shuffle的工作机制如下图:
优点:
缺点:
- 排序比hashing要慢;
- 如果你使用的是SSD,hash shuffle会更适合你。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。