微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Spark2.x中如何实现SparkStreaming消费Kafka实例

这篇文章给大家分享的是有关Spark2.x中如何实现SparkStreaming消费Kafka实例的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。

软件软件:

       spark版本是apache spark2.2.0

       kafka版本是kafka0.10.0

    采用Direct Approach的方式来融合Spark Streaming和Kafka。没有采用Receiver-Based的方式。后续我会专门整理一篇文章分析两种融合方式不同。

1.kafka数据准备:

创建kafka的topic命令:

/usr/hdp/2.6.3.0-235/kafka/bin/kafka-topics.sh  --zookeeper  salver158.hadoop.unicom:2181,salver31.hadoop.unicom:2181,salver32.hadoop.unicom:2181  -topic kafkawordcount  -replication-factor 2 -partitions 2 -create

Spark2.x中如何实现SparkStreaming消费Kafka实例

发送数据命令:

/usr/hdp/2.6.3.0-235/kafka/bin/kafka-console-producer.sh --zookeeper  salver158.hadoop.unicom:2181,salver31.hadoop.unicom:2181,salver32.hadoop.unicom:2181  -topic kafkawordcount

Spark2.x中如何实现SparkStreaming消费Kafka实例

2.代码实例:

package com.unicom.ljs.spark220.study.streaming;
import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.common.TopicPartition;import org.apache.spark.SparkConf;import org.apache.spark.streaming.Durations;import org.apache.spark.streaming.api.java.JavaInputDStream;import org.apache.spark.streaming.api.java.JavaPairDStream;import org.apache.spark.streaming.api.java.JavaStreamingContext;import org.apache.spark.streaming.kafka010.ConsumerStrategies;import org.apache.spark.streaming.kafka010.KafkaUtils;import org.apache.spark.streaming.kafka010.LocationStrategies;import scala.Tuple2;
import java.util.*;
/** * @author: Created By lujisen * @company ChinaUnicom Software JiNan * @date: 2020-01-31 20:30 * @version: v1.0 * @description: com.unicom.ljs.spark220.study.streaming */public class KafkaStreamingWordCount {    public static void main(String[] args) throws InterruptedException {
       SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("KafkaStreamingWordCount");
       JavaStreamingContext ssc=new JavaStreamingContext(sparkConf, Durations.seconds(5));
       String  topic="kafkawordcount";
       Collection<String> topics = new HashSet<>();        topics.add(topic);
       //kafka相关参数,其他参数可自行百度        String brokerList = "10.124.165.31:6667,10.124.165.32:6667";        Map<String, Object> props = new HashMap<>();        props.put("bootstrap.servers", brokerList);        props.put("group.id", "groupLjs1");        props.put("auto.offset.reset", "earliest");        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
       /*指定kafka中topic的消费分区*/        Map<TopicPartition, Long> offsets = new HashMap<>();        offsets.put(new TopicPartition(topic, 0), 0L);        offsets.put(new TopicPartition(topic, 1), 0L);
       //通过KafkaUtils.createDirectStream指定kafka数据源        // 三个参数   1 sparkcontext  2.LocationStrategies.PreferConsistent,如上所示。这将在可用执行程序之间均匀分配分区  3,订阅kafka 的配置
       JavaInputDStream<ConsumerRecord<Object, Object>> lines = KafkaUtils.createDirectStream(                ssc,                LocationStrategies.PreferConsistent(),                ConsumerStrategies.Subscribe(topics, props, offsets)        );        JavaPairDStream<String, Integer> counts =lines.flatMap(                x -> Arrays.asList(x.value().toString().split(" ")).iterator())                .mapToPair(x -> new Tuple2<String, Integer>(x, 1)).reduceByKey((x, y) -> x + y);
       /*打印结果*/        counts.print();
       /*启动*/        ssc.start();        ssc.awaitTermination();        /*停止*/        ssc.close();
   }}

3.数据统计展示:

Spark2.x中如何实现SparkStreaming消费Kafka实例

感谢各位的阅读!关于“Spark2.x中如何实现SparkStreaming消费Kafka实例”这篇文章分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐