当前位置: 首页 > 知识库问答 >
问题:

Spring集成:如何增加对传入消息的处理

唐法
2023-03-14

我正在开发一个Spring应用程序,它每分钟将接收大约500条xml消息。下面的xml配置只允许每分钟处理大约60条消息,其余消息存储在队列中(持久化在DB中),并以每分钟60条消息的速率检索。

尝试从多个来源阅读文档,但仍然不清楚轮询器和任务执行器的角色。我对当前每分钟处理60条消息的理解是因为轮询器配置中的“固定延迟”值设置为10(因此它将在1分钟内轮询6次),“每轮询最大消息数”设置为10,因此6x10=每分钟处理60条消息。

如果我的理解不正确,请提供建议,并帮助修改xml配置,以实现以更高的速率处理传入消息。

任务执行器的角色也不清楚——这是否意味着pool-size=“50”将允许50个线程并行运行来处理轮询器轮询的消息?

我想要的全部是:

  1. JdbcChannelMessageStore用于将传入的xml消息存储在数据库(INT_CHANNEL_MESSAGE)表中。这是必需的,以便在服务器重新启动时,消息仍存储在表中,不会丢失

抱歉,如果这已经在别处得到了回答,但在阅读了大量帖子后,我仍然不明白这是怎么回事。

提前感谢。

<!-- Message Store configuration start -->              

    <!-- JDBC message store configuration -->
    <bean id="store" class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore">
        <property name="dataSource" ref="dataSource"/>
        <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
        <property name="region" value="TX_TIMEOUT"/>
        <property name="usingIdCache" value="true"/>
    </bean>

    <bean id="queryProvider" class="org.springframework.integration.jdbc.store.channel.MySqlChannelMessageStoreQueryProvider" />        

<int:transaction-synchronization-factory
    id="syncFactory">
    <int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" />
    <int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())" />
</int:transaction-synchronization-factory>

<task:executor id="pool" pool-size="50" queue-capacity="100" rejection-policy="CALLER_RUNS" />  

<int:poller id="messageStorePoller" fixed-delay="10"
    receive-timeout="500" max-messages-per-poll="10" task-executor="pool"
    default="true" time-unit="SECONDS">
    <int:transactional propagation="REQUIRED"
        synchronization-factory="syncFactory" isolation="READ_COMMITTED"
        transaction-manager="transactionManager" /> 
</int:poller>

<bean id="transactionManager"
    class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
    <!--  1)        Store the message in  persistent message store -->
    <int:channel id="incomingXmlProcessingChannel">
         <int:queue message-store= "store" />
    </int:channel> 

    <!-- 2) Check in, Enrich the headers, Check out -->
    <!-- (This is the entry point for WebService requests) -->
    <int:chain input-channel="incomingXmlProcessingChannel" output-channel="incomingXmlSplitterChannel">
        <int:claim-check-in message-store="simpleMessageStore" />
        <int:header-enricher >
            <int:header name="CLAIM_CHECK_ID" expression="payload"/>
            <int:header name="MESSAGE_ID" expression="headers.id" />
            <int:header name="IMPORT_ID" value="XML_IMPORT"/>
        </int:header-enricher>
        <int:claim-check-out message-store="simpleMessageStore" />          
    </int:chain>

在Artem回复后添加:

谢谢你,阿泰姆。因此,在固定延迟10秒后发生的每次轮询(根据上面的配置),任务执行者将检查任务队列,如果可能(并且需要),启动一个新任务?每个轮询任务(线程)将从消息存储(队列)接收“10”条消息,按照“maxMessagesPerPoll”配置。

为了获得更高的传入消息处理时间,我是否应该减少轮询器上的固定延迟,以便任务执行器可以启动更多线程?如果我将fixedDelay设置为2秒,将启动一个新线程来执行10条消息,一分钟内将启动大约30个这样的线程,一分钟内处理“大约”300条传入消息。

抱歉在一个问题上问得太多了——只是想解释完整的问题。

共有1个答案

屠浩
2023-03-14

这个类的主要逻辑是:

    private final class Poller implements Runnable {

    private final Callable<Boolean> pollingTask;

    Poller(Callable<Boolean> pollingTask) {
        this.pollingTask = pollingTask;
    }

    @Override
    public void run() {
        AbstractPollingEndpoint.this.taskExecutor.execute(() -> {
            int count = 0;
            while (AbstractPollingEndpoint.this.initialized
                    && (AbstractPollingEndpoint.this.maxMessagesPerPoll <= 0
                    || count < AbstractPollingEndpoint.this.maxMessagesPerPoll)) {
                try {
                    if (!Poller.this.pollingTask.call()) {
                        break;
                    }
                    count++;
                }
                catch (Exception e) {
                    if (e instanceof MessagingException) {
                        throw (MessagingException) e;
                    }
                    else {
                        Message<?> failedMessage = null;
                        if (AbstractPollingEndpoint.this.transactionSynchronizationFactory != null) {
                            Object resource = TransactionSynchronizationManager.getResource(getResourceToBind());
                            if (resource instanceof IntegrationResourceHolder) {
                                failedMessage = ((IntegrationResourceHolder) resource).getMessage();
                            }
                        }
                        throw new MessagingException(failedMessage, e);
                    }
                }
                finally {
                    if (AbstractPollingEndpoint.this.transactionSynchronizationFactory != null) {
                        Object resource = getResourceToBind();
                        if (TransactionSynchronizationManager.hasResource(resource)) {
                            TransactionSynchronizationManager.unbindResource(resource);
                        }
                    }
                }
            }
        });
    }

}

正如您所看到的,taskExec导师负责旋转污染任务,直到maxMessagesPerPoll在一个线程中。如果当前轮询任务太长而无法执行新计划,则将涉及池中的其他线程。但是一个轮询中的所有消息都在同一个线程中处理,而不是并行处理。

这就是它的工作原理。既然你在一个这么多的问题上问得太多了,我希望这些信息足以让你明白下一步该怎么做。

 类似资料:
  • 我需要解决这个场景。我有两个amqp消费者设置来获取一条消息。 taskChannel是queuechannel,但一次只允许使用一条消息,因此没有并行处理。如果另一条消息花了太长时间才继续,我如何在超时后拒绝一条消息。那么这个消息将返回到队列,由另一个节点继续?我的意思是,这两个消费者预取了两条消息,但一次只能处理一条,所以如果第一条预取消息需要很长时间才能处理,那么如何释放第二条预取消息呢。

  • 当RabbitMq消息到达队列时,我目前正在使用IntegrationFlow来触发作业执行。IntegrationFlow的AmqpInFronChannelAdapter和作业的第一步的ItemReader都配置为从同一队列中读取消息。 我遇到的问题是IntegrationFlow的AmqpInboundChannelAdapter读取RabbitMQ消息,然后ItemReader再也找不到该

  • 我需要调整返回到集成流调用方的所有错误消息,以便不泄漏信息,无论它们是作为异常抛出还是作为异步错误处理。我希望避免在每个消息处理程序上定义自定义错误通道。我尝试将转换器连接到默认错误通道,但未成功: 是否有一个中心点可以转换所有传出的错误消息?

  • 我对Spring集成相对较新,但我的任务是实现一个tcp网关,该网关需要: 在套接字上侦听消息 在我的Spring集成经验中,消息流不是双向的。我只将路由器配置为侦听、处理消息和输出到队列/主题。但是,在这种情况下,我需要接受消息并返回响应,同时将某些消息转发到队列。建议? 这是到目前为止我的集成xml。 如何将handleInput的输出转发到队列,同时从网关返回一些响应? 编辑:在与Gary进

  • 我正在开发一个使用Spring Integration 5.0.1和Spring Boot 2.0.0的应用程序。RC1 目前,应用程序响应并运行一些可能需要一段时间才能完成的初始化代码。这不使用任何Spring集成组件。 我还有一些非常基本的集成流,使用JavaDSL编写,并在配置中声明为bean。 有什么方法可以推迟流何时开始消耗消息吗?我希望能够在初始化完成时手动启动它们。 配置似乎是解决方

  • 我正在尝试使用spring integration设置我的应用程序,作为一名新手,需要以下用例的建议- 有一个队列,来自另一个应用程序的消息将被推送到该队列。我的应用程序使用队列中的消息,进行一些数据处理,然后将其推送到另一个出站队列。目标是以并发方式处理消息。 根据我的理解,我们可以有两种方法- 1.使用#轮询器 2.使用#调度器 从基于轮询器的配置来看,池中似乎有多个可用线程,可以同时获取消息