我想过滤所有值满足特定条件的窗口,然后打印出过滤后的通量:
Flux.just(4, 2, 6, 4, 5, 6, 7, 8, 9)
.window(3)
.filterWhen(window -> window.all(n -> n % 2 == 0))
.flatMap(window -> window)
.subscribe(System.out::println);
然而,结果不是4、2、6,而是
java.lang.IllegalStateException: UnicastProcessor allows only a single Subscriber.
如果我尝试相同但不使用过滤器,当它正常工作时:
Flux.just(4, 2, 6, 4, 5, 6, 7, 8, 9)
.window(3)
.flatMap(window -> window)
.subscribe(System.out::println);
如果我在没有平面图的情况下尝试,也不会引发异常:
Flux.just(4, 2, 6, 4, 5, 6, 7, 8, 9)
.window(3)
.filterWhen(window -> window.all(n -> n % 2 == 0))
.subscribe(System.out::println);
然而,这两种方法显然都没有产生预期的结果。但是,当和平面图时,在组合滤波器时似乎存在问题!
我的第一个例子有什么不对?另一个订阅者在哪里?
如何获取值?
这在目前是不可能的,因为一个窗口只能订阅(“消费”)一次。当将订阅窗口时,flatMap
和filter都将订阅该窗口,因此会出现错误。
相反,您可以使用
缓冲区(3)
而不是窗口(3)
,过滤缓冲区,然后从通过过滤器的缓冲区中发出值:
Flux.just(4, 2, 6, 4, 5, 6, 7, 8, 9)
.buffer(3)
.filter(list -> list.stream().allMatch(n -> n % 2 == 0))
.flatMapIterable(Function.identity())
.subscribe(System.out::println);
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/doc-files/marbles/flatMapForFlux.svg 这个问题是关于项目反应器中Flux发布者中的平面图函数。根据我们对平面地图的理解,我们必须返回一个发布者,平面地图将在内部订阅该发布者。我们想知道的是线程如何处理这种情况。是要多线程还
问题内容: 我在Android应用程序开发中从Java稍微转移到Kotlin,但是在某些情况下,我不想用Kotlin进行编码,而是希望这些特殊情况用Java编写: 省去Kotlin多余的使用量 我知道现在正是Java总是以相反的方式触发 还提供了表达式和许多其他功能。 但仍然,我的某些代码无法用Kotlin编写,例如成员或字段。 Kotlin注释实际上可以代替那些注释。但是喜欢Java的某些编码功
我正尝试在类的方法中使用(以支持拖放)。我想显示我拖动的文件的导入进度。然而,它并不起作用。我不明白问题是什么,或者是什么引起的。 例外情况:
问题内容: 我已经使用Python asyncio和aiohttp成功构建了一个RESTful微服务,该服务可侦听POST事件以收集来自各种供料器的实时事件。 然后,它构建一个内存结构,以将事件的最后24小时缓存在嵌套的defaultdict / deque结构中。 现在,我想定期检查该结构到磁盘的位置,最好使用pickle。 由于内存结构可以大于100MB,因此我希望避免在检查点结构所需的时间上
如果ExternalService.Get抛出异常而不是返回mono.error,则无法使其工作。总是建议使用try catch转换为mono/flow或者有没有更好的方法来验证这样的抛出异常?
所以我有两个关于java的一般问题。第一个问题是,什么时候在方法体中使用try/catch,而不是在声明方法时使用throws异常?这是我的意思的一个小例子。这是: 对抗 然后我的第二个问题是什么时候知道捕获或抛出什么类型的异常?我指的是诸如IOException或EOFException等异常... 如果有一个好的链接,有人可以发给我教这一切(这可能比我想象的更复杂),我会像你回答它一样感激。谢