当前位置: 首页 > 知识库问答 >
问题:

执行器通道中的线程安全

艾英范
2023-03-14

消费者是一个spring集成项目,它从消息队列中消费并执行大量处理。我使用Executor通道并行处理消息,然后流通过一些公共处理程序类。

请在下面的代码片段中找到-

  1. baseVentFlow()-我们从EMS队列接收消息并将其发送到路由
  2. router()-基于以下消息的id:“特定ExecutorChannel实例配置了一个单线程执行器。每个ExecutorChannel都将是它的专用执行器,只有一个线程。
  3. skwdefaultChannel()、gjsucaDefaultChannel()、rpaDefaultChannel()-所有ExecutorChannel bean都用@bridgeto标记为启动公共流的同一通道。
  4. uaEventFlow()-这里将处理每个消息
@Bean
public IntegrationFlow baseEventFlow() {
    return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(Jms.container(this.emsConnectionFactory, this.emsQueue).get()))
            .wireTap(FLTAWARE_WIRE_TAP_CHNL)
            .route(router()).get();
}

public AbstractMessageRouter router() {
    return new AbstractMessageRouter() {
        @Override
        protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
            if (message.getPayload().toString().contains("\"id\":\"RPA")) {
                return Collections.singletonList(skwDefaultChannel());
            }else if (message.getPayload().toString().contains("\"id\":\"ASH")) {
                return Collections.singletonList(rpaDefaultChannel());
            } else if (message.getPayload().toString().contains("\"id\":\"GJS")
                    || message.getPayload().toString().contains("\"id\":\"UCA")) {
                return Collections.singletonList(gjsucaDefaultChannel());
            } else {
                return Collections.singletonList(new NullChannel());
            }
        }
    };
}

@Bean
@BridgeTo("uaDefaultChannel")
public MessageChannel skwDefaultChannel() {
    return MessageChannels.executor(SKW_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
}

@Bean
@BridgeTo("uaDefaultChannel")
public MessageChannel gjsucaDefaultChannel() {
    return MessageChannels.executor(GJS_UCA_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
}

@Bean
@BridgeTo("uaDefaultChannel")
public MessageChannel rpaDefaultChannel() {
    return MessageChannels.executor(RPA_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
}

@Bean
public IntegrationFlow uaEventFlow() {
    return IntegrationFlows.from("uaDefaultChannel")
             .wireTap(UA_WIRE_TAP_CHNL)
             .transform(eventHandler, "parseEvent")
             .handle(uaImpl, "process").get();
}

我担心的是在uaEVentFlow()中,常见的转换和处理程序方法不是线程安全的,这可能会导致问题。我们如何确保在每次消息调用时注入一个新的转换器和处理程序?我应该将转换器和处理程序bean的范围改为原型吗?

共有1个答案

韶兴德
2023-03-14

您应该将.transform().handle()移动到每个上游流,并添加

java prettyprint-override">@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)

到它们的@bean定义中,以便每个都获得自己的实例。

但是,通常最好使代码是线程安全的。

 类似资料:
  • 除了接口比普通线程(例如管理)有一些优势之外,执行以下操作之间是否存在真正的内部差异(性能差异大、资源消耗……): 以及: 我只问这里的一个线索。

  • 消费者是一个spring集成项目,它从消息队列中消费并执行大量处理。目前,它是单线程的,不能与生产者发送消息的速度匹配。因此队列深度不断增加 是否可以使用executor通道在多线程环境中处理此问题并加快使用者进程 如果是,请建议如何实现--为了确保消息的顺序,我需要将相同类型的消息(基于消息的id)分配给executor通道的相同线程。 因此executor通道上的BridgeTo将转发消息

  • 有没有办法为Spring云流中的输出通道配置并发? 例如,我正在考虑如何为输出MessageChannel或通过配置属性设置线程执行器,如果这是一个好主意的话,在Spring-Cloud-Stream服务的情况下。 我还没有找到一种方法,那么这是否意味着spring stream cloud可以很好地为我们管理并发性(线程数量、放大/缩小策略),我们最好不要触及这一部分? 谢谢你,西蒙

  • 我有一个从Rabbit接收消息的应用程序。当收到一条消息时,它会对它进行处理,然后在完成时执行ACK。应用程序可以在一个固定的线程池中同时处理2个项目,有2个线程。Rabbit的QOS预取设置为2,因为我不想在一个时间框架内给应用提供超过它所能处理的内容。 现在,我的消费者的handleDelivery执行以下操作: 此时,您已经发现TestWrapperThread将调用作为最后一个操作。 根据

  • 我确信这两个列表都不是空的,并且正在调用,但是没有调用order execution run方法....

  • 问题内容: 从Java使用gRPC时,我可以缓存存根(客户端)并在多线程环境中调用它们吗?或者通道是线程安全的并且可以安全地缓存吗? 如果网络中断,我应该重新创建通道还是它足够智能以重新连接?我在http://www.grpc.io/docs/上找不到相关信息 谢谢 问题答案: 回答第一个问题: 通道是线程安全的;标有注释。存根也是线程安全的,这就是为什么重新配置会创建新的存根的原因。 回答第二个