微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

MQ发布预定心得

发布预定模式在MQ中提供了两种实现方式,在我原来的测试中每个预订者都是非持久的,也就是说只有当预订者与MQ保持连接时才能获得消息,而当连接断开时预订者不会接受到消息。如果以线程的方式来实现时刻保持连接的话,是可以的,但是现在采用的技术是以WebService服务来提供,对方发出一个请求,我返回一个请求的形式的话,使用线程时刻监听的这种方式能否可行是一个未知的问题。因此在查阅资料后,得知JMS的实现中有持久订阅这个模型后,问题有了初步的解决方向。

持久订阅的意思如其名,也就是说在预订者与MQ断开连接时,所有发布到预定主题的消息并不会丢失,当预定者与MQ再次连接后,可以接收所有断开连接时的消息。具体实现机制是用一个唯一的名称来标示预订者,在预订者断开连接后,MQ仍然会把所有发布的消息放到以这个唯一的名称来标示的地方存储,在预订者想要再次连接时,只要给出这个唯一的名称,就可以恢复原来的连接并取到所有在连接断开时发布的消息。具体实现代码如下:

       public synchronized void openDurable() {

              try{              

                     factoryFactory =JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);

                     connectionFactory =factoryFactory.createConnectionFactory();

             

                     connectionFactory.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE,WMQConstants.WMQ_CM_BINDINGS);

                     connectionFactory.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER,MQ_QUEUEMANAGER_NAME);

                    

                     connection =connectionFactory.createConnection();

                    

                     connection.setClientID(clientID);

                     boolean transacted=false;

                     session =connection.createSession(transacted,Session.AUTO_ACKNowLEDGE);

 

                     String subject = topicName;

                     topic =session.createtopic(subject);

 

                     consumer =session.createDurableSubscriber(topic,subName);

                     connection.start();               

                     isOpen = true;              

              } catch (JMSException e) {

                     e.printstacktrace();

              }

       }

注意其中颜色标注的两部分,第一句话connection.setClientID(clientID);给连接一个clientID的名称,此内容是持久预定唯一标示的一部分。第二句话consumer = session.createDurableSubscriber(topic,subName);使用Session生成一个持久预定者,此方法由两个参数,第一个参数标示预订者对应的主题,第二个参数标示预订者的名称,此名称与前面的连接名称clientID组合就成了持久预定的唯一标示。在连接断开后,用户只要再次连接,并配置同样的clientID与subName,就可以接受到所有连接断开期间的消息了。

在MQ中,持久预定的实现形式是采用了一个动态队列,每当用户新申请一个持久预定时,MQ会在队列管理中动态新增一个本地队列,此队列是系统维护的,有一段长的随机数字代码标示它。在一般情况下看不到它,只有选择“看到系统对象时”才可以看到。当持久预定者与MQ断开连接时,所有发布到预订者主题的消息都会放到队列中存储,当预订者再次连接时,就可以取得在队列中存储的消息了。

持久预定的实现不仅需要存储消息,更要在系统中维护所有持久预定对象以及与之对应的动态队列,因此必须有一种机制去管理它,不然会对系统的资源造成损失与消耗。在JMS规范中,session有一个函数unsubscribe(subName),此函数的作用就是在持久订阅不再需要时,可以删除它以保证系统资源的不损失。具体实现代码如下:

       publicsynchronized void unsubscribeSubscriber() {

              try{

                     if(isOpen == false)

                     {

                            factoryFactory= JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);

                            connectionFactory= factoryFactory.createConnectionFactory();

                    

                            connectionFactory.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE,WMQConstants.WMQ_CM_BINDINGS);

                            connectionFactory.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER,MQ_QUEUEMANAGER_NAME);

                           

                            connection= connectionFactory.createConnection();

                           

                            connection.setClientID(clientID);

                            connection.start(); 

                           

                            booleantransacted=false;

                            session= connection.createSession(transacted,Session.AUTO_ACKNowLEDGE);

 

                            session.unsubscribe(subName);

                                         

                            isOpen= true;

                           

                            //关闭连接

                            destination= null;

 

                            session.close();

                            session= null;

                           

                            connection.close();

                            connection= null;

 

                            connectionFactory= null;

                            factoryFactory = null;

                            isOpen= false;                    

                     }           

              }catch (JMSException e) {

                     e.printstacktrace();

              }

       }

 

       在我们的WebService服务中,提供了三种接口,生成持久预定,获得持久消息,注销持久预定,在用户需要调用发布预定服务时,不同的用户机,不同的访问程序需要生成持久预定,并维护预定名称等信息,在使用完预定服务后,必须注销持久预定以保护系统的资源。如果他们没有做的话,MQ中会积累越来越多的临时队列以及队列中存储的消息,最终系统会因不堪重负而宕掉。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐