我想改变我的代码为单一订户。现在我有了
auctionFlux.window(Duration.ofSeconds(120), Duration.ofSeconds(120)).subscribe(
s -> s.groupBy(Auction::getItem).subscribe( longAuctionGroupedFlux -> longAuctionGroupedFlux.reduce(new ItemDumpStats(), this::calculateStats )
));
这段代码工作正常,reduce方法非常简单。我试着为单个订户更改代码
auctionFlux.window(Duration.ofSeconds(120), Duration.ofSeconds(120))
.flatMap(window -> window.groupBy(Auction::getItem))
.flatMap(longAuctionGroupedFlux -> longAuctionGroupedFlux.reduce(new ItemDumpStats(), this::calculateStats))
.subscribe(itemDumpStatsMono -> log.info(itemDumpStatsMono.toString()));
这是我的代码,这个代码不起作用。没有错误,没有结果。调试后,我发现代码卡在第二个平面地图上,当我减少流。我认为问题是平面地图合并,卡在单声道解决方案上。有些人现在如何解决这个问题,只使用单个订户?
如何复制,您可以使用另一个类或创建一个类。小尺寸的正在工作,而大尺寸的正在消亡
List<Auction> auctionList = new ArrayList<>();
for (int i = 0;i<100000;i++){
Auction a = new Auction((long) i, "test");
a.setItem((long) (i%50));
auctionList.add(a);
}
Flux.fromIterable(auctionList).groupBy(Auction::getId).flatMap(longAuctionGroupedFlux ->
longAuctionGroupedFlux.reduce(new ItemDumpStats(), (itemDumpStats, auction) -> itemDumpStats)).collectList().subscribe(itemDumpStats -> System.out.println(itemDumpStats.toString()));
这种方法是即时的结果,但我使用3个订阅者
Flux.fromIterable(auctionList)
.groupBy(Auction::getId)
.subscribe(
auctionIdAuctionGroupedFlux -> auctionIdAuctionGroupedFlux.reduce(new ItemDumpStats(), (itemDumpStats, auction) -> itemDumpStats).subscribe(itemDumpStats -> System.out.println(itemDumpStats.toString()
)
));
添加parallel
修复了这个问题,但我想知道为什么要大大降低flatMap的速度。
我认为您描述的行为与groupBy
与flatMap
链接的交互有关。检查groupBy文档。它指出:
groupBy需要在下游排空和消耗这些组才能正常工作。值得注意的是,当标准生成大量组时,如果组未在下游适当消费(例如,由于具有设置得太低的maxConcurrency参数的flatMap),则可能导致挂起。
默认情况下,maxConcurrency(flatMap)
设置为256(我检查了3.2.2的源代码)。因此,选择超过256个组可能会导致执行挂起(特别是当所有执行都发生在同一线程上时)。
下面的代码有助于理解当您将运算符group pBy和phaMap链接时会发生什么:
@Test
public void groupAndFlatmapTest() {
val groupCount = 257;
val groupSize = 513;
val list = rangeClosed(1, groupSize * groupCount).boxed().collect(Collectors.toList());
val source = Flux.fromIterable(list)
.groupBy(i -> i % groupCount)
.flatMap(Flux::collectList);
StepVerifier.create(source).expectNextCount(groupCount).expectComplete().verify();
}
此代码的执行挂起。将groupCount
更改为256或更少会使测试通过(对于groupSize
的每个值)。
因此,关于您最初的问题,很有可能您正在使用键值选择器Auction::getItem
创建大量组。
我用的是Spring助焊剂。我需要从不同的来源组装一个物体。如何确保两个流都返回了所需的数据? 比如:
我本可以使用而不是第二个,因为它不映射任何东西,但我不确定这是否是peek方法的可接受用法。 我也可以在第二个中使用一个有状态映射器来只运行一次,而不是用索引压缩,我想这是可以接受的,因为我已经使用了一个有状态谓词...
1.通过该接口可以获取某用户某天的小时维度的流量信息。地址为: http://spark.bokecc.com/api/traffic/user/hourly 需要传递以下参数: 参数 说明 userid 用户id,不可为空 date 查询日期,格式为yyyy-MM-dd,不可为空 返回数据traffics包含如下字段: 字段名 说明 traffic 流量信息 traffic包含如下字段: 字段名
我有一个drl文件,它在两个规则流组中包含规则:“第一个规则流组”和“第二个规则流组”。这些组的激活取决于“规则A”和“规则B”。是否有任何方法可以停用规则B,以便在规则A条件匹配时触发,从而仅将焦点设置为“第一个规则流组”?
我们正在尝试使用具有net core后端的流,但出现反序列化错误 协议是版本3.14.0 grpc-web-gen是1.2.1 生成客户端的命令是:协议-我=。/原型。/原型/*. proto-js_out=import_style=通用js,二进制:。/dist--grpc-web_out=import_style=通用js dts,模式=grpcwebtext:。/dist 我们成功连接到en