消费者是一个spring集成项目,它从消息队列中消费并执行大量处理。目前,它是单线程的,不能与生产者发送消息的速度匹配。因此队列深度不断增加
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(Jms.container(this.emsConnectionFactory, this.emsQueue).get()))
.wireTap(FLTAWARE_WIRE_TAP_CHNL)// push raw fa data
.filter(ingFilter, "filterMessageOnEvent").transform(eventHandler, "parseEvent")
.aggregate(a -> a.correlationStrategy(corStrgy, "getCorrelationKey").releaseStrategy(g -> {
boolean eonExists = g.getMessages().stream()
.anyMatch(eon -> ((FlightModel) eon.getPayload()).getEstGmtOnDtm() != null);
if (eonExists) {
boolean einExists = g.getMessages().stream()
.anyMatch(ein -> ((FlightModel) ein.getPayload()).getEstGmtInDtm() != null);
if (einExists) {
return true;
}
}
return false;
}).messageStore(this.messageStore)).channel("AggregatorEventChannel").get();
是否可以使用executor通道在多线程环境中处理此问题并加快使用者进程
如果是,请建议如何实现--为了确保消息的顺序,我需要将相同类型的消息(基于消息的id)分配给executor通道的相同线程。
public static final MessageChannel SKW_DEFAULT_CHANNEL = MessageChannels
.executor(ASQ_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
public static final MessageChannel RPA_DEFAULT_CHANNEL = MessageChannels
.executor(ASH_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
@Bean
public IntegrationFlow baseEventFlow1() {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(Jms.container(this.emsConnectionFactory, this.emsQueue).get()))
.wireTap(FLTAWARE_WIRE_TAP_CHNL)// push raw fa data
.filter(ingFilter, "filterMessageOnEvent").route(route()).get();
}
public AbstractMessageRouter router() {
return new AbstractMessageRouter() {
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
if (message.getPayload().toString().contains("\"id\":\"RPA")) {
return Collections.singletonList(RPA_DEFAULT_CHANNEL);
} else if (message.getPayload().toString().contains("\"id\":\"SKW")) {
return Collections.singletonList(SKW_DEFAULT_CHANNEL);
} else {
return Collections.singletonList(new NullChannel());
}
}
};
}
@Bean
@BridgeTo("uaxDefaultChannel")
public MessageChannel ucaDefaultChannel() {
return MessageChannels.executor(UCA_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
}
@Bean
@BridgeTo("uaDefaultChannel")
public MessageChannel ualDefaultChannel() {
return MessageChannels.executor(UAL_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
}
@Bean
public IntegrationFlow uaEventFlow() {
return IntegrationFlows.from("uaDefaultChannel").wireTap(UA_WIRE_TAP_CHNL)
.transform(eventHandler, "parseEvent")
}
因此executor通道上的BridgeTo将转发消息
因此队列深度不断增加
由于看起来您的队列位于JMS broker上的某个位置,所以确实可以有这样的行为。这正是为消息传递系统所设计的--区分生产者和消费者,并在可能的情况下处理目的地的消息。
如果希望增加来自JMS的轮询,可以考虑在JMS容器上设置concurrency
选项:
/**
* The concurrency to use.
* @param concurrency the concurrency.
* @return current {@link JmsDefaultListenerContainerSpec}.
* @see DefaultMessageListenerContainer#setConcurrency(String)
*/
public JmsDefaultListenerContainerSpec concurrency(String concurrency) {
this.target.setConcurrency(concurrency);
return this;
}
/**
* The concurrent consumers number to use.
* @param concurrentConsumers the concurrent consumers count.
* @return current {@link JmsDefaultListenerContainerSpec}.
* @see DefaultMessageListenerContainer#setConcurrentConsumers(int)
*/
public JmsDefaultListenerContainerSpec concurrentConsumers(int concurrentConsumers) {
this.target.setConcurrentConsumers(concurrentConsumers);
return this;
}
/**
* The max for concurrent consumers number to use.
* @param maxConcurrentConsumers the max concurrent consumers count.
* @return current {@link JmsDefaultListenerContainerSpec}.
* @see DefaultMessageListenerContainer#setMaxConcurrentConsumers(int)
*/
public JmsDefaultListenerContainerSpec maxConcurrentConsumers(int maxConcurrentConsumers) {
this.target.setMaxConcurrentConsumers(maxConcurrentConsumers);
return this;
}
消费者是一个spring集成项目,它从消息队列中消费并执行大量处理。我使用Executor通道并行处理消息,然后流通过一些公共处理程序类。 请在下面的代码片段中找到- -我们从EMS队列接收消息并将其发送到路由器 -基于以下消息的id:“特定ExecutorChannel实例配置了一个单线程执行器。每个ExecutorChannel都将是它的专用执行器,只有一个线程。 -所有ExecutorCha
除了接口比普通线程(例如管理)有一些优势之外,执行以下操作之间是否存在真正的内部差异(性能差异大、资源消耗……): 以及: 我只问这里的一个线索。
我的应用程序有多个线程将消息发布到单个RabbitMQ集群。 阅读rabbit文档:我阅读了以下内容: 对于使用多个线程/进程进行处理的应用程序,每一个线程/进程打开一个新通道,并且不在它们之间共享通道是非常常见的。 而且我明白,与其开通多个连接(昂贵) 不如开通多个通道。 但是为什么不对所有线程使用单个通道呢? 在单个通道上使用多个通道有什么好处?
问题内容: 刚刚开始学习多线程。我在多个线程中有5个生产者和2个消费者。基本上,该程序将100个项目添加到队列中。当队列大小为100时,生产者将停止添加。我希望消费者在消费者从队列中删除所有项目时通知生产者,以便生产者可以再次开始添加。当前,生产者将等待,但永远不会收到消费者的通知。 制片人: 消费者: 主班 问题答案: 从oracle文档页面: BlockingQueue实现是线程安全的。所有排
我们正在对使用SpringBoot 2.2.2和Spring执行器的应用程序进行性能测试。 我们希望监控: 正在使用多少tomcat线程 有多少tomcat请求正在排队 正在使用多少个ThreadPoolTaskExector线程(我们将@Async与线程池一起用于某些任务) 执行器中是否提供此信息?我看不到需要使用哪些指标。
14.2.1 概念 在第一个例子中,协程是独立执行的,他们之间没有通信。他们必须通信才会变得更有用:彼此之间发送和接收信息并且协调/同步他们的工作。协程可以使用共享变量来通信,但是很不提倡这样做,因为这种方式给所有的共享内存的多线程都带来了困难。 而 Go 有一种特殊的类型,通道(channel),就像一个可以用于发送类型化数据的管道,由其负责协程之间的通信,从而避开所有由共享内存导致的陷阱;这种