当前位置: 首页 > 知识库问答 >
问题:

filterWhen与flatMap结合在一起在Reactor中引发异常

印劲
2023-03-14

我想过滤所有值满足特定条件的窗口,然后打印出过滤后的通量:

Flux.just(4, 2, 6, 4, 5, 6, 7, 8, 9)
        .window(3)
        .filterWhen(window -> window.all(n -> n % 2 == 0))
        .flatMap(window -> window)
        .subscribe(System.out::println);

然而,结果不是4、2、6,而是

java.lang.IllegalStateException: UnicastProcessor allows only a single Subscriber.

如果我尝试相同但不使用过滤器,当它正常工作时:

Flux.just(4, 2, 6, 4, 5, 6, 7, 8, 9)
        .window(3)
        .flatMap(window -> window)
        .subscribe(System.out::println);

如果我在没有平面图的情况下尝试,也不会引发异常:

Flux.just(4, 2, 6, 4, 5, 6, 7, 8, 9)
        .window(3)
        .filterWhen(window -> window.all(n -> n % 2 == 0))
        .subscribe(System.out::println);

然而,这两种方法显然都没有产生预期的结果。但是,当和平面图时,在组合滤波器时似乎存在问题!

我的第一个例子有什么不对?另一个订阅者在哪里?

如何获取值?

共有1个答案

祝灼光
2023-03-14

这在目前是不可能的,因为一个窗口只能订阅(“消费”)一次。当将订阅窗口时,flatMapfilter都将订阅该窗口,因此会出现错误。

相反,您可以使用缓冲区(3)而不是窗口(3),过滤缓冲区,然后从通过过滤器的缓冲区中发出值:

Flux.just(4, 2, 6, 4, 5, 6, 7, 8, 9)
    .buffer(3)
    .filter(list -> list.stream().allMatch(n -> n % 2 == 0))
    .flatMapIterable(Function.identity())
    .subscribe(System.out::println);

 类似资料:
  • https://projectreactor.io/docs/core/release/api/reactor/core/publisher/doc-files/marbles/flatMapForFlux.svg 这个问题是关于项目反应器中Flux发布者中的平面图函数。根据我们对平面地图的理解,我们必须返回一个发布者,平面地图将在内部订阅该发布者。我们想知道的是线程如何处理这种情况。是要多线程还

  • 问题内容: 我在Android应用程序开发中从Java稍微转移到Kotlin,但是在某些情况下,我不想用Kotlin进行编码,而是希望这些特殊情况用Java编写: 省去Kotlin多余的使用量 我知道现在正是Java总是以相反的方式触发 还提供了表达式和许多其他功能。 但仍然,我的某些代码无法用Kotlin编写,例如成员或字段。 Kotlin注释实际上可以代替那些注释。但是喜欢Java的某些编码功

  • 我正尝试在类的方法中使用(以支持拖放)。我想显示我拖动的文件的导入进度。然而,它并不起作用。我不明白问题是什么,或者是什么引起的。 例外情况:

  • 问题内容: 我已经使用Python asyncio和aiohttp成功构建了一个RESTful微服务,该服务可侦听POST事件以收集来自各种供料器的实时事件。 然后,它构建一个内存结构,以将事件的最后24小时缓存在嵌套的defaultdict / deque结构中。 现在,我想定期检查该结构到磁盘的位置,最好使用pickle。 由于内存结构可以大于100MB,因此我希望避免在检查点结构所需的时间上

  • 如果ExternalService.Get抛出异常而不是返回mono.error,则无法使其工作。总是建议使用try catch转换为mono/flow或者有没有更好的方法来验证这样的抛出异常?

  • 所以我有两个关于java的一般问题。第一个问题是,什么时候在方法体中使用try/catch,而不是在声明方法时使用throws异常?这是我的意思的一个小例子。这是: 对抗 然后我的第二个问题是什么时候知道捕获或抛出什么类型的异常?我指的是诸如IOException或EOFException等异常... 如果有一个好的链接,有人可以发给我教这一切(这可能比我想象的更复杂),我会像你回答它一样感激。谢