微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

spark通过随机加前缀解决数据倾斜问题


package day03

import org.apache.spark.sql.{DataFrame, SparkSession}

//解决wordcount的数据倾斜问题
object _01DataSkewDemo01 {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().master("local[2]").appName("test").getorCreate()
   import spark.implicits._

    val list: List[String] = List("a a b b c c a a c a a a a a a a a a a","d a c b a a a e e c a a a a","a a b a a")

    //变成df
    val df: DataFrame = list.toDF("line")
    //1.临时表
    df.createTempView("temp")
    //2.按照分组统计每个单词的数量,结果发现,某个单词的数量过多(下面的a有26个,而d只有1个),
    //所以我们要处理数据倾斜问题
    val sql=
      """
        |select word,count(1)
        |from
        |(select
        |explode(split(line," ")) word
        |from temp)A
        |group by word
        |""".stripMargin

    //spark.sql(sql).show(80)

    /**结果:
     * |word|count(1)|
     * +----+--------+
     * |   e|       2|
     * |   d|       1|
     * |   c|       5|
     * |   b|       4|
     * |   a|      26|
     * |    |       1|
     * +----+--------+
     */
    //处理数据倾斜 ;前缀这里取0,1,2//rand() 是[0,1)之间的小数; 所以,rand()*3=>是[0,3)之间的小数,然后再向下取整就得到了 0 1 2
    println("----------1-1---随机数字0~2,给每个单词加前缀-----------------------")
    val presql=
      """
        |select word,concat(floor(rand()*3),"_",word) pre_word
        |from
        |(select explode(split(line," ")) word
        |from temp)A
        |""".stripMargin

    //spark.sql(presql).show()

    /**
     * 结果:
     * |word|pre_word|
     * +----+--------+
     * |   e|     2_e|
     * |   d|     2_d|
     * |   c|     1_c|
     * |   b|     0_b|
     * |   a|     0_a|.....
     * +----+--------+
     */

    println("----------1-2---局部聚合,按前缀和单词相同的进行分组-----------------------")
    //注意:sparksql中,group by中可以用select中的变量名
    val partsql=
      """
        |select pre_word,count(1) pre_count
        |from
        |(select word,concat(floor(rand()*3),"_",word) pre_word
        |from
        |(select explode(split(line," ")) word
        |from temp)A)B
        |group by pre_word
        |""".stripMargin

     //spark.sql(partsql).show()

    println("----------1-2---去掉前缀,全局聚合-----------------------")
    val fullsql=
      """
        |select substr(pre_word,instr(pre_word,"_")+1) word,sum(pre_count) count
        |from
        |(select pre_word,count(1) pre_count
        |from
        |(select word,concat(floor(rand()*3),"_",word) pre_word
        |from
        |(select explode(split(line," ")) word
        |from temp)A)B
        |group by pre_word)C
        |group by word
        |""".stripMargin
    spark.sql(fullsql).show()

    /**
     * 最终结果:这样一样得到相同的结果,但是实际上它已经对数据倾斜做了处理,这样查询速度就会快很多了
     * |word|count|
     * +----+-----+
     * |   e|    2|
     * |   d|    1|
     * |   c|    5|
     * |   b|    4|
     * |   a|   26|
     * +----+-----+
     */
    spark.stop()

  }

}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐