当前位置: 首页 > 知识库问答 >
问题:

flux.zip方法不发射所有元素

慕容聪
2023-03-14

我正在与反应流和出版商(Mono和Flux)合作,并使用zip和zipWith Flux方法将这两个出版商结合起来,如下所示:

Flux<String> flux1 = Flux.just(" {1} ","{2} ","{3} ","{4} " );
Flux<String> flux2 = Flux.just(" |A|"," |B| "," |C| ");
Flux.zip(flux1, flux2,
                    (itemflux1, itemflux2) -> "[ "+itemflux1 + ":"+ itemflux2 + " ] " )
            .subscribe(System.out::print);

以下是输出:

[  {1} : |A| ] [ {2} : |B|  ] [ {3} : |C|  ] 

由于flux1有四个元素,flux2有三个元素,flux1中的第四个元素丢失。当我试图打印通量的日志时,没有关于第四元素发生了什么的信息。

以下是打印日志的语句:

Flux.zip(flux1, flux2,
                (itemflux1, itemflux2) -> "[ "+itemflux1 + ":"+ itemflux2 + " ] " ).log()
        .subscribe(System.out::print);

下面是使用log方法的控制台日志:

[info] onSubscribe(FluxZip.ZipCoordinator)
[info] request(unbounded)
[info] onNext([  {1} : |A| ] )
[  {1} : |A| ] [info] onNext([ {2} : |B|  ] )
[ {2} : |B|  ] [info] onNext([ {3} : |C|  ] )
[ {3} : |C|  ] [info] onComplete()

从zip方法的文档中,我得到了

操作员将继续这样做,直到任何源完成。错误将立即转发。这种“分步合并”处理在分散-聚集场景中特别有用。

但在我的例子中,它没有记录任何错误,也没有记录任何关于丢失元素的消息。

我怎样才能得到丢失元素的信息?

请建议。

共有3个答案

壤驷棋
2023-03-14

解释留档所说的
错误将立即转发。--

没有办法得到丢失的元素。因为当其中一个流结束时,不会进一步读取该流。希望这是清楚的。如果您真的想获取流的最后一个元素,请尝试其他操作符。

通骁
2023-03-14

这是该运算符的预期行为。使用Flux。zip,提供的流量之一可能是无限的;一个常见的例子是用流量压缩流量。间隔(持续时间)实例(无限)。

如果你被困在这种情况下,这可能意味着你需要使用不同的操作员。

暴绪
2023-03-14

zip/zipWith将输出最短流量中元素的数量。当最小的通量终止时,它会取消较长的通量,如果将log()放在源通量上,而不是压缩的通量上,则应该可以看到。

这可以通过这个代码段来演示(它被调优以显示1×1请求并作为单元测试运行,因此隐藏()/zipBy(...,1)block last()):

@Test
public void test() {
    Flux<Integer> flux1 = Flux.range(1, 4).hide().log("\tFLUX 1");
    Flux<Integer> flux2 = Flux.range(10, 2).hide().log("\tFlux 2");

    flux1.zipWith(flux2, 1)
        .log("zipped")
        .blockLast();
}

哪些输出:

11:57:21.072 [main] INFO  zipped - onSubscribe(FluxZip.ZipCoordinator)
11:57:21.077 [main] INFO  zipped - request(unbounded)
11:57:21.079 [main] INFO    FLUX 1 - onSubscribe(FluxHide.HideSubscriber)
11:57:21.079 [main] INFO    FLUX 1 - request(1)
11:57:21.079 [main] INFO    FLUX 1 - onNext(1)
11:57:21.079 [main] INFO    Flux 2 - onSubscribe(FluxHide.HideSubscriber)
11:57:21.080 [main] INFO    Flux 2 - request(1)
11:57:21.080 [main] INFO    Flux 2 - onNext(10)
11:57:21.080 [main] INFO  zipped - onNext([1,10])
11:57:21.080 [main] INFO    FLUX 1 - request(1)
11:57:21.080 [main] INFO    FLUX 1 - onNext(2)
11:57:21.080 [main] INFO    Flux 2 - request(1)
11:57:21.080 [main] INFO    Flux 2 - onNext(11)
11:57:21.080 [main] INFO  zipped - onNext([2,11])
11:57:21.080 [main] INFO    FLUX 1 - request(1)
11:57:21.080 [main] INFO    FLUX 1 - onNext(3)
11:57:21.080 [main] INFO    Flux 2 - request(1)
11:57:21.080 [main] INFO    Flux 2 - onComplete()
11:57:21.081 [main] INFO    FLUX 1 - cancel() <----- HERE
11:57:21.081 [main] INFO    Flux 2 - cancel()
11:57:21.081 [main] INFO  zipped - onComplete()

这是“直到任何源完成”部分。

 类似资料:
  • 本文向大家介绍jQuery使用empty()方法删除元素及其所有子元素的方法,包括了jQuery使用empty()方法删除元素及其所有子元素的方法的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了jQuery使用empty()方法删除元素及其所有子元素的方法。分享给大家供大家参考。具体实现方法如下: 希望本文所述对大家的jQuery程序设计有所帮助。

  • 我有两种将实体映射到域的方法。 当我试图定义实体列表到域的映射方法时,我发现了用于映射集合元素的模糊映射方法。 有没有一种方法可以定义用于映射对象集合的方法

  • 本文向大家介绍C#不重复输出一个数组中所有元素的方法,包括了C#不重复输出一个数组中所有元素的方法的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了C#不重复输出一个数组中所有元素的方法。分享给大家供大家参考。具体如下: 1.算法描述 0)输入合法性校验 1)建立临时数组:与原数组元素一样。该步骤的目的是防止传入的原数组被破坏 2)对临时数组进行排序 3)统计临时数组共有多少个不同的数字。该

  • 我使用三台机器和本指南在aws集群上设置了一个测试环境。 我在本地模式下测试了我的代码,并使用wirbelsturm创建了一个本地vagrant集群,这两种方法都能产生预期的结果。 当我现在向Web服务器提交代码时,我的喷口和所有的螺栓都是静默的。我的嘴读的是csv,我把它抄送给了灵气和我的主管。storm UI显示我的拓扑为活动的,并显示所有螺栓和我的喷口,但计数器不可见。主管没有已使用得员工.

  • 本文向大家介绍JavaScript获取表单内所有元素值的方法,包括了JavaScript获取表单内所有元素值的方法的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了JavaScript获取表单内所有元素值的方法。分享给大家供大家参考。具体如下: 下面的JS代码可以遍历指定表单中的所有元素,并输出元素的值 希望本文所述对大家的javascript程序设计有所帮助。

  • 问题内容: 有人告诉我 编写一个函数square(a),该函数接受一个数字数组a并返回一个包含每个平方值的数组。 起初,我有 但是,由于我正在打印,而且没有像被问到的那样返回,因此这不起作用。所以我尝试了 但这仅平方我数组的最后一个数字。我如何才能使整个列表平方? 问题答案: 您可以使用列表理解: 或者您可以: 或者,您可以使用发电机。它不会返回列表,但是您仍然可以迭代它,并且由于不必分配整个新列