- 今天来说一下在使用到MQ时如果使用MQ的链接池。之前我也是没有注意到MQ也是有连接池的,后来因为系统之前实现每次创建和关闭链接消耗资源、宕机频繁,所以领导要求解决我才接触到。
我在网上看到的关于JMS的讲解还挺多,但是对于MQ连接池的讲解时大家都是讲如何在spring中配置连接池。首先采用spring配置后原系统加密配置的密码就成明文了,另外如果要改成spring发送那改动就大了。如果在不使用spring,不大改动代码的情况下完成采用MQ连接池来获得链接呢?
现在哥已经养成了好的习惯,每次发博客都会将源码和工程直接发布的,不会让你看了半天连个例子也没有的。 - 首先要创建工程加入JAR包,要加的包都是工程内,哥不多说了,这些包是必须的。至于activemq-pool-5.2.0.jar,你一定要找到加进去,你懂的。
你还要到官方下载一个MQ服务器,我使用的是5.2,下载后运行起来以备使用。 - 第二步我们要实现如何获得链接的。这里我做了两个实现,用于更直观的了解MQ连接池。
其中连接池工厂的活跃数我们设置为1,方便测试。这个活跃数实际上是会话的数量,而不是数据库连接池一样的链接数量。package com.mq.service; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.pool.PooledConnectionFactory; /** * 链接工厂管理类 * 自己工厂定义成了单例模式,连接池是静态块进行初始化,具体实现自己看着办 */ public class MQPooledConnectionFactory { private static ActiveMQConnectionFactory connectionFactory; /** * 获得自己创建的链接工厂,这个工厂只初始化一次 */ public static ActiveMQConnectionFactory getMyActiveMQConnectionFactory() { if (null == connectionFactory) { connectionFactory = new ActiveMQConnectionFactory("system","manage", "tcp://127.0.0.1:61616"); } return connectionFactory; } private static PooledConnectionFactory pooledConnectionFactory; static { try { // 需要创建一个链接工厂然后设置到连接池中 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(); activeMQConnectionFactory.setUserName("system"); activeMQConnectionFactory.setPassword("manage"); activeMQConnectionFactory.setBrokerURL("tcp://127.0.0.1:61616"); // 如果将消息工厂作为属性设置则会有类型不匹配的错误,虽然Spring配置文件中是这么配置的,这里必须在初始化的时候设置进去 pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory); // 链接最大活跃数,为了在测试中区别我们使用的到底是不是一个对象和看是否能控制连接数(实际上是会话数),我们在这里设置为1 int maximumActive = 1; pooledConnectionFactory.setMaximumActive(maximumActive); } catch (Exception e) { e.printStackTrace(); } } /** * 获得链接池工厂 */ public static PooledConnectionFactory getPooledConnectionFactory() { return pooledConnectionFactory; } /** * 对象回收销毁时停止链接 */ @Override protected void finalize() throws Throwable { pooledConnectionFactory.stop(); super.finalize(); } }
这个类会提供连接池工厂和MQ链接工厂的创建和获得方式。特别要注意到的是,在初始化连接池时要设置一个链接工厂进去,而且只能在初始化时作为参数传递进去,如果你看过spring的配置你会以为要做为属性传递,而且他确实有这个属性。 - 然后我们创建发送消息的基础类,在该类中得到链接然后创建消息进行发送,关闭会话等一些列操作。为了防止变量干扰,这些操作是在一个方法里面完成的。
package com.mq.service; import java.util.Map; import java.util.Set; import javax.jms.Connection; import javax.jms.MapMessage; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; /** * 数据发送类,用于发送数据 * 如果获得链接,请查看被注释的代码 */ public class MQ_Service { // 发送消息 @SuppressWarnings("static-access") public void send(Map<String, String> map) throws Exception { ActiveMQConnectionFactory connectionFactory = null; Connection connection = null; Session session = null; Queue queue = null; MessageProducer producer = null; MapMessage messagep = null; // ==================================== try { // 获得我们自己初始化的链接工厂然后创建链接 connectionFactory = MQPooledConnectionFactory.getMyActiveMQConnectionFactory(); connection = connectionFactory.createConnection(); // 链接直接从链接池工厂进行获得 //connection = MQPooledConnectionFactory.getPooledConnectionFactory().createConnection(); session = connection.createSession(false, session.AUTO_ACKNOWLEDGE); queue = session.createQueue("CUI_JMS"); producer = session.createProducer(queue); // 链接开始,如果我们使用的是连接池,那么即使你不开始,也是没有问题的 connection.start(); } catch (Exception e) { e.printStackTrace(); } // ==================================== messagep = session.createMapMessage(); Set<String> keySet = map.keySet(); for (String key : keySet) { String value = map.get(key); messagep.setBytes(key, value.getBytes("UTF-8")); System.out.println(key + "-->" + value); } producer.send(messagep); messagep.clearBody(); messagep.clearProperties(); // =================================== // 通过打印会话的内存地址和链接的客户端编号就可以知道我们使用的是不是同一个会话和链接 System.out.println(session.toString()); System.out.println(connection.getClientID()); // 无论使用的自己的工厂还是连接池的,都要将会话关闭 // 如果不关闭,在使用连接池的时可以看到效果,发送两次时只能发送一次,造成堵塞 session.close(); // 使用自己的工厂和连接池的区别是,运行后自己工厂链接调用关闭程序结束 // 而调用连接池链接进行关闭实际上没有关闭,因为连接池要维护这个链接 connection.close(); messagep = null; } private MQ_Service() { } // 发送对象每次创建一个,用以区别我们使用的对象 public static MQ_Service getInstance() { return new MQ_Service(); } // 对外开发发送消息方法 @SuppressWarnings("unchecked") public synchronized void sendMessage(Map map) throws Exception { this.send(map); } }
取消注释内容,并注释掉获得我们自己初始化的链接工厂然后创建链接下面的两行代码,我们的链接就是从连接池获得了。
你可以打印链接、会话等各个对象的内存地址来查看是否是同一个对象内容。 - 多次调用发送消息,查看是否能发送消息和是否能控制链接,当然准确来说是会话数
package com.mq.service; import java.util.HashMap; import java.util.Map; /** * 调用发送 */ public class MQ_Sender { public static void main(String[] args) throws Exception { // 循环调用,这里定义调用两次 for (int i = 0; i < 2; i++) { MQ_Service sender = MQ_Service.getInstance(); Map<String, String> map = new HashMap<String, String>(); map.put("MESS_NUM", "112110119"); map.put("MESS_DEPT", "本部"); sender.sendMessage(map); System.out.println("数据已经发送完毕!"); } } }
通过上面的例子,你就可以在不使用配置的情况下,在代码中也使用MQ连接池。并且更明了的俩接MQ连接池和数据库连接池的不同之处。
请您到ITEYE看我的原创:http://cuisuqiang.iteye.com
或支持我的个人博客,地址:http://www.javacui.com
相关推荐
目的:通过JMS 实现 IBM MQ的请求应答功能 工作原理:消息生产者发送消息到队列IN1,然后可以异步或者同步等待消费者接收到IN1消息后,生成应答消息,并发布到IN2队列中。生产者通过messageid在IN2队列中进行消息...
1) 本工程主要演示在SPRING BOOT工程中怎样使用JMS集成IBM-MQ及TLQ两种消息中间件产品 2) 使用SPRING BOOT Conditional机制实现了两种产品按需加载,工程会根据配置文件开关动态加载 3) 实现了普通队列消息发送与...
用jms 向webshpere mq里发送消息
MQ的命令集合 JMS 获取MQ队列信息 JSM 发送信息至MQ
JMS 客户端模式实现服务端、客户端;实现同步以及异步的收发处理;
RabbitMQ客户连接池的实现代码示例
C#连接MQ队列获取消息,发送消息到mq队列,以及消息事务提交。
MQ软件提供一个JAVA软件包,里面有JMS类库,和一套MQ的类库。不熟悉MQI编程方法的程序员可以用JMS,比较熟悉MQI编程方法的程序则可以用...下面的小程序是使用Java实现从队列管理器QM_SERVER中的队列INITQ写入或读出消息
NULL 博文链接:https://bijian1013.iteye.com/blog/2308734
JAVA实现MQ发送接收消息详解 MQ配置文档 MQ配置
一个C#实现IBM WebSphere MQ 消息收发的实例,包含 发送接收等. 使用的时候只需要修改 appconfig 文件的内容即可. 如有问题.请留言
IBM MQ将消息发送至远程队列,文档详细,有截图,有命令
基于SPring+MQ6.0实现JMS异步通讯,
activemq消息中间件的使用demo,以及spring集合jms实现消息发送和处理。
JAVA向MQ发送消息, JAVA向MQ发送消息, JAVA向MQ发送消息,
MQ发送消息源代码
JAVA IBM MQ 接收消息、发送消息例子
基于Java编写的,SWT界面的完整的发送接收消息。 演示了分组和设置。 最近学习MQ写的,是学习的捷径。