我开始在我的项目中使用spring-integration-kafka,我可以生成和使用来自Kafka的消息.但是现在,我希望向特定分区生成消息,并且还消耗来自特定分区的消息.
示例我想生成到分区3的消息,而消费只会从分区3接收消息.
到目前为止,我的主题有8个分区,我可以向特定分区生成消息,但我还没有找到配置消费者的方法,只接收来自特定分区的消息.
所以关于如何使用spring-integration-kafka配置使用者的任何建议,或者其他任何需要与KafkaConsumer.java类配合使用的建议都可以从特定分区接收消息.
谢谢.
这是我的代码:
kafka-producer-context.xml
figurations>
figuration
broker-list="127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094"
async="true" topic="testTopic"
key-class-type="java.lang.String"
key-encoder="encoder"
value-class-type="java.lang.String"
value-encoder="encoder"
partitioner="partitioner"
compression-codec="default" />
figurations>
Metadata.refresh.interval.ms">3600000required.acks">1
KafkaProducer.java
public class KafkaProducer {
private static final Logger logger = LoggerFactory
.getLogger(KafkaProducer.class);
@Autowired
private MessageChannel inputToKafka;
public void sendMessage(String message) {
try {
inputToKafka.send(MessageBuilder.withPayload(message)
.setHeader(KafkaHeaders.TOPIC,"testTopic")
.setHeader(KafkaHeaders.PARTITION_ID,3).build());
} catch (Exception e) {
logger.error(String.format(
"Failed to send [ %s ] to topic %s ",message,topic),e);
}
}
}
kafka-consumer-context.xml
dispatcher task-executor="kafkaMessageExecutor" />
fig.Propertiesfactorybean">
figurations>
figuration
group-id="defaultGrp" max-messages="20000">
figuration>
figurations>
KafkaConsumer.java
public class KafkaConsumer {
private static final Logger log = LoggerFactory
.getLogger(KafkaConsumer.class);
@Autowired
KafkaService kafkaService;
public void processMessage(Mapteratorterator = values.iterator(); iterator
.hasNext();) {
Listterator.next();
for (byte[] object : list) {
String message = new String(object);
log.debug("Message: " + message);
try {
kafkaService.receiveMessage(message);
} catch (Exception e) {
log.error(String.format("Failed to process message %s",message));
}
}
}
}
}
}
所以我的问题就在这里.当我向分区3或任何分区生成消息时,KafkaConsumer始终收到消息.我想要的只是:KafkaConsumer只接收来自分区3的消息,而不是来自其他分区的消息.
再次感谢.
As a variant,the KafkaMessageListenerContainer can accept org.springframework.integration.kafka.core.Partition array argument to specify topics and their partitions pair.
您需要使用this constructor连接侦听器容器,并使用listener-container属性将其提供给适配器.
我们将通过示例更新自述文件.
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。