微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

spring-integration-kafka配置使用者从指定分区接收消息

我开始在我的项目中使用spring-integration-kafka,我可以生成和使用来自Kafka的消息.但是现在,我希望向特定分区生成消息,并且还消耗来自特定分区的消息.

示例我想生成到分区3的消息,而消费只会从分区3接收消息.

到目前为止,我的主题有8个分区,我可以向特定分区生成消息,但我还没有找到配置消费者的方法,只接收来自特定分区的消息.

所以关于如何使用spring-integration-kafka配置使用者的任何建议,或者其他任何需要与KafkaConsumer.java类配合使用的建议都可以从特定分区接收消息.

谢谢.

这是我的代码

kafka-producer-context.xml

figurations>
        figuration 
            broker-list="127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094"
            async="true" topic="testTopic"
            key-class-type="java.lang.String" 
            key-encoder="encoder"
            value-class-type="java.lang.String" 
            value-encoder="encoder"
            partitioner="partitioner"
            compression-codec="default" />
    figurations>
Metadata.refresh.interval.ms">3600000required.acks">1

KafkaProducer.java

public class KafkaProducer {

private static final Logger logger = LoggerFactory
        .getLogger(KafkaProducer.class);

@Autowired
private MessageChannel inputToKafka;

public void sendMessage(String message) {

    try {
        inputToKafka.send(MessageBuilder.withPayload(message)
                    .setHeader(KafkaHeaders.TOPIC,"testTopic")
                    .setHeader(KafkaHeaders.PARTITION_ID,3).build());
    } catch (Exception e) {
        logger.error(String.format(
                "Failed to send [ %s ] to topic %s ",message,topic),e);
    }
}

}

kafka-consumer-context.xml

dispatcher task-executor="kafkaMessageExecutor" />
fig.Propertiesfactorybean">
    figurations>
        figuration
            group-id="defaultGrp" max-messages="20000">
            figuration>
    figurations>

KafkaConsumer.java

public class KafkaConsumer {

private static final Logger log = LoggerFactory
        .getLogger(KafkaConsumer.class);

@Autowired
KafkaService kafkaService;

public void processMessage(Mapteratorterator = values.iterator(); iterator
                .hasNext();) {
            Listterator.next();
            for (byte[] object : list) {
                String message = new String(object);
                log.debug("Message: " + message);
                try {
                    kafkaService.receiveMessage(message);
                } catch (Exception e) {
                    log.error(String.format("Failed to process message %s",message));
                }
            }
        }

    }
}
}

所以我的问题就在这里.当我向分区3或任何分区生成消息时,KafkaConsumer始终收到消息.我想要的只是:KafkaConsumer只接收来自分区3的消息,而不是来自其他分区的消息.

再次感谢.

最佳答案
你需要使用message-driven-channel-adapter.

As a variant,the KafkaMessageListenerContainer can accept org.springframework.integration.kafka.core.Partition array argument to specify topics and their partitions pair.

您需要使用this constructor连接侦听器容器,并使用listener-container属性将其提供给适配器.

我们将通过示例更新自述文件.

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐