当前位置: 首页 > 知识库问答 >
问题:

第一个元素有时不包括在通量的第二个参数中。先切换?

吴山
2023-03-14

我试着使用电抗器中的SwitchOnFirst算子,这很神奇-除了有时作为双函数的第二个参数传递的transformer似乎不包括第一个元素。基本上,客户端通过RSocket向服务器发送2个项目。代码服务器端如下所示:

val socket = new AbstractRSocket() {

    override def requestChannel(payloads: Publisher[Payload]): Flux[Payload] =
      Flux.from(payloads).log.switchOnFirst((signal, all) => handle(signal.get(), all))

    private def handle(first: Payload, all: Flux[Payload]): Flux[Payload] =
      extractRoute(first) match {
        case Some("test.route") =>
          val source = Source.fromPublisher(all.log()).map(_.getDataUtf8)
          actorSink.runWith(source)
          return Flux.from(actorSource).map(DefaultPayload.create).runWith(Sink.asPublisher(false)))
      }

  }

客户端第一次启动时,服务器接收这两个项目,并按预期将其发布到actorsink。

[2020-02-01 16:17:42,656] [INFO] [reactor.Flux.DoFinallyFuseable.1] [] [reactor-tcp-epoll-2] - | onSubscribe([Fuseable] FluxDoFinally.DoFinallyFuseableSubscriber) {}
[2020-02-01 16:17:42,658] [INFO] [reactor.Flux.DoFinallyFuseable.1] [] [reactor-tcp-epoll-2] - | request(1) {}
[2020-02-01 16:17:42,664] [INFO] [reactor.Flux.DoFinallyFuseable.1] [] [reactor-tcp-epoll-2] - | onNext(io.rsocket.util.ByteBufPayload@53e655e6) {}
[2020-02-01 16:17:42,731] [INFO] [reactor.Flux.SwitchOnFirstInner.2] [] [akka.actor.default-dispatcher-9] - onSubscribe(FluxSwitchOnFirst.SwitchOnFirstInner) {}
[2020-02-01 16:17:42,736] [INFO] [reactor.Flux.SwitchOnFirstInner.2] [] [akka.actor.default-dispatcher-9] - request(16) {}
[2020-02-01 16:17:42,739] [INFO] [reactor.Flux.SwitchOnFirstInner.2] [] [akka.actor.default-dispatcher-9] - onNext(io.rsocket.util.ByteBufPayload@53e655e6) {}
[2020-02-01 16:17:42,741] [INFO] [reactor.Flux.DoFinallyFuseable.1] [] [akka.actor.default-dispatcher-9] - | request(15) {}
[Sink] Received (item1)
[2020-02-01 16:17:42,769] [INFO] [reactor.Flux.DoFinallyFuseable.1] [] [reactor-tcp-epoll-2] - | onNext(io.rsocket.util.ByteBufPayload@50215db3) {}
[2020-02-01 16:17:42,769] [INFO] [reactor.Flux.SwitchOnFirstInner.2] [] [reactor-tcp-epoll-2] - onNext(io.rsocket.util.ByteBufPayload@50215db3) {}
[Sink] Received (item2)
[2020-02-01 16:17:42,770] [INFO] [reactor.Flux.DoFinallyFuseable.1] [] [reactor-tcp-epoll-2] - | onComplete() {}
[2020-02-01 16:17:42,771] [INFO] [reactor.Flux.SwitchOnFirstInner.2] [] [reactor-tcp-epoll-2] - onComplete() {}
[Sink] Completed

但是,如果我停止客户端并再次运行它,则只发布第二项。

[2020-02-01 16:18:13,746] [INFO] [reactor.Flux.DoFinallyFuseable.3] [] [reactor-tcp-epoll-3] - | onSubscribe([Fuseable] FluxDoFinally.DoFinallyFuseableSubscriber) {}
[2020-02-01 16:18:13,746] [INFO] [reactor.Flux.DoFinallyFuseable.3] [] [reactor-tcp-epoll-3] - | request(1) {}
[2020-02-01 16:18:13,747] [INFO] [reactor.Flux.DoFinallyFuseable.3] [] [reactor-tcp-epoll-3] - | onNext(io.rsocket.util.ByteBufPayload@5a2d7823) {}
[2020-02-01 16:18:13,751] [INFO] [reactor.Flux.SwitchOnFirstInner.4] [] [akka.actor.default-dispatcher-6] - onSubscribe(FluxSwitchOnFirst.SwitchOnFirstInner) {}
[2020-02-01 16:18:13,752] [INFO] [reactor.Flux.SwitchOnFirstInner.4] [] [akka.actor.default-dispatcher-6] - request(16) {}
[2020-02-01 16:18:13,752] [INFO] [reactor.Flux.DoFinallyFuseable.3] [] [akka.actor.default-dispatcher-6] - | request(16) {}
[2020-02-01 16:18:13,787] [INFO] [reactor.Flux.DoFinallyFuseable.3] [] [reactor-tcp-epoll-3] - | onNext(io.rsocket.util.ByteBufPayload@1fa7bb46) {}
[2020-02-01 16:18:13,788] [INFO] [reactor.Flux.SwitchOnFirstInner.4] [] [reactor-tcp-epoll-3] - onNext(io.rsocket.util.ByteBufPayload@1fa7bb46) {}
[Sink] Received (item2)
[2020-02-01 16:18:13,790] [INFO] [reactor.Flux.DoFinallyFuseable.3] [] [reactor-tcp-epoll-3] - | onComplete() {}

一个区别是[SwitchOnFirstInner.4]请求(16)触发[DoFinallyFuseable.3]请求(16),而不是onNext使用SwitchOnFirst运算符中已经可用的第一个项目。

我可能做错了什么,但不知道是什么。SwitchOnFirst的javadoc指出,在所有情况下都应返回源自原始Flux的发布者,这里不是这种情况(输入发送到ActorSink,输出来自单独的ActorSource),可能是问题吗?

我是新来的Reactor/Reactor插座,所以如果我错过了一些明显的东西,很抱歉。

共有1个答案

晋鹤轩
2023-03-14

没有从allFlux派生返回Flux可能确实是原因,特别是如果actorSink.run(source)请求allsource,因为SwitchOnFirst运算符应该负责处理请求。

 类似资料:
  • 有没有办法将第一个和第二个元素替换为向量中的所有元组?假设我有这样的东西: 元组的第一个元素现在是1和3,第二个元素是2和4。有容易使2和4成为第一个元素吗?

  • 我有困难的逻辑,我有6个数字在我的arraylist和我想打印每2个元素,然后去下一个元素2。喜欢打印1,2然后3,4然后5,6。

  • 我有一个数组,看起来像:。而将始终保持常数。我希望x每迭代增加100。 我就是这么做的: null null 正如您所看到的,它不是在增加值。它总是只是插入。在第一个名为pies的对象中,我希望和下一个名为cakes的对象为。 我该怎么做?

  • 本文向大家介绍JavaScript数组中的第一个元素和最后一个元素?,包括了JavaScript数组中的第一个元素和最后一个元素?的使用技巧和注意事项,需要的朋友参考一下 数组是一组元素。每个元素都有其自己的 索引值。我们可以使用这些索引访问任何元素。但是,对于最后一个元素,直到知道数组中存在的元素数量,我们才知道索引。在这种情况下,我们必须使用逻辑。让我们简要地讨论这些细节。 访问第一个元素 因

  • 下面是我的代码: 我将再次指出,如果我使用队列而不是优先级队列,那么代码可以工作。如何访问优先级队列的前部?

  • 问题内容: 如何在元素列表中选择某个元素?我有以下几点: 很明显,我有适用于所有类的CSS类,但是我也希望能够选择此类的第一,第二或第三div , 而不管它们在标记中的位置 : 几乎类似于jQuery索引选择,这是我当前正在使用的选择,但是我需要一个无脚本的替代方法。 具体来说,我在寻找伪选择器,而不是添加其他类或使用ID来使事情正常工作。 问题答案: 您可能终于在发布此问题与今天之间意识到了这一