
Spark ==> Sending, receiving, and analyzing JSON data

Exercise 1: Below is partial data from students' final exams. Complete the statistics as required. Each record has the format {"name":"zhangsan","sex":"m","kemu":"yuwen","score":66}

1) Create the Kafka topic ods_score_topic with one replica and one partition (see the command sketch after this list)

2) Create a producer and send at least 15 records to the topic

3) Create a Maven project

4) Add the Spark Streaming dependencies (see the dependency sketch after this list)

5) Create the SparkConf environment

6) Set the batch interval to 5 seconds

7) Set the log level to ERROR

8) Get the Kafka data source

9) Parse the JSON data and return it as 4-tuples

10) Iterate over the tuples and compute each student's total score

11) Iterate over the tuples and compute the highest score for each subject

12) Iterate over the tuples and compute the average score for each subject
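
For requirements 1) and 4), a minimal sketch follows. The topic-creation command assumes Kafka 2.2 or later (older clusters use --zookeeper instead of --bootstrap-server), and the dependency versions below are illustrative placeholders; match them to the cluster's Spark and Scala versions.

kafka-topics.sh --create --topic ods_score_topic --partitions 1 --replication-factor 1 --bootstrap-server hdp1:9092

<!-- pom.xml: Spark Streaming, the Kafka 0-10 connector, and fastjson for JSON handling -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>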

 

Producer

package com.lq.scala

import com.alibaba.fastjson.JSON
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import java.util.Properties
import scala.beans.BeanProperty
// @BeanProperty auto-generates getters and setters so fastjson can treat the case class as a Java bean
case class Stu(@BeanProperty name:String,@BeanProperty sex:String,@BeanProperty kemu:String,@BeanProperty score:Int)

object KafkaProducerTest {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "hdp1:9092,hdp2:9092,hdp3:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer: KafkaProducer[String,String] = new KafkaProducer[String,String](props)

    //2) Create a producer and send at least 15 records to the topic
    producer.send(new ProducerRecord[String, String]("ods_score_topic","",JSON.toJSON(new Stu("张三","m","yuwen",66)).toString))
    producer.send(new ProducerRecord[String, String]("ods_score_topic","",JSON.toJSON(new Stu("张三","m","yingyu",77)).toString))
    producer.send(new ProducerRecord[String, String]("ods_score_topic","",JSON.toJSON(new Stu("张三","m","shuxue",88)).toString))
    producer.send(new ProducerRecord[String, String]("ods_score_topic","",JSON.toJSON(new Stu("李四","m","yuwen",80)).toString))
    producer.send(new ProducerRecord[String, String]("ods_score_topic","",JSON.toJSON(new Stu("李四","m","yingyu",90)).toString))
    producer.send(new ProducerRecord[String, String]("ods_score_topic","",JSON.toJSON(new Stu("李四","m","shuxue",100)).toString))
    producer.send(new ProducerRecord[String, String]("ods_score_topic","",JSON.toJSON(new Stu("王五","n","yuwen",100)).toString))
    producer.send(new ProducerRecord[String, String]("ods_score_topic","",JSON.toJSON(new Stu("王五","n","yingyu",80)).toString))
    producer.send(new ProducerRecord[String, String]("ods_score_topic","",JSON.toJSON(new Stu("王五","n","shuxue",70)).toString))
    producer.send(new ProducerRecord[String, String]("ods_score_topic","",JSON.toJSON(new Stu("赵六","n","yuwen",80)).toString))
    producer.send(new ProducerRecord[String, String]("ods_score_topic","",JSON.toJSON(new Stu("赵六","n","yingyu",80)).toString))
    producer.send(new ProducerRecord[String, String]("ods_score_topic","",JSON.toJSON(new Stu("赵六","n","shuxue",80)).toString))
    producer.send(new ProducerRecord[String, String]("ods_score_topic","",JSON.toJSON(new Stu("郑七","m","yuwen",95)).toString))
    producer.send(new ProducerRecord[String, String]("ods_score_topic","",JSON.toJSON(new Stu("郑七","m","yingyu",85)).toString))
    producer.send(new ProducerRecord[String, String]("ods_score_topic","",JSON.toJSON(new Stu("郑七","m","shuxue",75)).toString))

    producer.close()
  }
}
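
For reference, fastjson serializes each Stu record through its @BeanProperty getters, so the first send above should produce a message value along the lines of {"kemu":"yuwen","name":"张三","score":66,"sex":"m"} (field order may differ). The messages can be verified with kafka-console-consumer before wiring up the streaming job.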

 

 

Consumer

package com.lq.scala

import com.alibaba.fastjson.JSON
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object KafkaConsumerTest {
  def main(args: Array[String]): Unit = {
    //5) Create the SparkConf environment; setMaster sets the number of local threads
    val conf = new SparkConf().setMaster("local[*]").setAppName("week1")
    //6) Set the batch interval to 5 seconds
    val ssc = new StreamingContext(conf, Seconds(5))
    //7) Set the log level to ERROR
    ssc.sparkContext.setLogLevel("ERROR")
    //8) Get the Kafka data source
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hdp1:9092,hdp2:9092,hdp3:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "group1",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("ods_score_topic")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    val dataDS: DStream[(String, String, String, Int)] = stream.map(record => record.value())
      //9) Parse the JSON data and return 4-tuples
      .map(JSON.parseObject(_, classOf[Stu]))
      .map(stu => (
        stu.name, stu.sex, stu.kemu, stu.score
      ))
    dataDS.foreachRDD(rdd=>{
      //10) Compute each student's total score
      println("Total score per student")
      rdd.map(s => (s._1, s._4)).reduceByKey(_ + _).foreach(println)
      //11) Compute the highest score for each subject
      println("Highest score per subject")
      rdd.groupBy(_._3).map(s=>(
        s._1,s._2.map(_._4).max
      )).foreach(println)
      //12) Compute the average score for each subject
      println("Average score per subject")
      rdd.groupBy(_._3).map(s=>(
        s._1,s._2.map(_._4).sum * 1.0/s._2.size
      )).foreach(println)
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
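
As a rough check: assuming the streaming job is started before the producer (auto.offset.reset is "latest", so records sent earlier are skipped) and all 15 records land in the same 5-second batch, one batch should print something close to the following (ordering may vary, since foreach runs per partition):

Total score per student
(张三,231)
(李四,270)
(王五,250)
(赵六,240)
(郑七,255)
Highest score per subject
(yuwen,100)
(yingyu,90)
(shuxue,100)
Average score per subject
(yuwen,84.2)
(yingyu,82.4)
(shuxue,82.6)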

 
