当前位置: 首页 > 知识库问答 >
问题:

验证反应器流量的起始流

阎璞瑜
2023-03-14
flux
.bufferUntil(new Predicate<>() {
    private int count = 0;

    @Override
    public boolean test(T next) {
        return ++count >= N;
    }
})
// Zip with index to know the first element
.zipWith(Flux.<Integer, Integer>generate(() -> 0, (cur, s) -> {
    s.next(cur);
    return cur + 1;
}))
.map(t -> {
    if (t.getT2() == 0 && !validate(t.getT1()))
        throw new RuntimeException("Invalid");
    return t.getT1();
})
// Flatten buffered elements
.flatMapIterable(identity())

我本可以使用doonnext而不是第二个map,因为它不映射任何东西,但我不确定这是否是peek方法的可接受用法。

我也可以在第二个映射中使用一个有状态映射器来只运行一次,而不是用索引压缩,我想这是可以接受的,因为我已经使用了一个有状态谓词...

共有1个答案

凤高澹
2023-03-14

你的要求听起来很有趣!我们有switchonfirst,这对于验证第一个元素很有用。但是如果您有N个元素要验证,我们可以尝试这样的方法。

这里,我假设我必须验证前5个元素,它们应该<=5。则为有效流。否则,我们只会抛出错误,表示验证失败。

Flux<Integer> integerFlux = Flux.range(1, 10).delayElements(Duration.ofSeconds(1));

integerFlux
        .buffer(5)
        .switchOnFirst((signal, flux) -> {
            //first 5 elements are <= 5, then it is a valid stream
            return signal.get().stream().allMatch(i -> i <= 5) ? flux : Flux.error(new RuntimeException("validation failed"));
        })
        .flatMapIterable(Function.identity())
        .subscribe(System.out::println,
                System.out::println);   

然而,这种方法并不好,因为它每次都要收集5个元素,即使在第一次验证完成后也是如此,这可能是我们不想要的。

AtomicInteger atomicInteger = new AtomicInteger(1);
integerFlux
        .bufferUntil(i -> {
            if(atomicInteger.get() < 5){
                atomicInteger.incrementAndGet();
                return false;
            }
            return true;
        })
        .switchOnFirst((signal, flux) -> {
            return signal.get().stream().allMatch(i -> i <= 5) ? flux : Flux.error(new RuntimeException("validation failed"));
        })
        .flatMapIterable(Function.identity())
        .subscribe(System.out::println,
                   System.out::println);
 类似资料:
  • 我用的是Spring助焊剂。我需要从不同的来源组装一个物体。如何确保两个流都返回了所需的数据? 比如:

  • 公证人/节点在收到事务时如何验证特定的流已被调用? 这是否意味着Corda可以保证流程没有根据相应Cordapp中的声明进行修改?

  • 我想组成一个Reactor链,基本上可以做到以下几点: 验证提交的属性,例如,的长度或的有效性。我会使用下面的验证器。 验证提交的是否已被其他人使用。为此,我将使用反应性存储库。 保存,如果以上所有验证检查都通过。 用户: 反应性存储库: 验证器: 处理程序方法: 助手方法: 从被动的角度来看,我不确定如何实现这一目标。理想情况下,反应链将有3个步骤映射到上面的点。 这是我尝试过的,但我在方法参数

  • 我想使用Confluent的JDBC源连接器将数据从SQL Server表检索到Kafka中。 任何帮助都将不胜感激。

  • 我试图用一个自定义验证器将整个设置为无效,这似乎不起作用。。。 下面代码的目标是比较FromGroup的两个实例。 如果它们匹配,则表示未进行任何更改,因此必须无效。 否则它不匹配,这意味着已经进行了更改,所以它必须是有效的 我的变量在我记录时工作正常。它会根据变化变为真或假。 但是我无法使from无效,我已经尝试了很多方法,比如文章和堆栈溢出,这有一个stackblitz,但它似乎也不起作用。