当前位置: 首页 > 知识库问答 >
问题:

通过Reactor功能管道下游的物体

淳于泓
2023-03-14

我是一个新的反应器和反应编程,我正试图解决下面的情况。

我从Kafka主题接收到一个对象流,对于flux中的每个记录,我需要调用2个服务并验证对象。

public void consume(Flux<Data> flux)
{
flux.map(data->callRESTService1(data)).map(...<I need the data once again here to call rest service 2>
}

现在我正在使用下面的样式来实现这一点,但是有没有更好的/正确的方法来做到这一点呢?

public void consume(Flux<Data> flux)
{
   flux.subscribe(data->handleData(data));
}


 public void handleData(data)
    {
 Flux.concat(callRestService1(data),callRestService2(data)).reduce(data,reduce());
    }

此外,如果其中一个服务关闭,我需要在侦听器上传播错误,以便消息不被确认,但在另一种情况下,如果验证失败,则需要将消息发布到另一个主题。

共有1个答案

史俊德
2023-03-14

这两个路径都需要原始元素,而且每个路径都有不同的处理错误的方法,这一事实很好地表明您可能需要flatmap:

Flux<Data> source; //= ...
return source.flatMap(value -> {
    Mono<IgnoreMe1> service1 = callRestService1(value);
    Mono<IgnoreMe2> service2 = callRestService2(value)
        .onErrorResume(e -> postErrorToTopic(e, value)); //might need some type massaging, eg. if the post to topic method returns a `Mono<Void>`

    //wait for the two to complete, propagate their errors if any, else return original value
    return Mono.when(service1, service2)
       .thenReturn(value);
}
 类似资料:
  • 我们使用通道来同步协程之间的执行。 下面的例子是通过获取同步通道数据来阻塞程序执行的方法来等待另一个协程运行结束的。 也就是说main函数所在的协程在运行到<-done语句的时候将一直等待worker函数所在的协程执行完成,向通道写入数据才会(从通道获得数据)继续执行。 package main import "fmt" import "time" // 这个worker函数将以协程的方式运行 /

  • 问题内容: 我在bash脚本中具有简单的功能,我想将stdout作为输入传递给它。 我想以这种方式使用它。 当然,我使用了冗余函数echo和printf来简化问题,但是您明白了。现在,我收到一个“未找到”错误,我认为这意味着我的参数定界是错误的(“ $ 1”部分)。有什么建议么? 最初,jc_hms函数的用法如下: 但我想将结果存储在变量中,以便在将其发送到串行端口之前先进行进一步处理。 编辑:所

  • 本文向大家介绍php实现购物车功能(下),包括了php实现购物车功能(下)的使用技巧和注意事项,需要的朋友参考一下 接着上篇继续学习: 《php实现购物车的功能(上)》 7、实现一个管理界面 登录界面 由以下代码实现: 7.1 admin.php 7.2 user_auth_fns.php文件中的函数login() 7.3 user_auth_fns.php文件中的函数check_admin_us

  • 我刚刚将ProjectReactor.io从旧反应堆[Core:3.0.1.发行版,Netty:0.5.2.发行版]升级到新反应堆[Core:3.0.4.发行版,Netty:0.6.0.发行版]。 我打开了一个TcpClient连接并希望稍后关闭它。 在我使用的旧版本中 断开我的客户端与服务器的连接。 新版本中是否有对等调用?我找不到一个! 在使用创建TcpClient时,我对获得的和尝试了以下操

  • 在本章中,我们将了解颜色通道功能在LESS中的重要性。 LESS支持很少的内置函数,可以设置关于通道的值。 通道根据颜色定义设置值。 HSL颜色具有色调,饱和度和亮度通道,RGB颜色具有红色,绿色和蓝色通道。 下表列出了所有颜色通道功能 - Sr.No. 功能说明 例 1 hue 在HSL颜色空间中,从颜色对象中提取色调通道。 background: hue(hsl(75, 90%, 30%));

  • 问题内容: 我正在使用具有集成交付管道功能(https://jenkins.io/solutions/pipeline/)的Jenkins v2.1 来编排两个现有构建(构建和部署)。 在我的参数化构建中,我有3个用户参数设置,也需要在管道中选择该参数。 管道脚本如下: 除以外,此方法均正常运行。当我构建管道时,会引发以下错误: 如何解决此类型转换错误?甚至更好的是,有没有一种麻烦的方式可以将所有