微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Spark RDD 分区

Spark RDD 分区

tag: Spark, Spark Partitioner, Spark Repartition

2021-04-2513:36:44 星期六
version: spark-2.4.5

分区器

自定义key分发的逻辑仅在 RDD 级别适用。

  1. Partitioner
    自定义分区器

    abstract class Partitioner extends Serializable {
        abstract def getPartition(key: Any): Int // 返回值类似于数组Index
    	abstract def numPartitions: Int
    }
    
  2. HashPartitioner
    自带Hash分区器, 分区ID: key.hashCode % numPartitions 负数则加Mod否则返回

    class HashPartitioner extends Partitioner{ new HashPartitioner(partitions: Int) }
    
  3. RangePartitioner
    相比HashPartitioner,RangePartitioner分区会尽量保证每个分区中数据量的均匀, 要求Key可比较.
    将分区数据分成块, 用鱼塘抽样对块计算(主要是为了得到尽量多的值 与其count) 之后就是选分隔符, 就跟HBase的Region的范围似的

    class RangePartitioner[K, V] extends Partitioner
    

重分区算子

  1. coalesce
    返回numPartitions个分区的新RDD, 当shuffle = false时, 这是一个 narrow dependency 算子性能较好,
    一般用来减少分区数, 比如从 100 -> 10(最好不少于Executor个数)

    def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T]
    
  2. repartition
    带有Shuffle的Repartition, 可以任意调节分区数.

    /** Return a new RDD that has exactly numPartitions partitions. */
    def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
    
  3. repartitionAndSortWithinPartitions
    返回按照Partitioner给出的Key重分区并顺序排序后的RDD, 利用ShuffleSortManager实现, 相比于 repartition + sortByKey 性能更好.
    即相当于 sortByKey -> exchange -> merge

    /**
     * Repartition the RDD according to the given partitioner and, within each resulting
     * partition, sort records by their keys.
     * This is more efficient than calling repartition and then sorting within each
     * partition because it can push the sorting down into the shuffle machinery.
    */
    def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)]
    

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐