消费者是一个spring集成项目,它从消息队列中消费并执行大量处理。我使用Executor通道并行处理消息,然后流通过一些公共处理程序类。
请在下面的代码片段中找到-
baseVentFlow()
-我们从EMS队列接收消息并将其发送到路由器router()
-基于以下消息的id:“特定ExecutorChannel实例配置了一个单线程执行器。每个ExecutorChannel都将是它的专用执行器,只有一个线程。skwdefaultChannel()、gjsucaDefaultChannel()、rpaDefaultChannel()
-所有ExecutorChannel bean都用@bridgeto标记为启动公共流的同一通道。uaEventFlow()
-这里将处理每个消息@Bean
public IntegrationFlow baseEventFlow() {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(Jms.container(this.emsConnectionFactory, this.emsQueue).get()))
.wireTap(FLTAWARE_WIRE_TAP_CHNL)
.route(router()).get();
}
public AbstractMessageRouter router() {
return new AbstractMessageRouter() {
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
if (message.getPayload().toString().contains("\"id\":\"RPA")) {
return Collections.singletonList(skwDefaultChannel());
}else if (message.getPayload().toString().contains("\"id\":\"ASH")) {
return Collections.singletonList(rpaDefaultChannel());
} else if (message.getPayload().toString().contains("\"id\":\"GJS")
|| message.getPayload().toString().contains("\"id\":\"UCA")) {
return Collections.singletonList(gjsucaDefaultChannel());
} else {
return Collections.singletonList(new NullChannel());
}
}
};
}
@Bean
@BridgeTo("uaDefaultChannel")
public MessageChannel skwDefaultChannel() {
return MessageChannels.executor(SKW_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
}
@Bean
@BridgeTo("uaDefaultChannel")
public MessageChannel gjsucaDefaultChannel() {
return MessageChannels.executor(GJS_UCA_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
}
@Bean
@BridgeTo("uaDefaultChannel")
public MessageChannel rpaDefaultChannel() {
return MessageChannels.executor(RPA_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
}
@Bean
public IntegrationFlow uaEventFlow() {
return IntegrationFlows.from("uaDefaultChannel")
.wireTap(UA_WIRE_TAP_CHNL)
.transform(eventHandler, "parseEvent")
.handle(uaImpl, "process").get();
}
我担心的是在uaEVentFlow()中,常见的转换和处理程序方法不是线程安全的,这可能会导致问题。我们如何确保在每次消息调用时注入一个新的转换器和处理程序?我应该将转换器和处理程序bean的范围改为原型吗?
您应该将.transform()
和.handle()
移动到每个上游流,并添加
java prettyprint-override">@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
到它们的@bean
定义中,以便每个都获得自己的实例。
但是,通常最好使代码是线程安全的。
除了接口比普通线程(例如管理)有一些优势之外,执行以下操作之间是否存在真正的内部差异(性能差异大、资源消耗……): 以及: 我只问这里的一个线索。
消费者是一个spring集成项目,它从消息队列中消费并执行大量处理。目前,它是单线程的,不能与生产者发送消息的速度匹配。因此队列深度不断增加 是否可以使用executor通道在多线程环境中处理此问题并加快使用者进程 如果是,请建议如何实现--为了确保消息的顺序,我需要将相同类型的消息(基于消息的id)分配给executor通道的相同线程。 因此executor通道上的BridgeTo将转发消息
有没有办法为Spring云流中的输出通道配置并发? 例如,我正在考虑如何为输出MessageChannel或通过配置属性设置线程执行器,如果这是一个好主意的话,在Spring-Cloud-Stream服务的情况下。 我还没有找到一种方法,那么这是否意味着spring stream cloud可以很好地为我们管理并发性(线程数量、放大/缩小策略),我们最好不要触及这一部分? 谢谢你,西蒙
我有一个从Rabbit接收消息的应用程序。当收到一条消息时,它会对它进行处理,然后在完成时执行ACK。应用程序可以在一个固定的线程池中同时处理2个项目,有2个线程。Rabbit的QOS预取设置为2,因为我不想在一个时间框架内给应用提供超过它所能处理的内容。 现在,我的消费者的handleDelivery执行以下操作: 此时,您已经发现TestWrapperThread将调用作为最后一个操作。 根据
我确信这两个列表都不是空的,并且正在调用,但是没有调用order execution run方法....
问题内容: 从Java使用gRPC时,我可以缓存存根(客户端)并在多线程环境中调用它们吗?或者通道是线程安全的并且可以安全地缓存吗? 如果网络中断,我应该重新创建通道还是它足够智能以重新连接?我在http://www.grpc.io/docs/上找不到相关信息 谢谢 问题答案: 回答第一个问题: 通道是线程安全的;标有注释。存根也是线程安全的,这就是为什么重新配置会创建新的存根的原因。 回答第二个