我已经对JMS负载平衡进行了很长时间的研究。我们可以为负载平衡JMS消息创建多生产者和多消费者。但我想了解,我们如何在一个生产者和一个消费者之间负载平衡JMS消息。我不能像ApacheCamel那样在我的项目中添加更多依赖项。
@Configuration
@EnableJms
@ComponentScan({"com.jmsloadbalance.jms"})
@Bean
public class JmsConfig {
public JmsTemplate getJmsTemplate() {
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(connectionFactory());
template.setDefaultDestination(new ActiveMQQueue("default.topic");
template.setExplicitQosEnabled(true);
template.setDeliveryPersistent(false);
template.setTimeToLive(60000);
template.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
template.setMessageConverter(getMessageConverter());
return template;
}
@Bean
public DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setPubSubDomain(false);
factory.setDestinationResolver(new DynamicDestinationResolver());
factory.setConcurrency("1");
factory.setMessageConverter(getMessageConverter());
return factory;
}
private ActiveMQConnectionFactory connectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL("vm://localhost");
return factory;
}
private MessageConverter getMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTypeIdPropertyName("JMSType");
return converter;
}
}
这是我的 JmsConfig 类,我无法在其中进行重大的配置更改,例如引入更多 JMS 模板或更多连接工厂。我的制片人如下所示
@Service("accountJmsProducer")
public class AccountJmsProducer {
private static Logger LOG = Logger.getLogger(AccountJmsProducer.class);
@Autowired
private JmsTemplate template;
private Destination destination;
public Account create(Account account) {
if (this.destination == null) {
this.destination = new ActiveMQQueue("account.create");
}
template.convertAndSend(destination, account);
return null;
}
}
我的消费者如下所示:
@Service("accountJmsConsumer")
public class AccountJmsConsumer {
private static final Logger LOG = Logger.getLogger(AccountJmsConsumer.class);
@Autowired
@Qualifier("accountService")
private AccountService accountService;
private Account lastReceived;
@JmsListener(containerFactory = "defaultJmsListenerContainerFactory", destination = "account.create")
public Account create(Account account) {
LOG.warn("Received " + account);
setLastReceived(account);
return accountService.create(account);
}
public synchronized Account getLastReceived() {
return lastReceived;
}
public synchronized void setLastReceived(Account lastReceived) {
this.lastReceived = lastReceived;
}
}
当有一个消费者时,不清楚负载平衡是什么意思,但根据您对我对您问题的评论:
只要目标是队列(而不是主题),这是隐含的,因为您有<code>工厂。setPubSubDomain(false)则它将正常工作。它是JMS合同的一部分。如果同一队列中有多个消费者,则消息将分布在这些消费者之间;只有一个消费者将接收特定消息。
如果交付失败,可能会或可能不会重新交付给同一消费者。
大多数代理(包括ActiveMQ)都提供某种预取机制。IIRC,使用ActiveMQ,默认情况下它是1000。如果您的消息少于这个数,那么一个消费者可能处于空闲状态;如果是这样,请减少预取以调整分布。
我有三根线。线程1(T1)是生成器,它生成数据。线程2和线程3(T2和T3)分别等待T1的数据在单独的循环中处理。我正在考虑在线程之间共享BlockingQueue,并通过调用“Take”让T2和T3等待。
我有一个生产者/消费者场景,我不希望一个生产者交付产品,多个消费者消费这些产品。然而,常见的情况是,交付的产品只被一个消费者消费,而其他消费者从未看到过这个特定的产品。我不想实现的是,一个产品被每个消费者消费一次,而没有任何形式的阻碍。 我的第一个想法是使用多个BlockingQueue,每个消费者使用一个,并使生产者将每个产品按顺序放入所有可用的BlockingQueues中。但是,如果其中一个
问题内容: 因此,我已经看到了许多在Go中实现一个消费者和许多生产者的方法-Go 并发中的经典fanIn函数。 我想要的是fanOut功能。它以一个通道作为参数,它从中读取一个值,并返回一个通道片,该通道将这个值的副本写入其中。 有没有正确/推荐的方法来实现这一目标? 问题答案: 您几乎描述了执行此操作的最佳方法,但这是执行此操作的一小段代码示例。 去游乐场:https : //play.gola
我有一个(Posix)服务器,它充当许多客户端到另一个上游服务器的代理。消息通常从上游服务器向下流动,然后与之匹配,并被推出到对该流量感兴趣的客户端的某个子集(维护来自上游服务器的FIFO顺序)。目前,这个代理服务器是使用事件循环的单线程(例如-select、epoll等),但现在我想将它变成多线程,这样代理就可以更充分地利用整个机器并实现更高的吞吐量。 我的高级设计是拥有一个由N个工作线程组成的
这会影响到最初连接到broker1的消费者,我不确定是否还有某种方法/配置使他们在这些代理之间透明地跳转,或者我是否需要创建两个消费者(实际上其中一个是空闲的),每个消费者都针对相应的代理?
我有一个使用ActiveMQ的消息队列。web请求用persistency=true将消息放入队列。现在,我有两个消费者,它们都作为单独的会话连接到这个队列。使用者1总是确认消息,但使用者2从不这样做。 JMS队列实现负载平衡器语义。一条消息将被一个使用者接收。如果在发送消息时没有可用的使用者,它将被保留,直到有可以处理消息的使用者可用为止。如果使用者接收到一条消息,但在关闭之前没有确认它,那么该