flux
.bufferUntil(new Predicate<>() {
private int count = 0;
@Override
public boolean test(T next) {
return ++count >= N;
}
})
// Zip with index to know the first element
.zipWith(Flux.<Integer, Integer>generate(() -> 0, (cur, s) -> {
s.next(cur);
return cur + 1;
}))
.map(t -> {
if (t.getT2() == 0 && !validate(t.getT1()))
throw new RuntimeException("Invalid");
return t.getT1();
})
// Flatten buffered elements
.flatMapIterable(identity())
我本可以使用doonnext
而不是第二个map
,因为它不映射任何东西,但我不确定这是否是peek方法的可接受用法。
我也可以在第二个映射
中使用一个有状态映射器来只运行一次,而不是用索引压缩,我想这是可以接受的,因为我已经使用了一个有状态谓词...
你的要求听起来很有趣!我们有switchonfirst
,这对于验证第一个元素很有用。但是如果您有N个元素要验证,我们可以尝试这样的方法。
这里,我假设我必须验证前5个元素,它们应该<=5。则为有效流。否则,我们只会抛出错误,表示验证失败。
Flux<Integer> integerFlux = Flux.range(1, 10).delayElements(Duration.ofSeconds(1));
integerFlux
.buffer(5)
.switchOnFirst((signal, flux) -> {
//first 5 elements are <= 5, then it is a valid stream
return signal.get().stream().allMatch(i -> i <= 5) ? flux : Flux.error(new RuntimeException("validation failed"));
})
.flatMapIterable(Function.identity())
.subscribe(System.out::println,
System.out::println);
然而,这种方法并不好,因为它每次都要收集5个元素,即使在第一次验证完成后也是如此,这可能是我们不想要的。
AtomicInteger atomicInteger = new AtomicInteger(1);
integerFlux
.bufferUntil(i -> {
if(atomicInteger.get() < 5){
atomicInteger.incrementAndGet();
return false;
}
return true;
})
.switchOnFirst((signal, flux) -> {
return signal.get().stream().allMatch(i -> i <= 5) ? flux : Flux.error(new RuntimeException("validation failed"));
})
.flatMapIterable(Function.identity())
.subscribe(System.out::println,
System.out::println);
我用的是Spring助焊剂。我需要从不同的来源组装一个物体。如何确保两个流都返回了所需的数据? 比如:
公证人/节点在收到事务时如何验证特定的流已被调用? 这是否意味着Corda可以保证流程没有根据相应Cordapp中的声明进行修改?
我想组成一个Reactor链,基本上可以做到以下几点: 验证提交的属性,例如,的长度或的有效性。我会使用下面的验证器。 验证提交的是否已被其他人使用。为此,我将使用反应性存储库。 保存,如果以上所有验证检查都通过。 用户: 反应性存储库: 验证器: 处理程序方法: 助手方法: 从被动的角度来看,我不确定如何实现这一目标。理想情况下,反应链将有3个步骤映射到上面的点。 这是我尝试过的,但我在方法参数
我想使用Confluent的JDBC源连接器将数据从SQL Server表检索到Kafka中。 任何帮助都将不胜感激。
我试图用一个自定义验证器将整个设置为无效,这似乎不起作用。。。 下面代码的目标是比较FromGroup的两个实例。 如果它们匹配,则表示未进行任何更改,因此必须无效。 否则它不匹配,这意味着已经进行了更改,所以它必须是有效的 我的变量在我记录时工作正常。它会根据变化变为真或假。 但是我无法使from无效,我已经尝试了很多方法,比如文章和堆栈溢出,这有一个stackblitz,但它似乎也不起作用。