当前位置: 首页 > 知识库问答 >
问题:

反应Spring集成相当于项目Reactor。doOnNext()

龚鸿羽
2023-03-14

作为项目Reactor的用户,也想使用Spring集成,我想执行以下操作,这将以这样的方式工作:

flux.buffer(duration)
    .doOnNext(bulkWriteToCockroach())
    .doOnNext(bulkWriteToPulsar());

一开始,我认为解决方案是执行以下错误代码

IntegrationFlow.from(myflow)
    .aggregate(myTimeBasedAggregation())
    .handle(bulkWriteToCockroach())
    .handle(bulkWriteToPulsar());

当然,由于outputChannel问题,它无法工作。我想知道如何一个接一个地执行操作(例如,在CockroachDB写入完成之前不要继续脉冲星写入,如果第一次操作失败,请停止这些消息的流)。

我正在考虑使用Spring集成事务支持,但我担心它在Retor中的使用。

我还看到有一种叫做网关()的东西,但是我无法找到一个基于JavaDSL的使用它的可用示例。

共有1个答案

琴英华
2023-03-14

查看其JavaDocs:

/**
 * Specify whether failures for one or more of the handlers should be
 * ignored. By default this is false meaning that an Exception
 * will be thrown whenever a handler fails. To override this and suppress
 * Exceptions, set the value to true.
 * @param ignoreFailures true if failures should be ignored.
 */
public void setIgnoreFailures(boolean ignoreFailures) {

使用JavaDSL,您应该查看:

/**
 * The {@link org.springframework.integration.channel.PublishSubscribeChannel} {@link #channel}
 * method specific implementation to allow the use of the 'subflow' subscriber capability.
 * @param publishSubscribeChannelConfigurer the {@link Consumer} to specify
 * {@link PublishSubscribeSpec} options including 'subflow' definition.
 * @return the current {@link BaseIntegrationFlowDefinition}.
 */
public B publishSubscribeChannel(Consumer<PublishSubscribeSpec> publishSubscribeChannelConfigurer) {

其示例可能如下所示:

                .publishSubscribeChannel(s -> s
                        .subscribe(f -> f
                                .handle(bulkWriteToCockroach()))
                        .subscribe(f -> f
                                .handle(bulkWriteToPulsar())));

请参阅文档中的更多内容:https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl子流

使现代化

网关()在docs:https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-gateway中解释

 类似资料:
  • Spring integration提供了非反应的入站/出站WebSocket适配器,简单地说,它通过内部容器将会话与id相关联,您对消息进行一些处理,在出站时,它检查消息头是否有会话id,并通过该会话发送。 现在,Spring通过org.springframework.web.reactive.socket.WebSocketSession和其他类提供了反应性WebSocket支持,我想知道在反

  • 下面是我的MongoDB查询 下面是我的SpringData mongo API 我想知道如何使用Spring API在项目聚合中使用$eq。我试着把比赛放到项目之外,如下所示。但查询并没有获取任何记录。因此,我使用$eq在项目中进行比较,并将结果分配到一个新的项目属性中,并在项目管道之外进行检查。 {$匹配:{$organization.roles.orgRoleId","$userOrgMap

  • 我创建了一个简单的Kafka使用者,它返回一个对象流(接收到的消息),我试图使用测试它。 在我的测试中,我做了类似的事情: 断言工作正常(如果将值从更改为其他值,则测试失败)。但是,如果断言通过,测试永远不会退出。 我还尝试使用方法,如下所示: 在本例中,我得到以下错误: 你知道我做错了什么吗?

  • 在我的应用程序中,我必须从主应用程序线程异步处理多个作业,并收集每个作业的结果。我有一个简单的Java解决方案,它使用ExecutorService和收集作业结果的ExecutorCompletionService来实现这一点。 现在我想把我的代码转换成Spring解决方案。这些文档向我展示了ExecutorService和@Async注释的使用方式,但我不确定如何以及是否可以收集多个作业的结果。

  • 我正在尝试将SpringCloudSleuth与使用jetty服务器的现有spring应用程序集成。 我补充说 和 启动应用程序时,我应该如何传递记录器属性、日志模式?现在,它不会读取/resources文件夹下的属性文件,也不会生成traceId或spanId。 大多数示例都使用了Spring Boot。需要帮助来弄清楚如何将此与带有码头服务器的Spring应用程序集成。

  • 本文向大家介绍Spring Boot集成Swagger2项目实战,包括了Spring Boot集成Swagger2项目实战的使用技巧和注意事项,需要的朋友参考一下 一、Swagger简介   上一篇文章中我们介绍了Spring Boot对Restful的支持,这篇文章我们继续讨论这个话题,不过,我们这里不再讨论Restful API如何实现,而是讨论Restful API文档的维护问题。   在日