当前位置: 首页 > 知识库问答 >
问题:

替换的组通过运算符在Reactor

阙阳夏
2023-03-14

这是这个问题的后续问题。answers中提出的解决方案是使用groupBy运算符。这通常是好的,但正如其文档中所提到的,不建议与大量不同的键一起使用,比如说数以万计的键。

data
  .groupBy(Data::getPointID)
  .flatMap(sameIdFlux -> sameIdFlux
    .concatMap(processor::process)
  )
  .subscribe();

每个群体在本质上都有无限的元素,这些元素可能随时到达。此外,我需要限制并发处理的组的数量。据我所知,如果我使用上面的代码,要么我会达到开放组的隐式限制,新组不会被打开(处理),要么Ï最终会达到内存不足,因为即使是长时间不活动的组也不会关闭(想想删除的实体),因此会白白消耗一些内存开销。

是否有一些运算符/模式可以用于实现相同的行为而不会遇到上述问题?我最初试图用一些合理的持续时间来关闭每个组,但当一个组关闭并且相同的Id到达时,我对比赛条件持开放态度,因此它们将被并行处理,这是不理想的。

编辑:我正在进行更多的研究并尝试更多的方法,目前我最大的问题似乎是如何正确管理背压/正确限制最大并发而不限制组数本身。数据生成通常是线性的,但有时会产生我需要相应限制的大峰值。

共有1个答案

朱欣荣
2023-03-14

我对Spring通量和Reactor项目是陌生的,所以我不知道有什么现成的模式可以解决你的问题。但是,您可以创建自己的模式来限制使用groupBy操作符创建的组的数量。

在下面的示例中,我使用了int partition=I%numberOfPartitions的模式 灵感来自Apache Flink的这篇博客文章,它决定了分割流的分区数。

    public Flux<GroupedFlux<Integer, Data>> createFluxUsingGroupBy(List<String> dataList, int numberOfPartitions, int maxCount) {
        return Flux
                .fromStream(IntStream.range(0, maxCount)
                        .mapToObj(i -> {
                            int randomPosition = ThreadLocalRandom.current().nextInt(0, dataList.size());
                            int partition = i % numberOfPartitions;
                            return new Data(i, dataList.get(randomPosition), partition);
                        })
                )
                .delayElements(Duration.ofMillis(10))
                .log()
                .groupBy(Data::getPartition);
    }
........
@lombok.Data
@AllArgsConstructor
@NoArgsConstructor
public class Data {
    private Integer key;
    private String value;
    private Integer partition;
}

当我使用numberOfPartitions=3执行它时,无论我使用的键是什么,我都会有从0到2(3个分区)的分区。

    @Test
    void testFluxUsingGroupBy() {
        int numberOfPartitions = 3;
        int maxCount = 100;
        Flux<GroupedFlux<Integer, Data>> dataGroupedFlux = fluxAndMonoTransformations.createFluxUsingGroupBy(expect, numberOfPartitions, maxCount);
        StepVerifier.create(dataGroupedFlux)
                .expectNextCount(numberOfPartitions)
                .verifyComplete();
    }

这是日志:

10:43:02.168 [Test worker] INFO reactor.Flux.ConcatMap.1 - onSubscribe(FluxConcatMap.ConcatMapImmediate)
10:43:02.179 [Test worker] INFO reactor.Flux.ConcatMap.1 - request(256)
10:43:02.291 [parallel-1] INFO reactor.Flux.ConcatMap.1 - onNext(Data(key=0, value=Spring, partition=0))
10:43:02.362 [parallel-1] INFO reactor.Flux.ConcatMap.1 - request(1)
10:43:02.375 [parallel-2] INFO reactor.Flux.ConcatMap.1 - onNext(Data(key=1, value=Scala, partition=1))
10:43:02.377 [parallel-2] INFO reactor.Flux.ConcatMap.1 - request(1)
10:43:02.388 [parallel-3] INFO reactor.Flux.ConcatMap.1 - onNext(Data(key=2, value=reactive programming, partition=2))
10:43:02.389 [parallel-3] INFO reactor.Flux.ConcatMap.1 - request(1)
10:43:02.400 [parallel-4] INFO reactor.Flux.ConcatMap.1 - onNext(Data(key=3, value=java with lambda, partition=0))
10:43:02.411 [parallel-1] INFO reactor.Flux.ConcatMap.1 - onNext(Data(key=4, value=Spring, partition=1))
10:43:02.422 [parallel-2] INFO reactor.Flux.ConcatMap.1 - onNext(Data(key=5, value=java 8, partition=2))
10:43:02.433 [parallel-3] INFO reactor.Flux.ConcatMap.1 - onNext(Data(key=6, value=java with lambda, partition=0))
10:43:02.444 [parallel-4] INFO reactor.Flux.ConcatMap.1 - onNext(Data(key=7, value=java with lambda, partition=1))
...

在没有私有整数密钥的情况下增强该解决方案 在数据对象上可用,我可以基于哈希生成分区。我使用了另一个参数,即并行度。如果您使用并行度X将值保存在存储器上,并且之后读取相同的值但使用不同的并行度,则基本上是用于恢复操作X可以保留同一组上的值。所以我使用了int partition=(getDifferentHashCode(value)*parallelism)%numberOfPartitions 这也是受到我提到的博客帖子的启发。我更喜欢这种方法

    public Flux<GroupedFlux<Integer, Data>> createFluxUsingHashGroupBy(List<String> dataList, int numberOfPartitions, int parallelism, int maxCount) {
        return Flux
                .fromStream(IntStream.range(0, maxCount)
                        .mapToObj(i -> {
                            int randomPosition = ThreadLocalRandom.current().nextInt(0, dataList.size());
                            String value = dataList.get(randomPosition);
                            int partition = (getDifferentHashCode(value) * parallelism) % numberOfPartitions;
                            return new Data(i, value, partition);
                        })
                )
                .delayElements(Duration.ofMillis(10))
                .log()
                .groupBy(Data::getPartition);
    }

    public int getDifferentHashCode(String value) {
        int hash = 7;
        for (int i = 0; i < value.length(); i++) {
            hash = hash * 31 + value.charAt(i);
        }
        return hash;
    }

单元测试:

    @Test
    void testFluxUsingHashGroupBy() {
        int numberOfPartitions = 3;
        int parallelism = 2;
        int maxCount = 100;
        Flux<GroupedFlux<Integer, Data>> dataGroupedFlux = fluxAndMonoTransformations.createFluxUsingHashGroupBy(expect, numberOfPartitions, parallelism, maxCount);
        StepVerifier.create(dataGroupedFlux)
                .expectNextCount(numberOfPartitions)
                .verifyComplete();
    }

关于背压问题,我认为它可能会出现在另一个SO问题中。

 类似资料:
  • 我有一些我无法控制的代码。此代码接受一个对象参数,并尝试将其转换为编译时已知的类型,如下所示: 在C#中是否可以设计一个自定义类(不是从KnownType派生的),该类可以作为参数传递给上述代码,并通过上述代码转换为,前提是可以使用其成员方法将自身转换为: 我曾尝试实现这样的自定义转换运算符: 但是它不起作用(它没有被使用)。假设转换运算符仅在编译时已知源类型、目标类型和转换运算符时才起作用,这是

  • 问题内容: 我正在尝试编写SQL更新,以使用新字符串替换特定的xml节点: 以便 变成 这种类型的请求缺少语法吗? 问题答案: 更新:MySQL 8.0具有功能REGEX_REPLACE()。 以下是我2014年的回答,该回答仍然适用于8.0之前的任何版本的MySQL: REPLACE()不支持通配符,模式,正则表达式等。REPLACE()仅将一个常量字符串替换为另一个常量字符串。 您可以尝试一些

  • 我有这个功能来保存新记录或更新现有项: 以下是模式:

  • 我对把两种不同的东西结合在一起有意见。我尝试了很多关于SO的建议,但没有任何效果,因此我发布了我的问题。 所以要求是 我需要在Java中构建动态查询(用于搜索jsonb列类型) 我需要使用准备好的Java语句(避免sql注入) 我需要替换单引号内的参数 CLI中的这个查询非常有效 我想参数化的部分是“金融”。所以当我构建这个查询时 执行时,我得到以下错误 是因为?在单引号内,所以jdbc驱动程序可

  • 问题内容: $test = array(‘hi’); $test += array(‘test’,’oh’); var_dump($test); PHP中的数组意味着什么? 问题答案: 引用PHP语言操作员手册 +运算符返回添加到左侧数组的右侧数组;对于两个数组中都存在的键,将使用左侧数组中的元素,而右侧数组中的匹配元素将被忽略。 所以如果你这样做 你会得到 因此,的逻辑等效于以下代码段: 如果您

  • 我正在尝试在我的Nginx Ubuntu AWS EC2实例上使用Certbot生成通配符Let's Encrypt SSL证书。 Ubuntu: 16.04.5 LTS Nginx: v1.10.3 我现在无法生成通配符SSL,因为我收到了以下错误: 发生意外错误: 创建新订单时出错::DNS名称没有足够的标签 我已经查看了此错误的常见原因,我的请求中似乎没有任何错误: (我只是在my-doma