当前位置: 首页 > 知识库问答 >
问题:

Reactor项目:我需要处理器吗?

颜永怡
2023-03-14

我试图在Reactor顶部设计一个管道框架。

在每个阶段(不考虑第一个和最后一个阶段),我们都有转换对象的任务(即字符串到其长度或url到其HTML内容等)。举个例子:

您可以看到中间层有3个任务,每个任务将一个X对象转换为一个Y对象(顺便说一句,它始终是一个完全连接的层)

我的问题/困境:我的第一个想法是,我所需要的是通量。merge(),然后将其连接到每个订阅者。例如:

Flux<X> source = Flux.merge(x1Flux, x2Flux)  
source.subscribe(y1Subscriber)
source.subscribe(y2Subscriber)

另一种选择是放置处理器(TopicProcessor?)这将充当中间件(如pub-sub模式中的中间件)

我不知道哪种解决方案最适合我的问题。逻辑上是一样的,但每种架构的实际含义是什么?

谢谢

共有1个答案

唐向荣
2023-03-14

我在这里的一般方法是使用ConnectableFlux来延迟发布,直到您完成整个管道设置,然后在设置管道后对每个通量调用connect()

你可以使用处理器,但我建议尽可能避免使用。

一般要点(未检查语法)类似于:

ConnectableFlux<String> x1 = Flux.just("x1").publish();
ConnectableFlux<String> x2 = Flux.just("x2").publish();

ConnectableFlux<String> y1 = Flux.<String>from(Flux.merge(x1, x2)).publish();
ConnectableFlux<String> y2 = Flux.<String>from(Flux.merge(x1, x2)).publish();
ConnectableFlux<String> y3 = Flux.<String>from(Flux.merge(x1, x2)).publish();

ConnectableFlux<String> z3 = Flux.<String>from(Flux.merge(y1, y2, y3)).publish();

x1.connect();
x2.connect();
y1.connect();
//...etc.

另请注意,您可能希望使用concat()mergeSequential()而不是合并(),具体取决于您的用例(合并()将急切地订阅发布者,concat()不会,mergeSequential()将按收到的顺序合并,可能会交错值。)

 类似资料:
  • 我是项目Reactor或反应式编程的新手,所以我可能做错了什么。我正在努力构建一个执行以下操作的流程: 给定类实体: 从DB读取实体(

  • 官方文档中没有那么多信息,所以我想我误解了什么,并且错误地使用了函数。但我到底做错了什么? 更新 我发现,如果使用,就可以避免这个特殊的问题。比如: 但真正的问题是,在我的情况下,我需要根据提供的数据返回一个数字(而不是)。我可以在这里创建一个新的,但无论如何,以后我应该。例如,在final中。因此,如果我执行,那么它将失败。 但它无论如何都不起作用,困在: 注意到如果删除步骤,它就可以工作了。在

  • 问题内容: 要编译此代码,我可以: 将我的通话置于try / catch块中,或 已经声明它可以抛出一个。 为什么我必须这样做? (示例代码来自Kathy Sierra的SCJP书 。) 我知道引发的异常是已检查的异常,因此我必须处理它,但是在什么情况下需要引发此异常? 问题答案: 如果以一种可以引发检查异常的方式声明方法(不是的子类),则调用该方法的代码必须在一个块中调用它,否则调用者方法必须声

  • 我使用的是Spring 3.1。War不是使用Maven构建的。这只是一个正常的构建。我的课程路径中有以下罐子。 我有下面的代码, 当我运行方法时,我得到下面的异常 为了克服这个问题,如果我添加,会出现以下异常。 为了克服这个问题,如果我添加,就会出现以下异常。 如何克服我最初的异常-

  • 请忽略这个问题。我有错误的设置,导致gRPC的性能不佳。 是否可以比较GRPC与项目Reactor? 我只是想比较REST和GRPC的性能。我看不出GRPC比Reactor快。事实上,它更糟。 GRPC设置: 此 grpc 服务器使用服务器端流对来自 api 服务器的每个请求响应 1000 个“Hello”。 api 服务器返回

  • 就而言,它们之间的主要区别是什么?从文档中,我了解到是异步操作,而是同步操作。但这对我来说并没有什么意义,B/C单声道是关于并行性的,这一点是不可理解的。有人能用一种更容易理解的方式来重新表述吗? 然后在的文档中声明(https://projectreactor.io/docs/core/release/api/reactor/core/publisher/mono.html#FlatMap-ja