当前位置: 首页 > 知识库问答 >
问题:

Spring Integration-JPA入站通道适配器、拆分器、服务激活器并发工作单元

程俊力
2023-03-14

我有一个非常简单的Spring集成管道设置

  1. 用JPA将n次状态为X的资产加载到通道中
  2. 将每个JPA实体拆分到一个新通道上
  3. 使用服务激活器处理JPA实体,最后将实体的状态更新为y

这个管道的同步特性意味着JPA入站通道适配器只在上一个入站通道适配器中的所有消息被处理并发送到nullchannel之后才触发

这里的问题是service activator处理一条消息需要大约1秒(大部分时间是对REST API的调用),因此,处理250个JPA实体的队列可能需要250秒。

如果我们同时调用REST API,比如说5次,它仍然需要1秒。

因此,我想知道是否可以对我们的管道进行简单的修改,比如添加一个聚合器和一个任务执行,这将允许整个管道作为一个同步的“工作单元”运行,但允许服务激活器并发处理。

<channel id="newAssetChannel" />
<channel id="splitAssetChannel" />

<int-jpa:inbound-channel-adapter
        id="newAssetChannelAdapter"
        channel="newAssetChannel"
        entity-manager-factory="entityManagerFactory" 
        entity-class="com.foo.domain.Asset" 
        jpa-query="select a from Asset a where (a.status = 'NEW' or a.status = 'UPDATED') and a.health = 'OK' ORDER BY a.priority DESC, a.updatedDate ASC"
        max-results="250">
    <poller fixed-rate="5000" max-messages-per-poll="1" />
</int-jpa:inbound-channel-adapter>

<splitter expression="payload"
    input-channel="newAssetChannel"
    output-channel="splitNewAssetChannel" />

<service-activator
    id="newAssetServiceActivator"
    input-channel="splitNewAssetChannel"
    output-channel="nullChannel"
    ref="assetProcessor"
    method="processNew" />

共有1个答案

席成仁
2023-03-14

嗯,聚合器确实是等待所有答复的正确方法,但是与拆分器之后的executorchannel一起,还是可以解放poller的双手。所以,在并行拆分-聚合之前-之后应该有一些障碍。

您可以使用 :

<gateway id="gateway" default-request-channel="splitterChannel"/>

<service-activator id="gatewayTestService" input-channel="newAssetChannel" output-channel="saveRowsChannel" ref="gateway"/>

拆分器的output-channel必须是executorchannelNewAssetServiceActivator必须输出到聚合器。聚合器没有输出通道,这意味着对网关的答复。

 类似资料:
  • 我有一个队列通道和一个带有轮询器的服务激活器,轮询器从该队列中读取数据。我希望配置为“我希望50个线程轮询该队列,每次轮询并返回消息时,在此线程上调用服务激活器指向的服务。” 该服务没有异步注释,但无状态,可以以并发方式安全运行。 下面的方法能做到吗?有没有其他首选的方法来实现这一点?

  • 如果我创建一个SFTP入站通道适配器,并使用在SFTP中配置为channel属性的通道发送一些文件。文件将传输到SFTP远程目录本地目录,还是直接从通道流到本地目录

  • 问题内容: Spring Integration FTP中的入站通道适配器和出站通道适配器之间有什么区别?我应该使用哪一个?何时使用? 我从文档中了解到,出站可以发送任何类型的文件(例如byte [],String,java.io.File),但入站仅限于文件类型。那仅仅是区别还是其他? 问题答案: 我建议您首先阅读理论 。 任何Inbound适配器都旨在从外部系统获取数据。Outbound-放置

  • 问题内容: 入站和出站通道适配器之间的根本区别是什么? 任何示例都将非常有帮助。 我已经查看过Spring文档,这种“方向性”的区别对我来说还不清楚。我支持配置了outbound-channel-adapter的应用程序,但是我发现使用 出站 标签可以直观地了解行为计数器。该适配器获取一个外部文件,然后 将其 引入应用程序中, 在 该应用程序中我们解析文件并保留数据。 这类似于这个问题,但是我想更

  • 我是这个Spring集成和JMS的新手,我开始使用它。在这里,我想通过activemq创建普通的jms消息,并通过spring inbound适配器(消息驱动)接收它。 以下是我的spring配置文件 这是我的测试课。 } 但问题是我不能保证交货。有些时候程序不能接收消息,有些时候它成功了,但有一些警告,如 无法刷新目标“queue://MSG_QUEUE”的JMS连接,将在5000毫秒后重试,原

  • 我有一个模型对象,它是在多次转换和解析之后填充的。现在,我需要使用spring集成将模型中的消息属性发送给kafka。我可以使用messageKey方法构造键,但如何从m.getPayload()之类的模型中获取实际消息。getMessage()并将其发送给Kafka。