基于消费者 reject requeue设置为false 消息进入死信队列
# 应用名称 spring.application.name=rabbitmq # 应用服务 WEB 访问端口 server.port=8080 spring.rabbitmq.host=192.168.1.137 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ #手动ack spring.rabbitmq.listener.simple.ackNowledge-mode=manual #每次去交换机拿10条消息 spring.rabbitmq.listener.simple.prefetch=10 #开启confirm机制 spring.rabbitmq.publisher-confirm-type=correlated #开启return机制 spring.rabbitmq.publisher-returns=true
@Configuration public class DeadLetterConfig { public static final String norMAL_EXCHANGE="normal-exchange"; public static final String norMAL_QUEUE="normal-queue"; public static final String norMAL_ROUTING_KEY="normal.#"; public static final String DEAD_EXCHANGE="dead-exchange"; public static final String DEAD_QUEUE="dead-queue"; public static final String DEAD_ROUTING_KEY="dead.#"; @Bean public Exchange normalExchange(){ return ExchangeBuilder.topicExchange(norMAL_EXCHANGE).build(); } /*这里的队列需要绑定死信交换机*/ @Bean public Queue normalQueue(){ return QueueBuilder.durable(norMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey(DEAD_ROUTING_KEY).build(); } @Bean public Binding normalBinding(Exchange normalExchange,Queue normalQueue){ return BindingBuilder.bind(normalQueue).to(normalExchange).with(norMAL_ROUTING_KEY).noargs(); } @Bean public Exchange deadExchange(){ return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).build(); } @Bean public Queue deadQueue(){ return QueueBuilder.durable(DEAD_QUEUE).build(); } @Bean public Binding deadBinding(Exchange deadExchange,Queue deadQueue){ return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs(); } }
@Component public class DeadListener { @RabbitListener(queues = DeadLetterConfig.norMAL_QUEUE) public void consumer(String msg, Channel channel, Message message) throws IOException { System.out.println("接收到normal队列消息"+msg); /*拒绝消息 消息进入死信队列*/ channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } }
/*死信发送者*/ @Test public void publish() throws IOException { String msg="dead letter"; rabbitTemplate.convertAndSend(DeadLetterConfig.norMAL_EXCHANGE,"normal.abc",msg); system.in.read(); }
--------
基于消息的生存时间 让消息进入到死信队列
/*死信发送者*/
@Test public void publishExpire() throws IOException { String msg="dead letter Expire"; rabbitTemplate.convertAndSend(DeadLetterConfig.norMAL_EXCHANGE, "normal.abc", msg, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration("50000");//5秒钟之内没有将消息消费掉 那么消息将进入死信队列中 return message; } }); system.in.read(); }
最好不要设置 这种方式消息的生存时间 因为rabbitMQ 只会监听最外围的生存时间 也就是说 当msg1 ttl 5s,msg2 ttl 10s
如果msg2第一个到达队列,msg1第二个到达 rabbitMQ会先监听msg2的10s 然后再来查看msg1的5秒 此时msg需要10秒 然后进入死信队列中 不会被正常消费
为了解决上述问题 我们需要引入 延迟交换机来解决此类问题 插件下载地址
https://www.rabbitmq.com/community-plugins.html
下载包 支持3.8.5 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.8.9/rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez
docker cp rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez rabbitmq:/opt/rabbitmq/plugins
docker exec -it rabbitmq bash
cd /opt/rabbitmq/plugins
cd ../sbin/
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
exit
docker restart rabbitmq
这种方式是基于 生产者发送消息 到发送到交换机 消息会堆积在交换机 到达了指定时间 才会路由到指定的队列中 通过这种方式我们可以更方便的实现延迟消费 解决上面的问题
@Test public void DelayedPublish(){ rabbitTemplate.convertAndSend(XDelayedMessageConfig.DelayedMessage_EXCHANGE,"delayed.ach","xxx",new MessagePostProcessor(){ @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setDelay(3000); return message; } }); }
@Configuration public class XDelayedMessageConfig { public static final String DelayedMessage_EXCHANGE="DelayedMessage-exchange"; public static final String DelayedMessage_QUEUE="DelayedMessage-queue"; public static final String DelayedMessage_ROUTING_KEY="delayed.#"; /*延迟交换机*/ @Bean public Exchange delayedExchange(){ Map<String, Object> arguments=new HashMap<>(); arguments.put("x-delayed-type", "topic"); Exchange exchange = new CustomExchange(DelayedMessage_EXCHANGE, "x-delayed-message",true,false,arguments); return exchange; } @Bean public Queue delayedQueue(){ return QueueBuilder.durable(DelayedMessage_QUEUE).build(); } @Bean public Binding delayedBinding(Exchange delayedExchange, Queue delayedQueue){ return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DelayedMessage_ROUTING_KEY).noargs(); } }
延迟交换机
------------
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。