我有一个有状态的会话bean,我在那里发送和接收JMS消息。所有的连接设置都是手动处理的,因此bean包含javax.jms.connection和javax.jms.Session的实例。该bean还实现了MessageListener,使其能够接收消息。
注意:这是在Java EE环境(GlassFish4.0)中执行的
编辑:
import javax.ejb.LocalBean;
import javax.ejb.Stateful;
import javax.inject.Inject;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import com.sun.messaging.ConnectionConfiguration;
import com.sun.messaging.QueueConnectionFactory;
@LocalBean
@Stateful
public class OpenMqClient implements MessageListener{
private Connection connection;
private Session session;
private MessageConsumer responseConsumer;
public OpenMqClient(){}
public void sendMessage(String messageContent, String jmsBrokerUri, String queueName) {
try{
String host = System.getProperty("foo", jmsBrokerUri);
QueueConnectionFactory cf = new QueueConnectionFactory();
cf.setProperty(ConnectionConfiguration.imqAddressList, host);
connection = null;
session = null;
//Setup connection
connection = cf.createConnection();
connection.start();
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//Setup queue and producer
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
//Reply destination
Queue responseQueue = session.createTemporaryQueue();
responseConsumer = session.createConsumer(responseQueue);
responseConsumer.setMessageListener(this);
//Create message
TextMessage textMessage = session.createTextMessage();
textMessage.setJMSReplyTo(responseQueue);
textMessage.setJMSCorrelationID("test0101");
textMessage.setText(messageContent);
producer.send(textMessage);
System.out.println("Message sent");
} catch (JMSException e) {
e.printStackTrace();
System.out.println("JMSException in Sender");
}
}
@Override
public void onMessage(Message arg0) {
//On this event I want to close the session and connection, but it's not permitted
}
}
就我个人而言,这就是我要做的(注意,我没有测试或添加太多错误处理到这段代码中)。
>
关闭新线程中的会话
public class OpenMqClient implements MessageListener {
private static Connection connection;
private static final String mutex = "mutex";
private Session session;
private MessageConsumer responseConsumer;
public OpenMqClient() {
if(connection == null) {
synchronized(mutex) {
if(connection == null) {
String host = System.getProperty("foo", jmsBrokerUri);
QueueConnectionFactory cf = new QueueConnectionFactory();
cf.setProperty(ConnectionConfiguration.imqAddressList, host);
// Setup connection
connection = cf.createConnection();
connection.start();
}
}
}
}
public void sendMessage(String messageContent, String jmsBrokerUri, String queueName) {
try {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Setup queue and producer
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
// Reply destination
Queue responseQueue = session.createTemporaryQueue();
responseConsumer = session.createConsumer(responseQueue);
responseConsumer.setMessageListener(this);
// Create message
TextMessage textMessage = session.createTextMessage();
textMessage.setJMSReplyTo(responseQueue);
textMessage.setJMSCorrelationID("test0101");
textMessage.setText(messageContent);
producer.send(textMessage);
System.out.println("Message sent");
} catch (JMSException e) {
e.printStackTrace();
System.out.println("JMSException in Sender");
}
}
@Override
public void onMessage(Message arg0) {
// do stuff
new Thread(
new Runnable() {
@Override
public void run() {
if(session != null)
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
).start();
}
}
我有一个Java TLS客户端,它可以向服务器发送一系列请求,每个请求后面都有对服务器的响应。 但是,有许多不同的服务器。有些是“多消息”服务器,在第一个请求后保持连接打开,以便可以通过第一个连接发送后续请求。另一些是“单消息”服务器,在每条消息之后关闭连接,因此后续消息需要新的连接。客户端没有先验的方法来知道它正在与什么类型的服务器通信,也无法修复服务器。 非常希望单个消息服务能够在没有完全握手
问题内容: 我想向同一队列发送一批20k JMS消息。我使用10个线程将任务拆分,因此每个线程将处理2k条消息。我不需要交易。 我想知道是否建议建立一个连接,一个会话和10个生产者? 如果所有线程共享一个生产者,该怎么办?我的消息会损坏还是会同步发送(不会提高性能)? 如果我总是连接到同一队列,那么决定是创建新连接还是会话的一般指导方针是什么? 谢谢你,很抱歉一次问了很多。 问题答案: 如果某些消
在RabbitMQ总线上使用带重载的spring amqp,我们有时会从org获取日志。springframework。amqp。兔子联系CachingConnectionFactory说:通道关闭:清洁通道关闭;协议方法:#方法 你能解释一下这个日志吗?为什么它处于错误级别?我们有什么调整吗?提前谢谢你的回答。
问题内容: 我最近开始在应用程序中使用hibernate和c3p0作为ORM。但是,当我关闭会话工厂时,连接池不会自行关闭!这是我的应用程序中 唯一 可以进行会话操作的地方。 这是我的配置文件 请注意,空闲连接非常短的原因是它是我尚未通过集成测试的唯一方法。他们经常打开和关闭会话工厂,因此我总是用尽所有连接。正如我们在项目开始时一样,从长远来看,我认为这不是一个非常可持续的策略。 需要注意的“有趣
我最近转到了一个项目,在这个项目中我遇到了很多这种性质的代码--(这是使用jdbc postgres驱动程序) 显然,这段代码已经在生产中运行了一段时间,没有引起问题。 为了进一步澄清,如果我的理解是正确的(即,statement和resultset必须在连接关闭之前而不是之后关闭),我需要在catch和finally之间重复一些代码。修订后的代码如下所示。这可以简化吗? 只是为了透视,这段代码是
我使用的是Camel 2.16.1。关闭后,骆驼的消费者仍然接受新消息。有没有办法迫使消费者立即停止消费。这里也有同样的问题:驼峰关机策略:飞行中的信息不会减少 我为这个问题创建了一个测试用例: 运行测试用例时,我们可以看到机上交换的数量在开始优雅关闭后增加: