当前位置: 首页 > 知识库问答 >
问题:

如何短路流上的reduce()操作?

蒋权
2023-03-14
int product = IntStream.of(2, 3, 4, 5, 0, 7, 8)
        .reduce(1, (a, b) -> a * b);

将使用最后两个元素(78),而不考虑一旦遇到0就知道产品了。

共有1个答案

鲍永春
2023-03-14

不幸的是,Stream API创建自己的短路操作的能力有限。抛出runtimeexception并捕获它并不是一个干净的解决方案。下面是intStream的实现,但它也可以推广到其他流类型:

public static int reduceWithCancelEx(IntStream stream, int identity, 
                      IntBinaryOperator combiner, IntPredicate cancelCondition) {
    class CancelException extends RuntimeException {
        private final int val;

        CancelException(int val) {
            this.val = val;
        }
    }

    try {
        return stream.reduce(identity, (a, b) -> {
            int res = combiner.applyAsInt(a, b);
            if(cancelCondition.test(res))
                throw new CancelException(res);
            return res;
        });
    } catch (CancelException e) {
        return e.val;
    }
}

用法示例:

int product = reduceWithCancelEx(
        IntStream.of(2, 3, 4, 5, 0, 7, 8).peek(System.out::println), 
        1, (a, b) -> a * b, val -> val == 0);
System.out.println("Result: "+product);

输出:

2
3
4
5
0
Result: 0
final static class CancellableReduceSpliterator<T, A> implements Spliterator<A>,
        Consumer<T>, Cloneable {
    private Spliterator<T> source;
    private final BiFunction<A, ? super T, A> accumulator;
    private final Predicate<A> cancelPredicate;
    private final AtomicBoolean cancelled = new AtomicBoolean();
    private A acc;

    CancellableReduceSpliterator(Spliterator<T> source, A identity,
            BiFunction<A, ? super T, A> accumulator, Predicate<A> cancelPredicate) {
        this.source = source;
        this.acc = identity;
        this.accumulator = accumulator;
        this.cancelPredicate = cancelPredicate;
    }

    @Override
    public boolean tryAdvance(Consumer<? super A> action) {
        if (source == null || cancelled.get()) {
            source = null;
            return false;
        }
        while (!cancelled.get() && source.tryAdvance(this)) {
            if (cancelPredicate.test(acc)) {
                cancelled.set(true);
                break;
            }
        }
        source = null;
        action.accept(acc);
        return true;
    }

    @Override
    public void forEachRemaining(Consumer<? super A> action) {
        tryAdvance(action);
    }

    @Override
    public Spliterator<A> trySplit() {
        if(source == null || cancelled.get()) {
            source = null;
            return null;
        }
        Spliterator<T> prefix = source.trySplit();
        if (prefix == null)
            return null;
        try {
            @SuppressWarnings("unchecked")
            CancellableReduceSpliterator<T, A> result = 
                (CancellableReduceSpliterator<T, A>) this.clone();
            result.source = prefix;
            return result;
        } catch (CloneNotSupportedException e) {
            throw new InternalError();
        }
    }

    @Override
    public long estimateSize() {
        // let's pretend we have the same number of elements
        // as the source, so the pipeline engine parallelize it in the same way
        return source == null ? 0 : source.estimateSize();
    }

    @Override
    public int characteristics() {
        return source == null ? SIZED : source.characteristics() & ORDERED;
    }

    @Override
    public void accept(T t) {
        this.acc = accumulator.apply(this.acc, t);
    }
}

类似于stream.reduce(identity,accumulator,combiner)stream.reduce(identity,combiner)的方法,但具有cancelpredicate:

public static <T, U> U reduceWithCancel(Stream<T> stream, U identity,
        BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner,
        Predicate<U> cancelPredicate) {
    return StreamSupport
            .stream(new CancellableReduceSpliterator<>(stream.spliterator(), identity,
                    accumulator, cancelPredicate), stream.isParallel()).reduce(combiner)
            .orElse(identity);
}

public static <T> T reduceWithCancel(Stream<T> stream, T identity,
        BinaryOperator<T> combiner, Predicate<T> cancelPredicate) {
    return reduceWithCancel(stream, identity, combiner, combiner, cancelPredicate);
}

让我们测试这两个版本,并统计实际处理了多少元素。让我们将0靠近结尾。异常版本:

AtomicInteger count = new AtomicInteger();
int product = reduceWithCancelEx(
        IntStream.range(-1000000, 100).filter(x -> x == 0 || x % 2 != 0)
                .parallel().peek(i -> count.incrementAndGet()), 1,
        (a, b) -> a * b, x -> x == 0);
System.out.println("product: " + product + "/count: " + count);
Thread.sleep(1000);
System.out.println("product: " + product + "/count: " + count);

典型输出:

product: 0/count: 281721
product: 0/count: 500001

因此,当只处理某些元素时返回结果时,任务仍在后台继续工作,计数器仍在增加。以下是spliterator版本:

AtomicInteger count = new AtomicInteger();
int product = reduceWithCancel(
        IntStream.range(-1000000, 100).filter(x -> x == 0 || x % 2 != 0)
                .parallel().peek(i -> count.incrementAndGet()).boxed(), 
                1, (a, b) -> a * b, x -> x == 0);
System.out.println("product: " + product + "/count: " + count);
Thread.sleep(1000);
System.out.println("product: " + product + "/count: " + count);

典型输出:

product: 0/count: 281353
product: 0/count: 281353
 类似资料:
  • 问题内容: 这与如何在Stream上短路减少有本质上相同的问题?。但是,由于该问题集中在布尔值流上,并且其答案不能推广到其他类型并减少操作,因此我想提出一个更笼统的问题。 我们如何对流进行还原,以使其在遇到用于还原操作的吸收元素时短路? 对于乘法,典型的数学情况将为0。这: 无论遇到什么事实产品都会被知道,它将消耗最后两个元素(和)。 问题答案: 不幸的是,StreamAPI具有有限的功能来创建您

  • 问题内容: 假设我有一个布尔值流,而我正在编写的reduce操作是||(OR)。我是否可以这样编写它:如果true遇到值,则放弃对至少某些元素的求值? 我正在寻找某种程度的优化(也许是并行流),不一定要完全优化,尽管后者会很棒。 问题答案: 我怀疑您想要这种构造。 你可以看一下 Stream.of(1, 2, 3, 4).peek(System.out::println).anyMatch(i -

  • 我理解下面的区别(至少对于Java): 但我的问题是,当你在处理布尔表达式时,有没有理由使用非短路运算符?是否有一些性能上的好处或使用不会被认为是坏的练习?

  • 问题内容: 阅读了有关Java 8的一些知识后,我进入了这篇博客文章,解释了有关流及其还原的一些知识,以及何时有可能使还原短路。在底部,它指出: 请注意,在或的情况下,我们只需要与谓词匹配的第一个值(尽管不能保证返回第一个)。但是,如果流没有排序,则我们的行为应类似于。的操作,并且可能不会短路,因为在所有它的流可能需要评估所有的值,以确定操作者是否是或。因此,使用它们的无限流可能不会终止。 我知道

  • 阅读了一下Java8,我读到了这篇博文,解释了一些关于流和它们的减少,以及什么时候可以短路减少。在底部,它说: 请注意,在个或我们只需要与谓词匹配的第一个值(尽管值不能保证返回第一个值)。但是,如果流没有排序,那么我们希望的行为类似于。所有、和 操作可能根本不会使流短路,因为可能需要计算所有值来确定运算符是还是。因此,使用这些的无限流可能不会终止。 我知道或可能会使还原短路,因为一旦您找到一个元素

  • 我正在尝试使用并行流连接字符串。 我在下面的代码中也发现了同样的问题。 在这里,我还使用了一个同步集合,所有的方法都是线程安全的。 我在Java文档中看到了这个 我是不是漏掉了什么?使用线程安全的数据结构还不够吗?