所以我想使用Flux来运行每个Mono任务,并等待每个任务2秒钟。这是我试过的
val test = { x: Int ->
Mono.just(x)
.log()
.doOnNext { println("Number: $it") }
}
Flux.fromIterable(listOf(1, 2, 3))
.flatMap { number ->
test(number)
.delayElement(Duration.ofSeconds(2))
}
.collectList()
//.subscribeOn(Schedulers.single()) <- just try to run on the same thread didn't work either
.block()
结果如下
01:58:21.398 [Test worker] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
01:58:21.470 [Test worker] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
01:58:21.472 [Test worker] INFO reactor.Mono.Just.1 - | request(unbounded)
01:58:21.473 [Test worker] INFO reactor.Mono.Just.1 - | onNext(1)
Number: 1
01:58:21.477 [Test worker] INFO reactor.Mono.Just.1 - | onComplete()
01:58:21.477 [Test worker] INFO reactor.Mono.Just.2 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
01:58:21.477 [Test worker] INFO reactor.Mono.Just.2 - | request(unbounded)
01:58:21.478 [Test worker] INFO reactor.Mono.Just.2 - | onNext(2)
Number: 2
01:58:21.478 [Test worker] INFO reactor.Mono.Just.2 - | onComplete()
01:58:21.478 [Test worker] INFO reactor.Mono.Just.3 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
01:58:21.479 [Test worker] INFO reactor.Mono.Just.3 - | request(unbounded)
01:58:21.479 [Test worker] INFO reactor.Mono.Just.3 - | onNext(3)
Number: 3
01:58:21.479 [Test worker] INFO reactor.Mono.Just.3 - | onComplete()
正如您在时间戳delayElement上看到的,即使它在同一个线程上运行(测试工作人员),它似乎也不起作用。我在这里做错了什么?
编辑:我真的不明白它是如何工作的,但是我没有在平面图中添加delayElement,而是在它上面添加delayElements,它工作正常。
Flux.fromIterable(listOf(1, 2, 3))
.delayElements(Duration.ofSeconds(2))
.flatMap { number ->
test(number)
}
.collectList()
//.subscribeOn(Schedulers.single()) <- just try to run on the same thread didn't work either
.block()
结果
03:21:29.825 [Test worker] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
03:21:29.926 [Test worker] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
03:21:29.929 [Test worker] INFO reactor.Flux.Array.1 - | request(32)
03:21:29.930 [Test worker] INFO reactor.Flux.Array.1 - | onNext(1)
03:21:29.935 [Test worker] INFO reactor.Flux.Array.1 - | onNext(2)
03:21:29.936 [Test worker] INFO reactor.Flux.Array.1 - | onNext(3)
03:21:29.936 [Test worker] INFO reactor.Flux.Array.1 - | onComplete()
Thread[parallel-1,5,main] What is this 1 31
Thread[parallel-2,5,main] What is this 2 33
Thread[parallel-3,5,main] What is this 3 35
问题是,在第一个例子中,您在flatmap中执行延迟,知道flatmap具有预批和并发256和32,这意味着flatmap中的代码将并行运行,所有项将延迟2秒,但将同时执行(订阅),如果您想将其放入,则需要将concatMap与预批1一起使用
Flux.fromIterable(listOf(1, 2, 3))
.concatmap ( number ->
test(number)
.delayElement(Duration.ofSeconds(2))
,1)
.collectList()
.block()
这将使内部代码等待预览完成对项目的逐个请求
但更清洁的解决方案是在
在Reactor Netty中,当通过向TCP通道发送数据时,任何发布服务器都可以工作。但是,如果不是简单的即时而是使用带有延迟元素的更复杂的flux,那么它就会停止正常工作。例如,如果我们使用这个hello world TCP echo服务器,它将按照预期工作: 但是,如果我们将更改为 那么我们将期望对于每一个接收到的项目,都将产生一个延迟一秒的输出。 但是,服务器的行为方式是,如果它在间隔期间
我尝试在我的机器人上实现一个延迟功能,以一个接一个地显示多条消息。延迟特性显示在我的Flow Bot Builder图表中,但当我在conversation tester和Messenger上的代理Bot中测试时,延迟实际上并没有发生--所有消息都同时显示。 我已经将IDE中的延迟代码添加到default.scr文件中: 我还按照下面的指示将options.apikey代码行添加到index.js
当我创建RoboVM Eclipse项目时,Eclipse给了我一个例外: 同样,当我尝试转到“设置”>“RoboVM”时,我也会得到这样的消息:
根据它的Javadoc,将生成,其中的第一个值是subscribe和第一个next信号之间的经过时间。 以下测试不起作用 它将抛出异常: 我原以为经过的时间至少是1000ms,但结果只有11ms。
我想将我的基于vertx的项目移到https://github.com/sczyh30/vertx-blueprint-microservice.git模板。blueprint项目使用annotations@vertxgen等在编译期间生成代码。
pom.xml版本信息: SpringFox-Swagger2:2.5.0 昂首阔步-核心:1.5.10 springfox-swagger-ui:2.6.1 Springboot:1.5.3 我有一个项目与swagger2和Springboot。 没有@Aspect的项目代码工作得很好。 正确的结果: 但是当我添加以下代码时,swagger-ui没有显示test-api-impl。 swagge