微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

解决spark streaming集成kafka时只能读topic的其中一个分区数据的问题

1. 问题描述

我创建了一个名称myTest的topic,该topic有三个分区,在我的应用中spark streaming以direct方式连接kakfa,但是发现只能消费一个分区的数据,多次更换comsumer group依然如此。

2 环境配置

kafka集群环境,

@H_404_55@

应用依赖:spark版本是2.1.1、kakfa版本是0.10.1.1;
maven依赖配置如下

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>$2.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.1.1</version>
</dependency>

相关配置代码(Java)如下:

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONfig, "192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092");
kafkaParams.put(ConsumerConfig.GROUP_ID_CONfig, "myGroup");
kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONfig, "earliest");
kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONfig, "true");
kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONfig, StringDeserializer.class);
kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONfig, StringDeserializer.class);

Set<String> topics = new HashSet<String>(Arrays.asList("testTopic"));
JavaInputDStream<ConsumerRecord<Object, Object>> dStream = KafkaUtils.createDirectStream(
    jssc,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.Subscribe(topics, kafkaParams));

3. 解决方

经过查阅相关资料发现是由于Kafka 0.10.1.1的bug导致的。其实不仅仅是0.10.1.1,另外0.10.1.0和0.10.0.2也有这个问题。详细描述参考https://issues.apache.org/jira/browse/KAFKA-4547
最后我将kafka版本降到了0.10.0.1,解决了这个问题。

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.0.1</version>
</dependency>

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐


主机 IP 操作系统 kakfa
node1 192.168.1.101 Centos 6.5 kafka_2.11-0.10.1.1
node2 192.168.1.102 Centos 6.5 kafka_2.11-0.10.1.1
node3 192.168.1.103 Centos 6.5 kafka_2.11-0.10.1.1