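This article walks through sample code for connecting a Spring Boot project to more than one Kafka cluster: the spring-kafka dependency, the properties for each cluster, a configuration class that defines one consumer factory and one listener container factory per cluster, and the listeners that consume from each.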
1.spring-kafka
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.3.5.RELEASE</version>
</dependency>
2.Configuration properties
kafka.bootstrap-servers=localhost:9092
kafka.consumer.group.id=20230321
# Number of concurrent consumer threads (usually equal to the partition count)
kafka.consumer.concurrency=10
kafka.consumer.enable.auto.commit=false

kafka.bootstrap-servers.pic=localhost:29092
kafka.consumer.group.id.pic=20230322_pic
kafka.consumer.concurrency.pic=10
kafka.consumer.enable.auto.commit.pic=false
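The `.pic` suffix is the only thing that separates the two clusters, so every setting exists as a base key plus a suffixed twin. The sketch below shows that pairing with plain `java.util.Properties`. The `ClusterSettings` class and its `from` and `parse` methods are hypothetical names used for illustration only; the project itself reads these keys with `@Value`.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical holder for one cluster's settings (illustration only).
final class ClusterSettings {
    final String bootstrapServers;
    final String groupId;
    final int concurrency;
    final boolean autoCommit;

    private ClusterSettings(String bootstrapServers, String groupId,
                            int concurrency, boolean autoCommit) {
        this.bootstrapServers = bootstrapServers;
        this.groupId = groupId;
        this.concurrency = concurrency;
        this.autoCommit = autoCommit;
    }

    // suffix is "" for the default cluster and ".pic" for the second one.
    static ClusterSettings from(Properties props, String suffix) {
        return new ClusterSettings(
                require(props, "kafka.bootstrap-servers" + suffix),
                require(props, "kafka.consumer.group.id" + suffix),
                Integer.parseInt(require(props, "kafka.consumer.concurrency" + suffix)),
                Boolean.parseBoolean(require(props, "kafka.consumer.enable.auto.commit" + suffix)));
    }

    // Fails fast when a key is missing instead of silently using a default.
    private static String require(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null) {
            throw new IllegalArgumentException("Missing property: " + key);
        }
        return value.trim();
    }

    // Parses properties text, which makes the lookup easy to try without a file.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }
}
```

Calling `ClusterSettings.from(props, "")` and `ClusterSettings.from(props, ".pic")` on the same `Properties` object yields one settings object per cluster.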
3.Kafka configuration class
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    // Settings for the default cluster
    @Value("${kafka.consumer.group.id}")
    private String groupId;
    @Value("${kafka.consumer.concurrency}")
    private int concurrency;
    @Value("${kafka.consumer.enable.auto.commit}")
    private String autoCommit;
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServer;

    // Settings for the second ("pic") cluster
    @Value("${kafka.consumer.group.id.pic}")
    private String groupIdPic;
    @Value("${kafka.consumer.concurrency.pic}")
    private int concurrencyPic;
    @Value("${kafka.consumer.enable.auto.commit.pic}")
    private String autoCommitPic;
    @Value("${kafka.bootstrap-servers.pic}")
    private String bootstrapServerPic;

    // Consumer factory for the default cluster
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>(16);
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    // Listener container factory for the default cluster; offsets are committed manually
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties()
                .setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    // Consumer factory for the "pic" cluster
    @Bean
    public ConsumerFactory<String, String> consumerFactoryPic() {
        Map<String, Object> configProps = new HashMap<>(16);
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerPic);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdPic);
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommitPic);
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    // Listener container factory for the "pic" cluster
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryPic() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactoryPic());
        factory.setConcurrency(concurrencyPic);
        factory.getContainerProperties()
                .setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}
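The two consumer factory methods differ only in the bootstrap servers, the group id, and the auto-commit flag, so the shared part could be pulled into one helper. The following is a minimal sketch of that idea, not the original author's code: `ConsumerProps.build` is a hypothetical name, the auto-commit flag is taken as a `boolean` rather than the `String` used above, and the literal keys are the strings that the corresponding `ConsumerConfig` constants resolve to, written out so the block compiles without Kafka on the classpath. In the real configuration class the constants and `StringDeserializer.class` would be the natural choice.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (illustration only): builds the consumer properties
// for one cluster so each factory bean passes in only what differs.
final class ConsumerProps {

    private static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    private ConsumerProps() {
    }

    static Map<String, Object> build(String bootstrapServers, String groupId, boolean autoCommit) {
        Map<String, Object> props = new HashMap<>(16);
        props.put("bootstrap.servers", bootstrapServers);      // ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("key.deserializer", STRING_DESERIALIZER);    // ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
        props.put("value.deserializer", STRING_DESERIALIZER);  // ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
        props.put("group.id", groupId);                        // ConsumerConfig.GROUP_ID_CONFIG
        props.put("enable.auto.commit", autoCommit);           // ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
        return props;
    }
}
```

With such a helper, each factory bean would reduce to a single line such as `return new DefaultKafkaConsumerFactory<>(ConsumerProps.build(bootstrapServerPic, groupIdPic, false));`.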
4.Consuming topic messages
// These methods belong in a Spring-managed bean. `log` is assumed to be an SLF4J logger,
// StringUtils comes from Apache Commons Lang 3, and Acknowledgment is
// org.springframework.kafka.support.Acknowledgment.

// Consumes from the "pic" cluster
@KafkaListener(topics = "xxxxx", containerFactory = "kafkaListenerContainerFactoryPic")
public void receivePic(ConsumerRecord<String, String> message, Acknowledgment ack) {
    try {
        String jsonString = message.value();
        if (StringUtils.isNotBlank(jsonString)) {
            log.info("Consumed: {}", jsonString);
            // TODO ....
        }
    } catch (Exception e) {
        log.error("receive topic error", e);
    } finally {
        // Commit the offset manually, whether or not processing succeeded
        ack.acknowledge();
    }
}

// Consumes from the default cluster
@KafkaListener(topics = "xxxxxx", containerFactory = "kafkaListenerContainerFactory")
public void receive(ConsumerRecord<String, String> message, Acknowledgment ack) {
    try {
        if (StringUtils.isNotBlank(message.value())) {
            // TODO ....
        }
    } catch (Exception e) {
        log.error("receive topic error", e);
    } finally {
        ack.acknowledge();
    }
}
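Because the acknowledgment call sits in `finally`, the offset is committed whether processing succeeds or throws, so a record that fails is logged and skipped rather than retried. The sketch below reproduces only that control flow with stand-ins so it runs without Spring or Kafka: the `Ack` interface is a hypothetical substitute for Spring's `Acknowledgment`, and `AckFlowDemo.receive` with its `processor` parameter is an illustrative name, not part of the original code.

```java
import java.util.function.Consumer;

final class AckFlowDemo {

    // Hypothetical stand-in for org.springframework.kafka.support.Acknowledgment.
    interface Ack {
        void acknowledge();
    }

    private AckFlowDemo() {
    }

    // Mirrors the listener body: process non-blank values, log failures,
    // and always acknowledge. Returns true when no exception was thrown.
    static boolean receive(String value, Consumer<String> processor, Ack ack) {
        try {
            if (value != null && !value.trim().isEmpty()) {
                processor.accept(value);
            }
            return true;
        } catch (Exception e) {
            System.err.println("receive topic error: " + e);
            return false;
        } finally {
            // Runs on both the success and the failure path.
            ack.acknowledge();
        }
    }
}
```

If a failed record must not be skipped, the acknowledgment would have to move out of `finally`, and the failure path would need its own handling; that design is outside what the sample above covers.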