这与如何在Stream上短路减少有本质上相同的问题?。但是,由于该问题集中在布尔值流上,并且其答案不能推广到其他类型并减少操作,因此我想提出一个更笼统的问题。
我们如何对流进行还原,以使其在遇到用于还原操作的吸收元素时短路?
对于乘法,典型的数学情况将为0。这Stream
:
int product = IntStream.of(2, 3, 4, 5, 0, 7, 8)
.reduce(1, (a, b) -> a * b);
无论遇到什么事实产品都会被知道,它将消耗最后两个元素(7
和8
)0
。
不幸的是,StreamAPI具有有限的功能来创建您自己的短路操作。不是那么干净的解决方案是扔掉RuntimeException
并抓住它。这是的实现IntStream
,但也可以将其推广到其他流类型:
public static int reduceWithCancelEx(IntStream stream, int identity,
IntBinaryOperator combiner, IntPredicate cancelCondition) {
class CancelException extends RuntimeException {
private final int val;
CancelException(int val) {
this.val = val;
}
}
try {
return stream.reduce(identity, (a, b) -> {
int res = combiner.applyAsInt(a, b);
if(cancelCondition.test(res))
throw new CancelException(res);
return res;
});
} catch (CancelException e) {
return e.val;
}
}
用法示例:
int product = reduceWithCancelEx(
IntStream.of(2, 3, 4, 5, 0, 7, 8).peek(System.out::println),
1, (a, b) -> a * b, val -> val == 0);
System.out.println("Result: "+product);
输出:
2
3
4
5
0
Result: 0
请注意,即使它适用于并行流,也不能保证其他并行任务之一一旦抛出异常便会完成。已经开始的子任务可能会一直运行到完成,因此您处理的元素可能超出预期。
更新
:更长的时间,但更并行友好的替代解决方案。它基于自定义分隔符,该分隔符最多返回一个元素(这是所有基础元素的累加结果)。在顺序模式下使用它时,它将在一次tryAdvance
调用中完成所有工作。拆分时,每个部分都会生成相应的单个部分结果,Stream引擎会使用合并器功能将其减少。这是通用版本,但原始专业化也是可能的。
final static class CancellableReduceSpliterator<T, A> implements Spliterator<A>,
Consumer<T>, Cloneable {
private Spliterator<T> source;
private final BiFunction<A, ? super T, A> accumulator;
private final Predicate<A> cancelPredicate;
private final AtomicBoolean cancelled = new AtomicBoolean();
private A acc;
CancellableReduceSpliterator(Spliterator<T> source, A identity,
BiFunction<A, ? super T, A> accumulator, Predicate<A> cancelPredicate) {
this.source = source;
this.acc = identity;
this.accumulator = accumulator;
this.cancelPredicate = cancelPredicate;
}
@Override
public boolean tryAdvance(Consumer<? super A> action) {
if (source == null || cancelled.get()) {
source = null;
return false;
}
while (!cancelled.get() && source.tryAdvance(this)) {
if (cancelPredicate.test(acc)) {
cancelled.set(true);
break;
}
}
source = null;
action.accept(acc);
return true;
}
@Override
public void forEachRemaining(Consumer<? super A> action) {
tryAdvance(action);
}
@Override
public Spliterator<A> trySplit() {
if(source == null || cancelled.get()) {
source = null;
return null;
}
Spliterator<T> prefix = source.trySplit();
if (prefix == null)
return null;
try {
@SuppressWarnings("unchecked")
CancellableReduceSpliterator<T, A> result =
(CancellableReduceSpliterator<T, A>) this.clone();
result.source = prefix;
return result;
} catch (CloneNotSupportedException e) {
throw new InternalError();
}
}
@Override
public long estimateSize() {
// let's pretend we have the same number of elements
// as the source, so the pipeline engine parallelize it in the same way
return source == null ? 0 : source.estimateSize();
}
@Override
public int characteristics() {
return source == null ? SIZED : source.characteristics() & ORDERED;
}
@Override
public void accept(T t) {
this.acc = accumulator.apply(this.acc, t);
}
}
与Stream.reduce(identity, accumulator, combiner)
和类似的方法Stream.reduce(identity, combiner)
,但具有cancelPredicate
:
public static <T, U> U reduceWithCancel(Stream<T> stream, U identity,
BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner,
Predicate<U> cancelPredicate) {
return StreamSupport
.stream(new CancellableReduceSpliterator<>(stream.spliterator(), identity,
accumulator, cancelPredicate), stream.isParallel()).reduce(combiner)
.orElse(identity);
}
public static <T> T reduceWithCancel(Stream<T> stream, T identity,
BinaryOperator<T> combiner, Predicate<T> cancelPredicate) {
return reduceWithCancel(stream, identity, combiner, combiner, cancelPredicate);
}
让我们测试两个版本并计算实际处理的元素数量。让我们0
结束一点。例外版本:
AtomicInteger count = new AtomicInteger();
int product = reduceWithCancelEx(
IntStream.range(-1000000, 100).filter(x -> x == 0 || x % 2 != 0)
.parallel().peek(i -> count.incrementAndGet()), 1,
(a, b) -> a * b, x -> x == 0);
System.out.println("product: " + product + "/count: " + count);
Thread.sleep(1000);
System.out.println("product: " + product + "/count: " + count);
典型输出:
product: 0/count: 281721
product: 0/count: 500001
因此,当仅处理某些元素时返回结果时,任务将继续在后台运行,并且计数器仍在增加。这是分离器版本:
AtomicInteger count = new AtomicInteger();
int product = reduceWithCancel(
IntStream.range(-1000000, 100).filter(x -> x == 0 || x % 2 != 0)
.parallel().peek(i -> count.incrementAndGet()).boxed(),
1, (a, b) -> a * b, x -> x == 0);
System.out.println("product: " + product + "/count: " + count);
Thread.sleep(1000);
System.out.println("product: " + product + "/count: " + count);
典型输出:
product: 0/count: 281353
product: 0/count: 281353
返回结果后,所有任务实际上都已完成。
将使用最后两个元素(和),而不考虑一旦遇到就知道产品了。
我理解下面的区别(至少对于Java): 但我的问题是,当你在处理布尔表达式时,有没有理由使用非短路运算符?是否有一些性能上的好处或使用不会被认为是坏的练习?
Java 8 还新增了 Stream、IntStream、LongStream、DoubleStream 等流式 API,这些 API 代表多个支持串行和并行聚集操作的元素。上面 4 个接口中,Stream 是一个通用的流接口,而 IntStream、LongStream、 DoubleStream 则代表元素类型为 int、long、double 的流。 Java 8 还为上面每个流式 API
问题内容: 我知道如何从-> “转换”一个简单的Java ,即: 现在,我想对地图进行基本相同的操作,即: 解决方案不应限于-> 。就像上面的示例一样,我想调用任何方法(或构造函数)。 问题答案: 它不如列表代码那么好。您不能在通话中构造new ,因此工作会混入通话中。
本文向大家介绍JAVA8 stream中三个参数的reduce方法对List进行分组统计操作,包括了JAVA8 stream中三个参数的reduce方法对List进行分组统计操作的使用技巧和注意事项,需要的朋友参考一下 背景 平时在编写前端代码时,习惯使用lodash来编写‘野生'的JavaScript; lodash提供来一套完整的API对JS对象(Array,Object,Collection
我正在尝试使用并行流连接字符串。 我在下面的代码中也发现了同样的问题。 在这里,我还使用了一个同步集合,所有的方法都是线程安全的。 我在Java文档中看到了这个 我是不是漏掉了什么?使用线程安全的数据结构还不够吗?