当前位置: 首页 > 知识库问答 >
问题:

使用REDUCT on flatMap时,反应器流量用户流停止

牟恺
2023-03-14

我想改变我的代码为单一订户。现在我有了

auctionFlux.window(Duration.ofSeconds(120), Duration.ofSeconds(120)).subscribe(
        s -> s.groupBy(Auction::getItem).subscribe( longAuctionGroupedFlux -> longAuctionGroupedFlux.reduce(new ItemDumpStats(), this::calculateStats )
));

这段代码工作正常,reduce方法非常简单。我试着为单个订户更改代码

    auctionFlux.window(Duration.ofSeconds(120), Duration.ofSeconds(120))
        .flatMap(window -> window.groupBy(Auction::getItem))
        .flatMap(longAuctionGroupedFlux -> longAuctionGroupedFlux.reduce(new ItemDumpStats(), this::calculateStats))
        .subscribe(itemDumpStatsMono -> log.info(itemDumpStatsMono.toString()));

这是我的代码,这个代码不起作用。没有错误,没有结果。调试后,我发现代码卡在第二个平面地图上,当我减少流。我认为问题是平面地图合并,卡在单声道解决方案上。有些人现在如何解决这个问题,只使用单个订户?

如何复制,您可以使用另一个类或创建一个类。小尺寸的正在工作,而大尺寸的正在消亡

List<Auction> auctionList = new ArrayList<>();
for (int i = 0;i<100000;i++){
    Auction a = new Auction((long) i, "test");
    a.setItem((long) (i%50));
    auctionList.add(a);
}

Flux.fromIterable(auctionList).groupBy(Auction::getId).flatMap(longAuctionGroupedFlux ->
        longAuctionGroupedFlux.reduce(new ItemDumpStats(), (itemDumpStats, auction) -> itemDumpStats)).collectList().subscribe(itemDumpStats -> System.out.println(itemDumpStats.toString()));

这种方法是即时的结果,但我使用3个订阅者

Flux.fromIterable(auctionList)
        .groupBy(Auction::getId)
        .subscribe(
                auctionIdAuctionGroupedFlux -> auctionIdAuctionGroupedFlux.reduce(new ItemDumpStats(), (itemDumpStats, auction) -> itemDumpStats).subscribe(itemDumpStats -> System.out.println(itemDumpStats.toString()
                )
        ));

共有2个答案

尉迟明辉
2023-03-14

添加parallel修复了这个问题,但我想知道为什么要大大降低flatMap的速度。

韩淇
2023-03-14

我认为您描述的行为与groupByflatMap链接的交互有关。检查groupBy文档。它指出:

groupBy需要在下游排空和消耗这些组才能正常工作。值得注意的是,当标准生成大量组时,如果组未在下游适当消费(例如,由于具有设置得太低的maxConcurrency参数的flatMap),则可能导致挂起。

默认情况下,maxConcurrency(flatMap)设置为256(我检查了3.2.2的源代码)。因此,选择超过256个组可能会导致执行挂起(特别是当所有执行都发生在同一线程上时)。

下面的代码有助于理解当您将运算符group pBy和phaMap链接时会发生什么:

@Test
public void groupAndFlatmapTest() {
    val groupCount = 257;
    val groupSize = 513;
    val list = rangeClosed(1, groupSize * groupCount).boxed().collect(Collectors.toList());
    val source = Flux.fromIterable(list)
            .groupBy(i -> i % groupCount)
            .flatMap(Flux::collectList);
    StepVerifier.create(source).expectNextCount(groupCount).expectComplete().verify();
}

此代码的执行挂起。将groupCount更改为256或更少会使测试通过(对于groupSize的每个值)。

因此,关于您最初的问题,很有可能您正在使用键值选择器Auction::getItem创建大量组。

 类似资料:
  • 我用的是Spring助焊剂。我需要从不同的来源组装一个物体。如何确保两个流都返回了所需的数据? 比如:

  • 我本可以使用而不是第二个,因为它不映射任何东西,但我不确定这是否是peek方法的可接受用法。 我也可以在第二个中使用一个有状态映射器来只运行一次,而不是用索引压缩,我想这是可以接受的,因为我已经使用了一个有状态谓词...

  • 1.通过该接口可以获取某用户某天的小时维度的流量信息。地址为: http://spark.bokecc.com/api/traffic/user/hourly 需要传递以下参数: 参数 说明 userid 用户id,不可为空 date 查询日期,格式为yyyy-MM-dd,不可为空 返回数据traffics包含如下字段: 字段名 说明 traffic 流量信息 traffic包含如下字段: 字段名

  • 我有一个drl文件,它在两个规则流组中包含规则:“第一个规则流组”和“第二个规则流组”。这些组的激活取决于“规则A”和“规则B”。是否有任何方法可以停用规则B,以便在规则A条件匹配时触发,从而仅将焦点设置为“第一个规则流组”?

  • 我们正在尝试使用具有net core后端的流,但出现反序列化错误 协议是版本3.14.0 grpc-web-gen是1.2.1 生成客户端的命令是:协议-我=。/原型。/原型/*. proto-js_out=import_style=通用js,二进制:。/dist--grpc-web_out=import_style=通用js dts,模式=grpcwebtext:。/dist 我们成功连接到en