Problem 1: Below is a sample of students' final-exam data. Complete the statistics as required. Each record uses the following JSON format: {"name":"zhangsan","sex":"m","kemu":"yuwen","score":66}
1) Create the Kafka topic ods_score_topic with one replica and one partition (see the AdminClient sketch after this list)
2) Create a producer and send at least 15 records to the topic
3) Create a Maven project
4) Add the Spark Streaming dependencies (see the sample pom.xml snippet after this list)
5) Create the SparkConf environment
6) Set the batch interval to 5 seconds
7) Set the log level to ERROR
8) Read the data from the Kafka source
9) Parse the JSON data and return 4-tuples
10) Iterate over the tuples and compute each student's total score
11) Iterate over the tuples and compute the highest score per subject (10 points)
12) Iterate over the tuples and compute the average score per subject (10 points)
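For step 1, the usual route is the kafka-topics CLI, e.g. kafka-topics.sh --create --topic ods_score_topic --partitions 1 --replication-factor 1 --bootstrap-server hdp1:9092. As a sketch in the same language as the rest of the exercise, the topic can also be created programmatically with Kafka's AdminClient; the broker list is assumed to match the producer/consumer code that follows:

package com.lq.scala

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

object CreateTopic {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Assumption: same broker list as the producer/consumer below
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "hdp1:9092,hdp2:9092,hdp3:9092")
    val admin = AdminClient.create(props)
    // ods_score_topic: 1 partition, replication factor 1, as the exercise requires
    val topic = new NewTopic("ods_score_topic", 1, 1.toShort)
    admin.createTopics(Collections.singleton(topic)).all().get() // block until the topic exists
    admin.close()
  }
}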
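For step 4, the imports used in the code below imply roughly the following Maven coordinates; the versions here are assumptions for illustration, so align them with the Spark and Scala versions on your cluster. spark-streaming-kafka-0-10 pulls in kafka-clients transitively.

<!-- Assumed versions; match your cluster's Spark/Scala build -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>2.4.8</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>2.4.8</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>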
Producer
package com.lq.scala

import com.alibaba.fastjson.JSON
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import java.util.Properties
import scala.beans.BeanProperty

// @BeanProperty generates JavaBean-style accessors so fastjson can serialize the fields
case class Stu(@BeanProperty name: String, @BeanProperty sex: String, @BeanProperty kemu: String, @BeanProperty score: Int)

object KafkaProducerTest {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "hdp1:9092,hdp2:9092,hdp3:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)

    // 2) Create a producer and send at least 15 records to the topic
    producer.send(new ProducerRecord[String, String]("ods_score_topic", "", JSON.toJSON(Stu("张三", "m", "yuwen", 66)).toString))
    producer.send(new ProducerRecord[String, String]("ods_score_topic", "", JSON.toJSON(Stu("张三", "m", "yingyu", 77)).toString))
    producer.send(new ProducerRecord[String, String]("ods_score_topic", "", JSON.toJSON(Stu("张三", "m", "shuxue", 88)).toString))
    producer.send(new ProducerRecord[String, String]("ods_score_topic", "", JSON.toJSON(Stu("李四", "m", "yuwen", 80)).toString))
    producer.send(new ProducerRecord[String, String]("ods_score_topic", "", JSON.toJSON(Stu("李四", "m", "yingyu", 90)).toString))
    producer.send(new ProducerRecord[String, String]("ods_score_topic", "", JSON.toJSON(Stu("李四", "m", "shuxue", 100)).toString))
    producer.send(new ProducerRecord[String, String]("ods_score_topic", "", JSON.toJSON(Stu("王五", "n", "yuwen", 100)).toString))
    producer.send(new ProducerRecord[String, String]("ods_score_topic", "", JSON.toJSON(Stu("王五", "n", "yingyu", 80)).toString))
    producer.send(new ProducerRecord[String, String]("ods_score_topic", "", JSON.toJSON(Stu("王五", "n", "shuxue", 70)).toString))
    producer.send(new ProducerRecord[String, String]("ods_score_topic", "", JSON.toJSON(Stu("赵六", "n", "yuwen", 80)).toString))
    producer.send(new ProducerRecord[String, String]("ods_score_topic", "", JSON.toJSON(Stu("赵六", "n", "yingyu", 80)).toString))
    producer.send(new ProducerRecord[String, String]("ods_score_topic", "", JSON.toJSON(Stu("赵六", "n", "shuxue", 80)).toString))
    producer.send(new ProducerRecord[String, String]("ods_score_topic", "", JSON.toJSON(Stu("郑七", "m", "yuwen", 95)).toString))
    producer.send(new ProducerRecord[String, String]("ods_score_topic", "", JSON.toJSON(Stu("郑七", "m", "yingyu", 85)).toString))
    producer.send(new ProducerRecord[String, String]("ods_score_topic", "", JSON.toJSON(Stu("郑七", "m", "shuxue", 75)).toString))
    producer.close()
  }
}
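Two details worth noting about the block above: KafkaProducer.send is asynchronous, and producer.close() blocks until buffered records are delivered, which is what guarantees all 15 messages reach the broker before the program exits. The repeated send calls could also be driven from a collection; a minimal optional refactor, reusing the Stu case class and producer from above:

// Optional refactor: same records, one send call in a loop
val records = Seq(
  Stu("张三", "m", "yuwen", 66), Stu("张三", "m", "yingyu", 77), Stu("张三", "m", "shuxue", 88)
  // ... remaining twelve records as above
)
records.foreach { stu =>
  producer.send(new ProducerRecord[String, String]("ods_score_topic", "", JSON.toJSON(stu).toString))
}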
Consumer
package com.lq.scala

import com.alibaba.fastjson.JSON
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object KafkaConsumerTest {
  def main(args: Array[String]): Unit = {
    // 5) Create the SparkConf environment; setMaster("local[*]") uses all local cores
    val conf = new SparkConf().setMaster("local[*]").setAppName("week1")
    // 6) Set the batch interval to 5 seconds
    val ssc = new StreamingContext(conf, Seconds(5))
    // 7) Set the log level to ERROR
    ssc.sparkContext.setLogLevel("ERROR")

    // 8) Read the data from the Kafka source
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hdp1:9092,hdp2:9092,hdp3:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "group1",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("ods_score_topic")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    // 9) Parse the JSON data and return 4-tuples of (name, sex, kemu, score)
    val dataDS: DStream[(String, String, String, Int)] = stream.map(record => record.value())
      .map(JSON.parseObject(_, classOf[Stu]))
      .map(stu => (stu.name, stu.sex, stu.kemu, stu.score))

    dataDS.foreachRDD(rdd => {
      // 10) Iterate over the tuples and compute each student's total score
      println("Total score per student")
      rdd.map(s => (s._1, s._4)).reduceByKey(_ + _).foreach(println)
      // 11) Iterate over the tuples and compute the highest score per subject (10 points)
      println("Highest score per subject")
      rdd.groupBy(_._3).map(s => (s._1, s._2.map(_._4).max)).foreach(println)
      // 12) Iterate over the tuples and compute the average score per subject (10 points)
      println("Average score per subject")
      rdd.groupBy(_._3).map(s => (s._1, s._2.map(_._4).sum * 1.0 / s._2.size)).foreach(println)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
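A note on the design: rdd.groupBy shuffles every tuple and materializes each subject's complete score list on one executor, which is fine for 15 records but wasteful at scale. Steps 11 and 12 can instead be combined map-side with reduceByKey; a minimal sketch, assuming the same rdd of (name, sex, kemu, score) tuples as above:

// 11) Highest score per subject, combined map-side
rdd.map(s => (s._3, s._4)).reduceByKey((a, b) => math.max(a, b)).foreach(println)
// 12) Average per subject: carry (sum, count) through the reduce, divide once at the end
rdd.map(s => (s._3, (s._4, 1)))
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
  .mapValues { case (sum, cnt) => sum * 1.0 / cnt }
  .foreach(println)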