微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

RabbitMQ 在Spring Boot上的使用

 

1.pom

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>

<!-- rabbitmq-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
View Code

2.application.yml

server:
  port: 82
  max-http-header-size: 10240

spring:
  profiles:
    active: local
  rabbitmq:
    host: 192.168.239.137
    username: guest
    password: guest
View Code

3.RabbitMqConfig

package com.tf.demo.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitmqConfig {

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    //default
    @Bean
    protected Queue defaultQueue(){
        return QueueBuilder.durable("defaultqueue").build();
    }

    //direct
    @Bean
    protected Queue directQueue(){
        return QueueBuilder.durable("directqueue").build();
    }

    @Bean
    protected DirectExchange directExchange() {
        return new DirectExchange("amq.direct");
    }

    @Bean
    protected Binding directBinding(Queue directQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue).to(directExchange).with("directroutingKey");
    }


    //fanout  : 广播 。  routingKey对于fanout没有意义的
    @Bean
    protected Queue fanoutQueue(){
        return QueueBuilder.durable("fanoutqueue").build();
    }

    @Bean
    protected FanoutExchange fanoutExchange() {
        return new FanoutExchange("amq.fanout");
    }

    @Bean
    protected Binding fanoutBinding(Queue fanoutQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
    }


    //topic
    @Bean
    protected Queue topicQueue(){
        return QueueBuilder.durable("topicqueue").build();
    }

    @Bean
    protected TopicExchange topicExchange() {
        return new TopicExchange("amq.topic");
    }

    @Bean
    protected Binding topicBinding(Queue topicQueue, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueue).to(topicExchange).with("com.topic.*"); //*代表一个单词,#代表一个或多个
    }

    @Bean
    protected Binding topicBinding2(Queue topicQueue, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueue).to(topicExchange).with("com.topic.#"); //*代表一个单词,#代表一个或多个
    }

}
View Code

4.Publisher

package com.tf.demo.test;

import com.tf.demo.DemoApplication;
import com.tf.demo.service.rabbitmq.message.HelloMsg;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBoottest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * Publisher 与 exchange、routingKey 有关系
 * exchange 与 routingKey 找到 queue
 * Consumer 与 queue 有关系
 *
 */
@SpringBoottest(classes = DemoApplication.class)
@RunWith(SpringJUnit4ClassRunner.class)
public class Publisher {

    @Autowired
    private AmqpTemplate amqpTemplate;

    //认交换器: direct : 公平调度
    @Test
    public void testDefault(){
        var routingKey = "defaultqueue";
        var message = HelloMsg.builder().id(1L).name("hello default").build();
        amqpTemplate.convertAndSend(routingKey, message);
        System.out.println("发送成功");
    }

    @Test
    public void testDirect(){
        var exchange = "amq.direct";
        var routingKey = "directroutingKey";
        var message = HelloMsg.builder().id(1L).name("hello direct").build();
        amqpTemplate.convertAndSend(exchange, routingKey, message);
        System.out.println("发送成功");
    }

    @Test
    public void testFanout(){
        var exchange = "amq.fanout";
        var message = HelloMsg.builder().id(1L).name("hello fanout").build();
        amqpTemplate.convertAndSend(exchange, "",  message); //routingKey 随便写
        System.out.println("发送成功");
    }

    @Test
    public void testTopic(){
        var exchange = "amq.topic";
        var routingKey = "com.topic.a";
        var message = HelloMsg.builder().id(1L).name("hello topic").build();
        amqpTemplate.convertAndSend(exchange, routingKey, message);
        System.out.println("发送成功");
    }
}
View Code

5.Consumer

package com.tf.demo.service.rabbitmq;


import com.alibaba.fastjson.JSON;
import com.tf.demo.service.rabbitmq.message.HelloMsg;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class Consumer {

    @RabbitListener(queues = "defaultqueue")
    public void demo1(HelloMsg msg){
        System.out.println("获取到的消息:"+ JSON.toJSONString(msg));
    }

    @RabbitListener(queues = "directqueue")
    public void demo2(HelloMsg msg){
        System.out.println("获取到的消息:"+msg);
    }

    @RabbitListener(queues = "fanoutqueue")
    public void demo3(HelloMsg msg){
        System.out.println("获取到的消息:"+msg);
    }

    @RabbitListener(queues = "topicqueue")
    public void demo4(HelloMsg msg){
        System.out.println("获取到的消息:"+msg);
    }

}
View Code

 

@RabbitListener
@RabbitHandler
 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐