微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

spring boot 消息

消息

Java处理消息的标准规范

​ 目前企业级开发中广泛使用的消息处理技术共三大类,具体如下:

  • JMS
  • AMQP
  • MQTT

​ 为什么是三大类,而不是三个技术呢?因为这些都是规范,就想JDBC技术,是个规范,开发针对规范开发,运行还要靠实现类,例如MysqL提供了JDBC的实现,最终运行靠的还是实现。并且这三类规范都是针对异步消息进行处理的,也符合消息的设计本质,处理异步的业务。对以上三种消息规范做一下普及

JMS

​ JMS(Java Message Service),这是一个规范,作用等同于JDBC规范,提供了与消息服务相关的API接口。

JMS消息模型

​ JMS规范中规范了消息有两种模型。分别是点对点模型发布订阅模型

点对点模型:peer-2-peer,生产者会将消息发送到一个保存消息的容器中,通常使用队列模型,使用队列保存消息。一个队列的消息只能被一个消费者消费,或未被及时消费导致超时。这种模型下,生产者和消费者是一对一绑定的。

发布订阅模型:publish-subscribe,生产者将消息发送到一个保存消息的容器中,也是使用队列模型来保存。但是消息可以被多个消费者消费,生产者和消费者完全独立,相互不需要感知对方的存在。

​ 以上这种分类是从消息的生产和消费过程来进行区分,针对消息所包含的信息不同,还可以进行不同类别的划分。

JMS消息种类

​ 根据消息中包含的数据种类划分,可以将消息划分成6种消息。

  • TextMessage
  • MapMessage
  • BytesMessage
  • StreamMessage
  • ObjectMessage
  • Message (只有消息头和属性

​ JMS主张不同种类的消息,消费方式不同,可以根据使用需要选择不同种类的消息。但是这一点也成为其诟病之处,后面再说。整体上来说,JMS就是典型的保守派,什么都按照J2EE的规范来,做一套规范,定义若干个标准,每个标准下又提供一大批API。目前对JMS规范实现的消息中间件技术还是挺多的,例如ActiveMQ、Redis、HornetMQ。但是也有一些不太规范的实现,参考JMS的标准设计,但是又不完全满足其规范,例如:RabbitMQ、RocketMQ。

AMQP

AMQP(advanced message queuing protocol):一种协议(高级消息队列协议,也是消息代理规范),规范了网络交换的数据格式,兼容JMS操作。
优点

​ 具有跨平台性,服务器供应商,生产者,消费者可以使用不同的语言来实现

JMS消息种类

​ AMQP消息种类:byte[]

​ AMQP在JMS的消息模型基础上又进行了进一步的扩展,除了点对点和发布订阅的模型,开发了几种全新的消息模型,适应各种各样的消息发送。

AMQP消息模型

  • direct exchange
  • fanout exchange
  • topic exchange
  • headers exchange
  • system exchange

​ 目前实现了AMQP协议的消息中间件技术也很多,而且都是较为流行的技术,例如:RabbitMQ、StormMQ、RocketMQ

MQTT

​ MQTT(Message queueing Telemetry Transport)消息队列遥测传输,专为小设备设计,是物联网(IOT)生态系统中主要成分之一。

​ 除了上述3种J2EE企业级应用中广泛使用的三种异步消息传递技术,还有一种技术也不能忽略,Kafka。

KafKa

​ Kafka,一种高吞吐量的分布式发布订阅消息系统,提供实时消息功能。Kafka技术并不是作为消息中间件为主要功能的产品,但是其拥有发布订阅的工作模式,也可以充当消息中间件来使用,而且目前企业级开发中其身影也不少见。

SpringBoot整合ActiveMQ

启动服务器

activemq.bat

​ 运行bin目录下的win32或win64目录下的activemq.bat命令即可,根据自己的操作系统选择即可,认对外服务端口61616。

访问web管理服务

​ ActiveMQ启动后会启动一个Web控制台服务,可以通过该服务管理ActiveMQ。

http://127.0.0.1:8161/

​ web管理服务认端口8161,访问后可以打开ActiveMQ的管理界面,初始化用户名密码相同,均为:admin

整合

​ 做了这么多springboot整合第三方技术,已经摸到门路了,加坐标,做配置,调接口,直接开工

步骤①:导入springboot整合ActiveMQ的starter

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

步骤②:配置ActiveMQ的服务器地址

spring:
  activemq:
    broker-url: tcp://localhost:61616

步骤③:使用JmsMessagingTemplate操作ActiveMQ

@Service
public class MessageServiceActivemqImpl implements MessageService {
    @Autowired
    private JmsMessagingTemplate messagingTemplate;

    @Override
    public void sendMessage(String id) {
        System.out.println("待发送短信的订单已纳入处理队列,id:"+id);
        messagingTemplate.convertAndSend("order.queue.id",id);
    }

    @Override
    public String doMessage() {
        String id = messagingTemplate.receiveAndConvert("order.queue.id",String.class);
        System.out.println("已完成短信发送业务,id:"+id);
        return id;
    }
}

​ 发送消息需要先将消息的类型转换成字符串,然后再发送,所以是convertAndSend,定义消息发送的位置,和具体的消息内容,此处使用id作为消息内容

​ 接收消息需要先将消息接收到,然后再转换成指定的数据类型,所以是receiveAndConvert,接收消息除了提供读取的位置,还要给出转换后的数据的具体类型。

步骤④:使用消息监听器在服务器启动后,监听指定位置,当消息出现后,立即消费消息

@Component
public class MessageListener {
    @JmsListener(destination = "order.queue.id")
    @SendTo("order.other.queue.id")
    public String receive(String id){
        System.out.println("已完成短信发送业务,id:"+id);
        return "new:"+id;
    }
}

​ 使用注解@JmsListener定义当前方使用注解@JmsListener定义当前方法监听ActiveMQ中指定名称的消息队列。

​ 如果当前消息队列处理完还需要继续向下传递当前消息到另一个队列中使用注解@SendTo即可,这样即可构造连续执行的顺序消息队列。

步骤⑤:切换消息模型由点对点模型到发布订阅模型,修改jms配置即可

spring:
  activemq:
    broker-url: tcp://localhost:61616
  jms:
    pub-sub-domain: true

​ pub-sub-domain认值为false,即点对点模型,修改为true后就是发布订阅模型。

总结

  1. springboot整合ActiveMQ提供了JmsMessagingTemplate对象作为客户端操作消息队列
  2. 操作ActiveMQ需要配置ActiveMQ服务器地址,认端口61616
  3. 企业开发时通常使用监听器来处理消息队列中的消息,设置监听器使用注解@JmsListener
  4. 配置jms的pub-sub-domain属性可以在点对点模型和发布订阅模型间切换消息模型

SpringBoot整合RabbitMQ

​ RabbitMQ是MQ产品中的目前较为流行的产品之一,它遵从AMQP协议。RabbitMQ的底层实现语言使用的是Erlang,所以安装RabbitMQ需要先安装Erlang。

Erlang安装

​ windows版安装包下载地址:https

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐