微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

整合篇:零基础学习与使用ActiveMQ

配套资料,免费下载
链接:https://pan.baidu.com/s/1jA217UgqXpONi_fV-aOzqw
提取码:bm2g
复制这段内容后打开百度网盘手机App,操作更方便哦

1、ActiveMQ介绍

ActiveMQ

ActiveMQ是一种开源的基于JMS(Java Message Servie)规范的一种消息中间件的实现,ActiveMQ的设计目标是提供标准的,面向消息的,能够跨越多语言和多系统的应用集成消息通信中间件。

消息中间件

消息中间件就是利用高效可靠的消息传递机制进行与平台无关的数据交流并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有Producer(生产者)、Consumer(消费者)。

常见的消息中间件产品:

  • ActiveMQ:它是Apache出品,最流行、能力强劲的开源消息总线。它是一个完全支持JMS 1.1和J2EE 1.4规范的JMS Provider实现。
  • RabbitMQ:AMQP协议的领导实现,支持多种场景。淘宝的MysqL集群内部有使用它进行通讯,OpenStack开源云平台的通信组件,最先在金融行业得到运用。
  • Kafka:Apache下的一个子项目 。高吞吐,在一台普通的服务器上也可以达到10W/s的吞吐速率,完全的分布式系统,适合处理海量数据。

JMS

JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数消息中间件提供商都对JMS提供支持

消息

消息是 JMS 中的一种类型对象,我们可以将消息分为几种类型,它们分别携带:简单文本(TextMessage)、可序列化的对象(ObjectMessage)、属性集合(MapMessage)、字节流(BytesMessage)、原始值流(StreamMessage)。

对于消息的传递有两种类型:

  • 一种是点对点的,即一个生产者和一个消费者一一对应。

image-20201222141927605

  • 一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。

image-20201222142133340

2、ActiveMQ安装

下载地址:https://activemq.apache.org/components/classic/download/

image-20210108101925125

下载解压:

image-20210108102102219

启动程序:

image-20210108102504240

访问地址:http://127.0.0.1:8161/,登录账户:admin,登录密码:admin

image-20210108102702362

image-20210108102824817

3、ActiveMQ集成

3.1、点对点

队列模式特点:

  1. 客户端包括生产者和消费者。
  2. 队列中的一个消息只能被一个消费者使用。
  3. 消费者可以随时取消息。

3.1.1、消息生产者

image-20210108103435945

image-20210108103514534

image-20210108103608953

image-20210108103636161

加入消息连接池:

<dependency>
    <groupId>org.messaginghub</groupId>
    <artifactId>pooled-jms</artifactId>
</dependency>

加入消息的配置:

spring:
  activemq:
    #activemq连接地址
    broker-url: tcp://127.0.0.1:61616
    #activemq登录账号
    user: admin
    #activemq登录密码
    password: admin
    #activemq的连接池
    pool:
      #开启消息连接池
      enabled: true
      #连接数量最大10
      max-connections: 10

编写消息生产者类:com.caochenlei.integrationactivemqproducer.ptp.PTPProducer

@Component
public class PTPProducer {
    @Autowired
    JmsMessagingTemplate jmsMessagingTemplate;

    /**
     * 向 ptp.hello 这个消息队列发送一个消息msg
     * @param msg
     */
    public void send(final String msg) {
        jmsMessagingTemplate.convertAndSend(new ActiveMQQueue("ptp.hello"), msg);
    }
}

测试消息生产者类:com.caochenlei.integrationactivemqproducer.IntegrationActivemqProducerApplicationTests

@SpringBoottest
class IntegrationActivemqProducerApplicationTests {
    @Autowired
    private PTPProducer ptpProducer;

    @Test
    void testSendPTP() {
        ptpProducer.send("张三");
    }
}

查看网页消息队列:http://127.0.0.1:8161/admin/queues.jsp

image-20210108123552887

3.1.2、消息消费者

image-20210108124009214

image-20210108124033007

image-20210108124157432

image-20210108124111618

image-20210108124231826

加入消息连接池:

<dependency>
    <groupId>org.messaginghub</groupId>
    <artifactId>pooled-jms</artifactId>
</dependency>

加入消息的配置:

spring:
  activemq:
    #activemq连接地址
    broker-url: tcp://127.0.0.1:61616
    #activemq登录账号
    user: admin
    #activemq登录密码
    password: admin
    #activemq的连接池
    pool:
      #开启消息连接池
      enabled: true
      #连接数量最大10
      max-connections: 10

编写消费者配置类:com.caochenlei.integrationactivemqconsumer.config.JmsConfig

@Configuration
public class JmsConfig {
    // queue模式的ListenerContainer
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setPubSubDomain(false);
        bean.setConnectionFactory(activeMQConnectionFactory);
        return bean;
    }

    // topic模式的ListenerContainer
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setPubSubDomain(true);
        bean.setConnectionFactory(activeMQConnectionFactory);
        return bean;
    }
}

编写消息消费者类:com.caochenlei.integrationactivemqconsumer.ptp.PTPConsumer

@Component
public class PTPConsumer {
    @JmsListener(destination = "ptp.hello", containerFactory = "jmsListenerContainerQueue")
    public void receive(String msg) {
        System.out.println("接收到的消息:" + msg);
    }
}

测试消息消费者类:

直接启动项目,他就会自己监听指定队列中是否有数据,如果有,则取出。

查看网页消息队列:http://127.0.0.1:8161/admin/queues.jsp

image-20210108125009784

查看当前控制台:

image-20210108125043736

3.2、发布订阅

主题模式特点:

  1. 客户端包括发布者(生产者)和订阅者(消费者)。
  2. 主题中的消息可以被所有订阅者消费。
  3. 消费者不能消费订阅之前发送的消息。

3.2.1、消息生产者

切换到指定的项目:integration-activemq-producer

编写消息生产者类:com.caochenlei.integrationactivemqproducer.ps.PSProducer

@Component
public class PSProducer {
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    /**
     * 向 ps.hello 这个主题发送一个消息msg
     * @param msg
     */
    public void send(final String msg) {
        jmsMessagingTemplate.convertAndSend(new ActiveMQTopic("ps.hello"), msg);
    }
}

测试消息生产者类:com.caochenlei.integrationactivemqproducer.IntegrationActivemqProducerApplicationTests

@SpringBoottest
class IntegrationActivemqProducerApplicationTests {
    @Autowired
    private PSProducer psProducer;

    @Test
    void testSendPS() {
        psProducer.send("李四");
    }
}

查看网页发布订阅:http://127.0.0.1:8161/admin/topics.jsp

image-20210108135311719

3.2.2、消息消费者

切换到指定的项目:integration-activemq-consumer

编写消息消费者类:

@Component
public class PSConsumer {
    @JmsListener(destination = "ps.hello",containerFactory = "jmsListenerContainerTopic")
    public void receive(String msg) {
        System.out.println("接收到的消息:" + msg);
    }
}

测试消息消费者类:

直接启动项目,他就会自己监听指定主题,当生产者向这个主题中发送消息,他才可以接收,意思就是,先启动消费者,然后在启动生产者,消费者才能消费。

查看网页发布订阅:http://127.0.0.1:8161/admin/topics.jsp

image-20210108135535801

查看当前控制台:

image-20210108134028283

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐