当前位置: 首页 > 知识库问答 >
问题:

Reactor:延迟元件不能与通量一起工作

马亮
2023-03-14

所以我想使用Flux来运行每个Mono任务,并等待每个任务2秒钟。这是我试过的

val test = { x: Int ->
      Mono.just(x)
        .log()
        .doOnNext { println("Number: $it") }
    }


Flux.fromIterable(listOf(1, 2, 3))
    .flatMap { number ->
      test(number)
         .delayElement(Duration.ofSeconds(2))
    }
    .collectList()
    //.subscribeOn(Schedulers.single()) <- just try to run on the same thread didn't work either
   .block()

结果如下

01:58:21.398 [Test worker] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
01:58:21.470 [Test worker] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
01:58:21.472 [Test worker] INFO reactor.Mono.Just.1 - | request(unbounded)
01:58:21.473 [Test worker] INFO reactor.Mono.Just.1 - | onNext(1)
Number: 1
01:58:21.477 [Test worker] INFO reactor.Mono.Just.1 - | onComplete()
01:58:21.477 [Test worker] INFO reactor.Mono.Just.2 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
01:58:21.477 [Test worker] INFO reactor.Mono.Just.2 - | request(unbounded)
01:58:21.478 [Test worker] INFO reactor.Mono.Just.2 - | onNext(2)
Number: 2
01:58:21.478 [Test worker] INFO reactor.Mono.Just.2 - | onComplete()
01:58:21.478 [Test worker] INFO reactor.Mono.Just.3 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
01:58:21.479 [Test worker] INFO reactor.Mono.Just.3 - | request(unbounded)
01:58:21.479 [Test worker] INFO reactor.Mono.Just.3 - | onNext(3)
Number: 3
01:58:21.479 [Test worker] INFO reactor.Mono.Just.3 - | onComplete()

正如您在时间戳delayElement上看到的,即使它在同一个线程上运行(测试工作人员),它似乎也不起作用。我在这里做错了什么?

编辑:我真的不明白它是如何工作的,但是我没有在平面图中添加delayElement,而是在它上面添加delayElements,它工作正常。

Flux.fromIterable(listOf(1, 2, 3))
    .delayElements(Duration.ofSeconds(2))
    .flatMap { number ->
      test(number)
    }
    .collectList()
    //.subscribeOn(Schedulers.single()) <- just try to run on the same thread didn't work either
   .block()

结果

03:21:29.825 [Test worker] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
03:21:29.926 [Test worker] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
03:21:29.929 [Test worker] INFO reactor.Flux.Array.1 - | request(32)
03:21:29.930 [Test worker] INFO reactor.Flux.Array.1 - | onNext(1)
03:21:29.935 [Test worker] INFO reactor.Flux.Array.1 - | onNext(2)
03:21:29.936 [Test worker] INFO reactor.Flux.Array.1 - | onNext(3)
03:21:29.936 [Test worker] INFO reactor.Flux.Array.1 - | onComplete()
Thread[parallel-1,5,main] What is this 1 31
Thread[parallel-2,5,main] What is this 2 33
Thread[parallel-3,5,main] What is this 3 35

共有1个答案

颜君浩
2023-03-14

问题是,在第一个例子中,您在flatmap中执行延迟,知道flatmap具有预批和并发256和32,这意味着flatmap中的代码将并行运行,所有项将延迟2秒,但将同时执行(订阅),如果您想将其放入,则需要将concatMap与预批1一起使用

Flux.fromIterable(listOf(1, 2, 3))
.concatmap ( number ->
  test(number)
     .delayElement(Duration.ofSeconds(2))
,1)
.collectList()
.block()

这将使内部代码等待预览完成对项目的逐个请求

但更清洁的解决方案是在

 类似资料:
  • 在Reactor Netty中,当通过向TCP通道发送数据时,任何发布服务器都可以工作。但是,如果不是简单的即时而是使用带有延迟元素的更复杂的flux,那么它就会停止正常工作。例如,如果我们使用这个hello world TCP echo服务器,它将按照预期工作: 但是,如果我们将更改为 那么我们将期望对于每一个接收到的项目,都将产生一个延迟一秒的输出。 但是,服务器的行为方式是,如果它在间隔期间

  • 我尝试在我的机器人上实现一个延迟功能,以一个接一个地显示多条消息。延迟特性显示在我的Flow Bot Builder图表中,但当我在conversation tester和Messenger上的代理Bot中测试时,延迟实际上并没有发生--所有消息都同时显示。 我已经将IDE中的延迟代码添加到default.scr文件中: 我还按照下面的指示将options.apikey代码行添加到index.js

  • 当我创建RoboVM Eclipse项目时,Eclipse给了我一个例外: 同样,当我尝试转到“设置”>“RoboVM”时,我也会得到这样的消息:

  • 根据它的Javadoc,将生成,其中的第一个值是subscribe和第一个next信号之间的经过时间。 以下测试不起作用 它将抛出异常: 我原以为经过的时间至少是1000ms,但结果只有11ms。

  • 我想将我的基于vertx的项目移到https://github.com/sczyh30/vertx-blueprint-microservice.git模板。blueprint项目使用annotations@vertxgen等在编译期间生成代码。

  • pom.xml版本信息: SpringFox-Swagger2:2.5.0 昂首阔步-核心:1.5.10 springfox-swagger-ui:2.6.1 Springboot:1.5.3 我有一个项目与swagger2和Springboot。 没有@Aspect的项目代码工作得很好。 正确的结果: 但是当我添加以下代码时,swagger-ui没有显示test-api-impl。 swagge