微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

spark集成kafka数据源

1、spark集成的KafkaUtils.createStream已经过期,这个是Spark Integration For Kafka 0.8里集成的。

      替代的是Spark Integration For Kafka 0.10,已经没有createStream函数,采用createDirectStream,

    区别是直连kafka服务器,而不是连接zookeeper。

2、依赖

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.4.4</version>
        </dependency>

3、code

package com.home.spark.streaming

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Ex_kafkaSource {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(true).setMaster("local[*]").setAppName("spark streaming wordcount")
    
    conf.set("spark.streaming.stopGracefullyOnShutdown", "true")

    //环境对象,设置采集周期
    val scc: StreamingContext = new StreamingContext(conf, Seconds(30))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.44.10:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )

    val topics = Array("test")

    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      scc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String,String](
        topics,
        kafkaParams
      )
    )

    kafkaStream.foreachRDD(rdd => {
      val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val maped: RDD[(String, String)] = rdd.map(record => (record.key,record.value))
      //计算逻辑
      maped.foreach(println)
      //循环输出
      for(o <- offsetRange){
        println(s"${o.topic}  ${o.partition} ${o.fromOffset} ${o.untilOffset}")
      }
    })

    val words: DStream[String] = kafkaStream.flatMap(t=>t.value().split(" "))

//    val words: DStream[String] = socketStream.flatMap(_.split(" "))

    val pairs = words.map(word => (word, 1))

    val wordCounts: DStream[(String, Int)] = pairs.reduceByKey(_ + _)

    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print

    // Start the computation
    // 通过 streamingContext.start()来启动消息采集和处理
    scc.start()

    // Wait for the computation to terminate
    // 通过streamingContext.stop()来手动终止处理程序
    scc.awaitTermination()
  }
}

4、kafka测试环境

     https://www.cnblogs.com/asker009/p/9958240.html

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐