当前位置: 首页 > 知识库问答 >
问题:

多线程执行器通道,加快使用者进程

田俊爽
2023-03-14

消费者是一个spring集成项目,它从消息队列中消费并执行大量处理。目前,它是单线程的,不能与生产者发送消息的速度匹配。因此队列深度不断增加

        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(Jms.container(this.emsConnectionFactory, this.emsQueue).get()))
                .wireTap(FLTAWARE_WIRE_TAP_CHNL)// push raw fa data
                .filter(ingFilter, "filterMessageOnEvent").transform(eventHandler, "parseEvent")
                .aggregate(a -> a.correlationStrategy(corStrgy, "getCorrelationKey").releaseStrategy(g -> {

                    boolean eonExists = g.getMessages().stream()
                            .anyMatch(eon -> ((FlightModel) eon.getPayload()).getEstGmtOnDtm() != null);
                    if (eonExists) {
                        boolean einExists = g.getMessages().stream()
                                .anyMatch(ein -> ((FlightModel) ein.getPayload()).getEstGmtInDtm() != null);
                        if (einExists) {
                            return true;
                        }
                    }
                    return false;
                }).messageStore(this.messageStore)).channel("AggregatorEventChannel").get();

是否可以使用executor通道在多线程环境中处理此问题并加快使用者进程

如果是,请建议如何实现--为了确保消息的顺序,我需要将相同类型的消息(基于消息的id)分配给executor通道的相同线程。

    public static final MessageChannel SKW_DEFAULT_CHANNEL = MessageChannels
            .executor(ASQ_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();

    public static final MessageChannel RPA_DEFAULT_CHANNEL = MessageChannels
            .executor(ASH_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();

    @Bean
    public IntegrationFlow baseEventFlow1() {

        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(Jms.container(this.emsConnectionFactory, this.emsQueue).get()))
                .wireTap(FLTAWARE_WIRE_TAP_CHNL)// push raw fa data
                .filter(ingFilter, "filterMessageOnEvent").route(route()).get();
    }

    public AbstractMessageRouter router() {
        return new AbstractMessageRouter() {
            @Override
            protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
                if (message.getPayload().toString().contains("\"id\":\"RPA")) {
                    return Collections.singletonList(RPA_DEFAULT_CHANNEL);
                } else if (message.getPayload().toString().contains("\"id\":\"SKW")) {
                    return Collections.singletonList(SKW_DEFAULT_CHANNEL);
                } else {
                    return Collections.singletonList(new NullChannel());
                }
            }

        };
    }
    @Bean
    @BridgeTo("uaxDefaultChannel")
    public MessageChannel ucaDefaultChannel() {
        return MessageChannels.executor(UCA_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
    }

    @Bean
    @BridgeTo("uaDefaultChannel")
    public MessageChannel ualDefaultChannel() {
        return MessageChannels.executor(UAL_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
    }
    @Bean
    public IntegrationFlow uaEventFlow() {
        return IntegrationFlows.from("uaDefaultChannel").wireTap(UA_WIRE_TAP_CHNL)
                .transform(eventHandler, "parseEvent")
}

因此executor通道上的BridgeTo将转发消息

共有1个答案

堵鸿光
2023-03-14

因此队列深度不断增加

由于看起来您的队列位于JMS broker上的某个位置,所以确实可以有这样的行为。这正是为消息传递系统所设计的--区分生产者和消费者,并在可能的情况下处理目的地的消息。

如果希望增加来自JMS的轮询,可以考虑在JMS容器上设置concurrency选项:

/**
 * The concurrency to use.
 * @param concurrency the concurrency.
 * @return current {@link JmsDefaultListenerContainerSpec}.
 * @see DefaultMessageListenerContainer#setConcurrency(String)
 */
public JmsDefaultListenerContainerSpec concurrency(String concurrency) {
    this.target.setConcurrency(concurrency);
    return this;
}

/**
 * The concurrent consumers number to use.
 * @param concurrentConsumers the concurrent consumers count.
 * @return current {@link JmsDefaultListenerContainerSpec}.
 * @see DefaultMessageListenerContainer#setConcurrentConsumers(int)
 */
public JmsDefaultListenerContainerSpec concurrentConsumers(int concurrentConsumers) {
    this.target.setConcurrentConsumers(concurrentConsumers);
    return this;
}

/**
 * The max for concurrent consumers number to use.
 * @param maxConcurrentConsumers the max concurrent consumers count.
 * @return current {@link JmsDefaultListenerContainerSpec}.
 * @see DefaultMessageListenerContainer#setMaxConcurrentConsumers(int)
 */
public JmsDefaultListenerContainerSpec maxConcurrentConsumers(int maxConcurrentConsumers) {
    this.target.setMaxConcurrentConsumers(maxConcurrentConsumers);
    return this;
}
 类似资料:
  • 消费者是一个spring集成项目,它从消息队列中消费并执行大量处理。我使用Executor通道并行处理消息,然后流通过一些公共处理程序类。 请在下面的代码片段中找到- -我们从EMS队列接收消息并将其发送到路由器 -基于以下消息的id:“特定ExecutorChannel实例配置了一个单线程执行器。每个ExecutorChannel都将是它的专用执行器,只有一个线程。 -所有ExecutorCha

  • 除了接口比普通线程(例如管理)有一些优势之外,执行以下操作之间是否存在真正的内部差异(性能差异大、资源消耗……): 以及: 我只问这里的一个线索。

  • 我的应用程序有多个线程将消息发布到单个RabbitMQ集群。 阅读rabbit文档:我阅读了以下内容: 对于使用多个线程/进程进行处理的应用程序,每一个线程/进程打开一个新通道,并且不在它们之间共享通道是非常常见的。 而且我明白,与其开通多个连接(昂贵) 不如开通多个通道。 但是为什么不对所有线程使用单个通道呢? 在单个通道上使用多个通道有什么好处?

  • 问题内容: 刚刚开始学习多线程。我在多个线程中有5个生产者和2个消费者。基本上,该程序将100个项目添加到队列中。当队列大小为100时,生产者将停止添加。我希望消费者在消费者从队列中删除所有项目时通知生产者,以便生产者可以再次开始添加。当前,生产者将等待,但永远不会收到消费者的通知。 制片人: 消费者: 主班 问题答案: 从oracle文档页面: BlockingQueue实现是线程安全的。所有排

  • 我们正在对使用SpringBoot 2.2.2和Spring执行器的应用程序进行性能测试。 我们希望监控: 正在使用多少tomcat线程 有多少tomcat请求正在排队 正在使用多少个ThreadPoolTaskExector线程(我们将@Async与线程池一起用于某些任务) 执行器中是否提供此信息?我看不到需要使用哪些指标。

  • 14.2.1 概念 在第一个例子中,协程是独立执行的,他们之间没有通信。他们必须通信才会变得更有用:彼此之间发送和接收信息并且协调/同步他们的工作。协程可以使用共享变量来通信,但是很不提倡这样做,因为这种方式给所有的共享内存的多线程都带来了困难。 而 Go 有一种特殊的类型,通道(channel),就像一个可以用于发送类型化数据的管道,由其负责协程之间的通信,从而避开所有由共享内存导致的陷阱;这种