当前位置: 首页 > 知识库问答 >
问题:

在收到消息后关闭JMS会话和连接

公西翊歌
2023-03-14

我有一个有状态的会话bean,我在那里发送和接收JMS消息。所有的连接设置都是手动处理的,因此bean包含javax.jms.connection和javax.jms.Session的实例。该bean还实现了MessageListener,使其能够接收消息。

注意:这是在Java EE环境(GlassFish4.0)中执行的

编辑:

import javax.ejb.LocalBean;
import javax.ejb.Stateful;
import javax.inject.Inject;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import com.sun.messaging.ConnectionConfiguration;
import com.sun.messaging.QueueConnectionFactory;

@LocalBean
@Stateful
public class OpenMqClient implements MessageListener{
    private Connection connection;
    private Session session;
    private MessageConsumer responseConsumer;

    public OpenMqClient(){}

    public void sendMessage(String messageContent, String jmsBrokerUri, String queueName) {
        try{
            String host = System.getProperty("foo", jmsBrokerUri);
            QueueConnectionFactory cf = new QueueConnectionFactory();
            cf.setProperty(ConnectionConfiguration.imqAddressList, host);
            connection = null;
            session = null;

            //Setup connection
            connection = cf.createConnection();
            connection.start();
            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

            //Setup queue and producer
            Queue queue = session.createQueue(queueName);
            MessageProducer producer = session.createProducer(queue);


            //Reply destination
            Queue responseQueue = session.createTemporaryQueue();
            responseConsumer = session.createConsumer(responseQueue);
            responseConsumer.setMessageListener(this);

            //Create message
            TextMessage textMessage = session.createTextMessage();
            textMessage.setJMSReplyTo(responseQueue);
            textMessage.setJMSCorrelationID("test0101");
            textMessage.setText(messageContent);

            producer.send(textMessage);
            System.out.println("Message sent");
        } catch (JMSException e) {
            e.printStackTrace();
            System.out.println("JMSException in Sender");
        }
    }

    @Override
    public void onMessage(Message arg0) {
        //On this event I want to close the session and connection, but it's not permitted
    }

}

共有1个答案

魏成济
2023-03-14

就我个人而言,这就是我要做的(注意,我没有测试或添加太多错误处理到这段代码中)。

>

  • 使连接静态化-您可以(可能应该)为所有bean重用相同的连接,除非您有特定的理由不这样做
  • 关闭新线程中的会话

    public class OpenMqClient implements MessageListener {
    
        private static Connection connection;
        private static final String mutex = "mutex"; 
        private Session session;
        private MessageConsumer responseConsumer;
    
        public OpenMqClient() {
            if(connection == null) {
                synchronized(mutex) {
                    if(connection == null) {
                        String host = System.getProperty("foo", jmsBrokerUri);
                        QueueConnectionFactory cf = new QueueConnectionFactory();
                        cf.setProperty(ConnectionConfiguration.imqAddressList, host);
    
                        // Setup connection
                        connection = cf.createConnection();
                        connection.start();
                    }
                }
            }
        }
    
        public void sendMessage(String messageContent, String jmsBrokerUri, String queueName) {
            try {
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
                // Setup queue and producer
                Queue queue = session.createQueue(queueName);
                MessageProducer producer = session.createProducer(queue);
    
                // Reply destination
                Queue responseQueue = session.createTemporaryQueue();
                responseConsumer = session.createConsumer(responseQueue);
                responseConsumer.setMessageListener(this);
    
                // Create message
                TextMessage textMessage = session.createTextMessage();
                textMessage.setJMSReplyTo(responseQueue);
                textMessage.setJMSCorrelationID("test0101");
                textMessage.setText(messageContent);
    
                producer.send(textMessage);
                System.out.println("Message sent");
            } catch (JMSException e) {
                e.printStackTrace();
                System.out.println("JMSException in Sender");
            }
        }
    
        @Override
        public void onMessage(Message arg0) {
            // do stuff
            new Thread(
                new Runnable() {
                    @Override
                    public void run() {
                        if(session != null)
                            try {
                                session.close();
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }               
                    }
                }
            ).start();
        }
    }
    

  •  类似资料:
    • 我有一个Java TLS客户端,它可以向服务器发送一系列请求,每个请求后面都有对服务器的响应。 但是,有许多不同的服务器。有些是“多消息”服务器,在第一个请求后保持连接打开,以便可以通过第一个连接发送后续请求。另一些是“单消息”服务器,在每条消息之后关闭连接,因此后续消息需要新的连接。客户端没有先验的方法来知道它正在与什么类型的服务器通信,也无法修复服务器。 非常希望单个消息服务能够在没有完全握手

    • 问题内容: 我想向同一队列发送一批20k JMS消息。我使用10个线程将任务拆分,因此每个线程将处理2k条消息。我不需要交易。 我想知道是否建议建立一个连接,一个会话和10个生产者? 如果所有线程共享一个生产者,该怎么办?我的消息会损坏还是会同步发送(不会提高性能)? 如果我总是连接到同一队列,那么决定是创建新连接还是会话的一般指导方针是什么? 谢谢你,很抱歉一次问了很多。 问题答案: 如果某些消

    • 在RabbitMQ总线上使用带重载的spring amqp,我们有时会从org获取日志。springframework。amqp。兔子联系CachingConnectionFactory说:通道关闭:清洁通道关闭;协议方法:#方法 你能解释一下这个日志吗?为什么它处于错误级别?我们有什么调整吗?提前谢谢你的回答。

    • 问题内容: 我最近开始在应用程序中使用hibernate和c3p0作为ORM。但是,当我关闭会话工厂时,连接池不会自行关闭!这是我的应用程序中 唯一 可以进行会话操作的地方。 这是我的配置文件 请注意,空闲连接非常短的原因是它是我尚未通过集成测试的唯一方法。他们经常打开和关闭会话工厂,因此我总是用尽所有连接。正如我们在项目开始时一样,从长远来看,我认为这不是一个非常可持续的策略。 需要注意的“有趣

    • 我最近转到了一个项目,在这个项目中我遇到了很多这种性质的代码--(这是使用jdbc postgres驱动程序) 显然,这段代码已经在生产中运行了一段时间,没有引起问题。 为了进一步澄清,如果我的理解是正确的(即,statement和resultset必须在连接关闭之前而不是之后关闭),我需要在catch和finally之间重复一些代码。修订后的代码如下所示。这可以简化吗? 只是为了透视,这段代码是

    • 我使用的是Camel 2.16.1。关闭后,骆驼的消费者仍然接受新消息。有没有办法迫使消费者立即停止消费。这里也有同样的问题:驼峰关机策略:飞行中的信息不会减少 我为这个问题创建了一个测试用例: 运行测试用例时,我们可以看到机上交换的数量在开始优雅关闭后增加: