当前位置: 首页 > 知识库问答 >
问题:

用Spring集成实现MongoDB入站流

从阎宝
2023-03-14

我们将有一个Mongo集合包含多个工作单元。我的想法是,文档将有一个状态字段,其中有四个选项:未处理、处理、完成、失败。Spring Integration将被配置为从这个db读取并处理存储在那里的消息。

入站Mongo DSL流将根据未处理的值从集合中读取:

MongoDbMessageSource messageSource = new MongoDbMessageSource(mongo, new LiteralExpression("{'status' : 'UNPROCESSED'}"));
return IntegrationFlows.from(messageSource)...

这里有一个问题:如果我有几台工作机器从同一个数据库中读取数据,我希望防止它们对同一行未处理的数据进行操作,因为我的poller使用maxmessagesperpoll的保守值,或者消息处理需要一段时间。

似乎正确的方法是使用TransactionSynchronizationFactory定义一个ProcessBeforeCommit阶段将状态更新为PROCESSING,并定义一个ProcessAfterCommit阶段将状态更新为DONE或Failed。但是,在查看Pollers和TransactionManagers的API时,我并不清楚添加这一点的机制。有XML中的例子,但我看不到使用DSL的例子。

我还希望确保ProcessBeforeCommit发生在数据库读取时,而不是在处理之后······是吗?此外,如果这不是从Mongo集合中读取解决方案的最佳设计方法,请随时建议更好的架构。

共有1个答案

苏弘盛
2023-03-14

不,processBeforeCommitprocessafterCommit是非常接近的回调。他们肯定会发生在你的过程的最后。让我们假设您有一个方法,如下所示:

@Transactional
void foo() {}

当您调用这样的方法时,事务在进入方法体之前就开始了。当我们在方法体执行后退出它时,将执行beforeCommit回调。它可能会失败,因为在我们的过程中有一个外部连接(DB?)可能会丢失。只有在没有问题的情况下,我们才继续执行提交后的

可以通过AbstractMessageSourceAdvision实现:https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-channels-section.html#condition-pollers来完成您所要求的内容。因此,在afterreceive()实现中,您可以将文档更新为processing,甚至决定返回null,而不是消息:仅仅因为它在数据库中的状态已经是processing。这样的建议可以被注入到pollerspec:

/**
 * Specify AOP {@link Advice}s for the {@code pollingTask}.
 * @param advice the {@link Advice}s to use.
 * @return the spec.
 */
public PollerSpec advice(Advice... advice) {
/**
 * Specify the {@link TransactionSynchronizationFactory} to attach a
 * {@link org.springframework.transaction.support.TransactionSynchronization}
 * to the transaction around {@code poll} operation.
 * @param transactionSynchronizationFactory the TransactionSynchronizationFactory to use.
 * @return the spec.
 */
public PollerSpec transactionSynchronizationFactory(
 类似资料:
  • 我正在将SpringBoot 2.0与Spring Integr5.0.3一起使用,并且我的HTTP. in边界网关有问题。我的目标是验证发布到网关的JSON,因为请求pojo由强制字段组成。 有没有一种简单的方法来验证pojo中的字段是否已设置?我已经测试过的是使用@NotNull-SpringValidation,但它似乎不受Spring集成的支持。 你好smoothny

  • 我正在尝试将spring集成配置为向队列发送消息,然后接收消息,即非常简单的事情: 我认为解耦所必需的是在流程的两端都有一个消息网关。因此,我的第一次尝试(有效)如下所示: 其中MessageReceiverHandler()是扩展AbstractMessageHandler的bean。 所以上面我们有一个用于出站消息的消息网关。我假设我们也应该有一个用于入站消息的网关,允许我们将传入消息处理与应

  • 我尝试使用以下代码,得到了回应:状态:405方法不允许。这是我的Http请求:http://localhost:8090/services/test?name=test.代码或http请求有什么问题?

  • 我不熟悉Spring集成。我正在尝试使用http入站网关构建一个简单的应用程序。下面是我得到的运行时异常。 下面是代码文件。 波约 服务 } 服务激活器 } 存储库 请帮助我,我正在试图找到异常发生的原因,但无法解决。提前谢谢。 集成文件。

  • 可以在运行时向spring integration dsl注册MessageSources吗? 在我的例子中,我想创建多个FileReadingMessageSources(基于UI的输入),然后将有效负载发送到特定的通道/jms路由(从元数据或用户输入读取) 另一个问题是,是否可以动态注册IntegrationFlows?

  • 问题内容: 入站和出站通道适配器之间的根本区别是什么? 任何示例都将非常有帮助。 我已经查看过Spring文档,这种“方向性”的区别对我来说还不清楚。我支持配置了outbound-channel-adapter的应用程序,但是我发现使用 出站 标签可以直观地了解行为计数器。该适配器获取一个外部文件,然后 将其 引入应用程序中, 在 该应用程序中我们解析文件并保留数据。 这类似于这个问题,但是我想更