说明
- RDD中的元素按照key指定的分区规则进行分区。
- RDD中的元素必须是键值对类型。
- 如果原有的partitionRDD和现有的partitionRDD一致的话就不进行分区,否则会发生shuffle。
函数签名
代码示例(默认分区器)
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]")
val sc = new SparkContext(conf)
// 按照key分区,所以数据必须是k-v键值对类型
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("d", 4)), 2)
println("-------------------重新分区前--------------------")
rdd.mapPartitionsWithIndex {
(index, datas) => {
println(index + "--->" + datas.mkString(","))
datas
}
}.collect()
println("-------------------重新分区后--------------------")
//按照哈希值进行分区
val newRDD: RDD[(String, Int)] = rdd.partitionBy(new HashPartitioner(3))
newRDD.mapPartitionsWithIndex {
(index, datas) => {
println(index + "--->" + datas.mkString(","))
datas
}
}.collect()
sc.stop()
代码实现(自定义分区器)
class MyPartitioner(partitions: Int) extends Partitioner {
// 分区数量
override def numPartitions: Int = partitions
// 分区逻辑 返回值为分区编号
override def getPartition(key: Any): Int = {
// 将key的类型转为String类型
val k: String = key.asInstanceOf[String]
if (k.startsWith("136")) 0
else if (k.startsWith("137")) 1
else if (k.startsWith("138")) 2
else 3
}
}
// 调用自定义分区器
val rdd: RDD[(String, String)] = sc.makeRDD(List(("13698624174", "河北"), ("13766887551", "广东"),
("13876543211", "上海"), ("17677885551", "河南")), 2)
println("-------------------重新分区前--------------------")
rdd.mapPartitionsWithIndex {
(index, datas) => {
println(index + "--->" + datas.mkString(","))
datas
}
}.collect()
println("-------------------重新分区后--------------------")
val newRDD: RDD[(String, String)] = rdd.partitionBy(new MyPartitioner(4))
newRDD.mapPartitionsWithIndex {
(index, datas) => {
println(index + "--->" + datas.mkString(","))
datas
}
}.collect()
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。