我们为我们想要等待响应的特定用例实现了分布式请求/响应类型体系结构。我们使用的JMS代理是ActiveMq,代码使用Spring连接在一起。
我们看到的问题是,如果向同一个目的地发送一组请求,那么任何需要花费大量时间才能完成的请求都会阻塞随后的请求消息。使用者使用的SessionAwareMessageListener接口只支持onMessage()方法。在这里实现并行性的最佳方法是什么?即,如果某个特定请求需要很长时间,则不应阻止队列中的其他消息?
有这么一篇帖子,但它没有回答我的问题。JMS:我们可以通过提交或回滚从OnMessage()中的队列中获取多条消息吗
谢啦
相关代码片段(为了简洁起见,删除了异常处理等)
制作人
public class MyJmsProducer {
private ProcessingResponse sendMessage(final Serializable serializable) {
//send JMS request and wait for response
return jmsMessagingTemplate.convertSendAndReceive(destination, serializable, ProcessingResponse.class); //this operation seems to be blocking + sync
}
}
而听者(消费者)
public class MyJmsListener
implements SessionAwareMessageListener<Message>, NotificationHandler<Task> {
@Override
public void onMessage(Message message, Session session)
throws JMSException {
ProcessingRequest processingRequest = (ProcessingRequest) ((ObjectMessage) message).getObject();
// handle the request here (THIS COULD TAKE A WHILE)
handleRequest(processingRequest);
// done handling the request, now create a response message
final ObjectMessage responseMessage = new ActiveMQObjectMessage();
responseMessage.setJMSCorrelationID(message.getJMSCorrelationID());
responseMessage.setObject(processingResponse);
// Message sent back to the replyTo address of the income message.
final MessageProducer producer = session.createProducer(message.getJMSReplyTo());
producer.send(responseMessage);
}
}
您可以使用DMLC的Conextt消费者
来提高消息的消耗速度并解决消耗慢的问题:
@Bean
public DefaultMessageListenerContainer dmlc() {
DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
dmlc.setMaxConcurrentConsumers(10);
dmlc.setConcurrentConsumers(5);
return dmlc;
}
您需要调整预取策略以适应并发消费者:
persistent queues (default value: 1000)
non-persistent queues (default value: 1000)
persistent topics (default value: 100)
non-persistent topics (default value: Short.MAX_VALUE - 1)
所有消息都被发送到第一个连接的消费者,当另一个消费者连接到同一个目的地时,他不会收到消息,因此要更改此行为,需要将prefetchPolicy设置为低于默认值的值。例如,添加这个jms。预取策略。queuePrefetch=1
到activemq中的uri配置。xml或在客户端url上设置它,如下所示
@Bean
public ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1");
return connectionFactory;
}
对于高消息量的高性能,建议使用大的预取值。但是,对于较低的消息量,其中每个消息需要很长时间来处理,预取应该设置为1。这可以确保消费者一次只处理一条消息。但是,将预取限制指定为零将导致消费者轮询消息,一次轮询一个消息,而不是将消息推送给消费者。
看看http://activemq.apache.org/what-is-the-prefetch-limit-for.html
和
http://activemq.apache.org/destination-options.html
我试图找到类似的问题,但找不到解决我问题的最佳方案。我在应用程序中使用SpringBoot,在代码中使用集成的solace队列。我可以使用以下代码读取solace队列中的消息: 在SpringBoot的帮助下,所有属性(如VPN)都从属性文件中提取,并创建ConnectionFactory。下面是读取消息的代码: 根据消息的类型,定位服务执行器并处理消息。它对我们完全有效。 但是,在某些情况下,我
假设我们有: 线程1,包含actor A、B和C。 包含执行元y的线程2。 包含演员Z的线程3。 演员A和B正在监听演员Y的消息。 然后,参与者C向参与者Z发出阻塞请求。 我包含了Actor Y,以允许它在Z处理来自C的请求时发送消息。 所有线程都在不同的物理核心上--它们并行运行。
我对ActiveMQ有一个奇怪的问题。我有一个队列,似乎有一个挂起的消息,但当我打开队列时,没有消息。 这里怎么了?真的有消息等待处理吗?我怎样才能把信息带回来,或者至少能看到内容? 编辑:刚刚发现ActiveMQ 5.6.0的这两个错误。这可能是那个问题的根源吗? 不正确的报告挂起QueueSize的持久子后重新连接与未破解 OrderPendingList中的问题可能导致在持久子重新连接后无法
我有一个存储库,它返回一个流量,并希望将结果设置为另一个需要列表的对象。有没有其他方法可以在不阻塞的情况下以列表的形式获取结果? 这座大楼正在运转,但需要很长时间。
因此,我使用Spring integration链接JMS和ActiveMQ,如下所示:- 如何使其工作,以便发送到此队列并从中接收消息?请帮忙。
ActiveMQ:5.10.2在ServiceMix的Karaf OSGi中 卡哈布坚持。 默认代理设置。连接中的默认设置(TCP://x.x.x.x.x:61616) 一切正常,但是:如果我将消费者的数量减少到1(或者2或3个,我不知道阈值在哪里),那么来自1个队列的消息将被消耗,来自另一个队列的消息将被存储。过了一段时间,我看到了这张照片: 1用户停止接收消息。他认为没有更多消息了。 从act