当前位置: 首页 > 知识库问答 >
问题:

项目Reactor-如何通过窗口处理结果

林子石
2023-03-14
Flux.range(0, 100)
    .log("before window")
    .window(10)
    .map(Flux::toList)
    .log("after window")
    .map((w) -> {
        System.out.println(w.subscribe().get()));
        return 1;
    }) 
    .reduce(0, (a, b) -> a + b)
    .doOnSuccess(System.out::println)
    .subscribe();
[           main] before window : onNext(0)
[           main] after window  : onNext({ operator : "BufferAll" })

官方文档中没有那么多信息,所以我想我误解了什么,并且错误地使用了窗口函数。但我到底做错了什么?

更新

我发现,如果使用.doonsuccess,就可以避免这个特殊的问题。比如:

.map((w) -> {
    w.doOnSuccess((w2) -> System.out.println(w2))).subscribe();
    return 1;
}

但真正的问题是,在我的情况下,我需要根据提供的数据返回一个数字(而不是1)。我可以在这里创建一个新的,但无论如何,以后我应该.get。例如,在final.reduce中。因此,如果我执行.reduce(0,(a,b)->a+b.get()),那么它将失败。

.window(10)
.log("after window")
.map((w) -> {
    //basically i'm reducing Flux to a Mono<List> and return number of a [good] elements in it
    return w.reduce(...).map(ids -> 100).subscribe();
})
.reduce(0, (a, b) -> a + b.get()) 

但它无论如何都不起作用,困在.reduce:

注意到如果删除.reduce步骤,它就可以工作了。在这种情况下,.window提供的flux的处理在main flow之后执行。我对此没有任何控制权,甚至无法得到最终结果。这没有任何意义。

共有1个答案

杨志强
2023-03-14

这个问题是因为我需要在进一步使用之前减少窗口。

比如:

window(...).flatMap( (window) -> window.reduce(...))

我是在映射器的单声道中这样做的,但它在那里阻止了执行流,所以它不是一个正确的位置。它必须在窗口之后,下次使用之前。

Flux.range(0, 100)
    .window(10)
    .flatMap(window -> {
         return window.reduce(new ArrayList<>(), (a, b) -> {
             a.add(b);
             return a;
         });
    })
    .map((list) -> list.size())
    .reduce(0, (a, b) -> a + b)
    .doOnSuccess(System.out::println)
    .subscribe();
 类似资料:
  • 我是项目Reactor或反应式编程的新手,所以我可能做错了什么。我正在努力构建一个执行以下操作的流程: 给定类实体: 从DB读取实体(

  • 问题内容: 我正在试验Firefox的WebDriver,请问是否可以处理“下载”窗口(接受或拒绝传入的下载请求)? 例如,一段简单的代码: 我已经对此进行了一些尝试,但是还没有找到解决方案。我真的很感谢任何提示。 非常感谢,-V 问题答案: 一种解决方案是更改WebDriver的Firefox配置文件,以将某些MIME类型自动下载到给定目录。 我不确定如何(或是否)在Python中公开此信息,但

  • 我试图在Reactor顶部设计一个管道框架。 在每个阶段(不考虑第一个和最后一个阶段),我们都有转换对象的任务(即字符串到其长度或url到其HTML内容等)。举个例子: 您可以看到中间层有3个任务,每个任务将一个X对象转换为一个Y对象(顺便说一句,它始终是一个完全连接的层) 我的问题/困境:我的第一个想法是,我所需要的是通量。merge(),然后将其连接到每个订阅者。例如: 另一种选择是放置处理器

  • 问题内容: 我正在尝试从第二个窗口切换到第三个窗口。但是无法处理第三个窗口。有人可以帮助我解决此问题。我已经使用比较窗口标题的逻辑,但是它不起作用。代码======================= 错误堆栈跟踪: 问题答案: 这是切换到 并单击 按钮的完整代码块: 我的IDE控制台上的输出是:

  • 我有一个通量和单值,我不知道如何组合它们,以便在通量的每一项中都有单值。 我正在尝试这种方法,但它不起作用:

  • 我有这样的场景,当点击一个按钮时,它打开了一个基于PDF文件的窗口: 我使用的是Gecko驱动程序版本-21.0Firefox版本-61.0.1 Selenium独立服务器-3.13 我无法切换到基于PDF文件的窗口获取错误: 我想用最新的壁虎驱动程序-21.0来处理它