spark_core_03

package com.atguigu.bigata.spark.core.rdd.builder.operator.action

import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

/**
 * @author :atom
 * @date :2022/2/20 20:40
 * Nine ways to write WordCount
 */
object Spark01_RDD_Operator_Action {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("action")
    val sc = new SparkContext(conf)


    //    wc1(sc)
    //    wc2(sc)
    //    wc3(sc)
    //    wc5(sc)

    //    wc6(sc)
    //    wc7(sc)
    //    wc8(sc)
    //    wc9(sc)

    //    aggregateByKey(sc)
    //    combineByKey(sc)
    wc4(sc)
    sc.stop()

  }

  //Todo reduceByKey
  def wc1(sc: SparkContext): Unit = {
    val RDD = sc.parallelize(List("hello kafka", "hello spark", "hello hbase", "hello flink"), 2)
    RDD.flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .collect
      .foreach(println)
  }

  //Todo GroupBy
  def wc2(sc: SparkContext): Unit = {
    val RDD = sc.parallelize(List("hello kafka", "hello spark", "hello hbase", "hello flink"), 2)
    RDD.flatMap(_.split(" "))
      .groupBy(word => word)
      .mapValues(_.size)
      .collect
      .foreach(println)
  }

  //Todo GroupByKey
  def wc3(sc: SparkContext): Unit = {
    val RDD = sc.parallelize(List("hello kafka", "hello spark", "hello hbase", "hello flink"), 2)
    RDD.flatMap(_.split(" "))
      .map((_, 1))
      .groupByKey()
      .map {
        case (word, list) => (word, list.size)
      }
      .collect
      .foreach(println)

  }

  //Todo reduce
  def wc4(sc: SparkContext): Unit = {
    val RDD = sc.parallelize(List("hello kafka", "hello spark", "hello hbase", "hello flink"), 2)
    // Map each word to a single-entry mutable Map, then merge all maps with reduce
    val wordCount = RDD.flatMap(_.split(" "))
      .map(word => mutable.Map[String, Long]((word, 1L)))
      .reduce {
        (map1, map2) => {
          map2.foreach {
            case (word, count) =>
              val newCount = map1.getOrElse(word, 0L) + count
              map1.update(word, newCount)
          }
          map1
        }
      }
    println(wordCount)
  }

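  //Todo countByValue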
  def wc5(sc: SparkContext): Unit = {
    val RDD = sc.parallelize(List("hello kafka", "hello spark", "hello hbase", "hello flink"), 2)
    RDD.flatMap(_.split(" "))
      .countByValue()
      .foreach(println)

  }

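  //Todo countByKey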
  def wc6(sc: SparkContext): Unit = {
    val RDD = sc.parallelize(List("hello kafka", "hello spark", "hello hbase", "hello flink"), 2)
    RDD.flatMap(_.split(" "))
      .map((_, 1))
      .countByKey()
      .foreach(println)
  }

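  //Todo combineByKey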
  def wc7(sc: SparkContext): Unit = {
    val RDD = sc.parallelize(List("hello kafka", "hello spark", "hello hbase", "hello flink"), 2)
    RDD.flatMap(_.split(" ")).map((_, 1)).combineByKey(
      cnt => cnt,                // createCombiner: use the first count as the initial value
      (a: Int, b: Int) => a + b, // mergeValue: add counts within a partition
      (a: Int, b: Int) => a + b  // mergeCombiners: merge counts across partitions
    ).collect
      .foreach(println)
  }

  //Todo aggregateByKey  average value per key
  def aggregateByKey(sc: SparkContext): Unit = {
    val RDD = sc.parallelize(List(("a", 1), ("a", 2), ("b", 3), ("b", 4), ("c", 5)), 2)
    RDD.aggregateByKey((0, 0))(
      // seqOp: within a partition, accumulate (sum, count)
      (tuple, cnt) => {
        (tuple._1 + cnt, tuple._2 + 1)
      },
      // combOp: across partitions, merge the (sum, count) tuples
      (t1, t2) => {
        (t1._1 + t2._1, t1._2 + t2._2)
      }
    ).mapValues {
      case (num, cnt) => {
        num.toDouble / cnt
      }
    }.collect.foreach(println)
  }

  //Todo combineByKey  average value per key
  def combineByKey(sc: SparkContext): Unit = {
    val RDD = sc.parallelize(List(("a", 1), ("a", 2), ("b", 3), ("b", 4), ("c", 5)), 2)
    RDD.combineByKey(
      // createCombiner: turn the first value into a (sum, count) tuple
      v => (v, 1),
      // mergeValue: within a partition, fold another value into (sum, count)
      (tuple: (Int, Int), v) => {
        (tuple._1 + v, tuple._2 + 1)
      },
      // mergeCombiners: across partitions, merge the (sum, count) tuples
      (t1: (Int, Int), t2: (Int, Int)) => {
        (t1._1 + t2._1, t1._2 + t2._2)
      }
    ).mapValues {
      case (num, cnt) => {
        num.toDouble / cnt
      }
    }.collect
      .foreach(println)
  }


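  //Todo foldByKey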
  def wc8(sc: SparkContext): Unit = {
    val RDD = sc.parallelize(List("hello kafka", "hello spark", "hello hbase", "hello flink"), 2)
    RDD.flatMap(_.split(" "))
      .map((_, 1))
      .foldByKey(0)(_ + _)
      .collect
      .foreach(println)

  }

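  //Todo aggregateByKey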
  def wc9(sc: SparkContext): Unit = {
    val RDD = sc.parallelize(List("hello kafka", "hello spark", "hello hbase", "hello flink"), 2)
    RDD.flatMap(_.split(" "))
      .map((_, 1))
      .aggregateByKey(0)(_ + _, _ + _)
      .collect
      .foreach(println)
  }


}
