当前位置: 首页 > 知识库问答 >
问题:

如何实现Java8 Stream流畅的API和延迟计算

蒋骏
2023-03-14

我想找出一个相当于Java8 Stream的简单实现,它允许我探索延迟计算的查询算法的开发(例如map()filter()duce()等)。注意:实现比Stream更好的解决方案不是我的目标。另一方面,我唯一的目标是了解Stream的内部结构。

然而,我发现的每个实现都基于迭代

  • 延迟流是如何在Java8中实现的?
  • 流不是数据的容器,而是逻辑的容器
  • Java8内部迭代

然而,我对这些解决方案并不满意,因为:

  1. 它们太冗长了
  2. 它们对于新的查询方法不灵活。包含新的查询方法需要进行结构修改
  3. 尽管有查询参数,但它们没有利用Java8的新特性,例如:第一类函数或默认方法
  4. 它们都不使用拆分器

我知道斯普利特

Spliterator是一个更好的迭代器,即使没有并行性。(它们通常也更容易编写,更难出错。)

因此,是否有可能开发一种更具可读性,更简单,更简洁和更灵活的查询API实现,该查询API的计算方式与流的原理相同。

如果是,你怎么做?我希望看到比上面列出的实现更简单的实现,如果可能的话,利用新的Java8特性。

要求:

  • 不要重用Java 8 API中的现有方法。
  • 并行处理功能超出了本问题的范围
  • 如果可能,最好不要使用<code>Iterable

我提问的原因?我认为学习诸如Stream之类的查询API的最好方法是自己尝试实现那些相同的方法。我在学习的时候就已经成功做到了。net Linq。当然,我没有实现比Linq更好的实现,但这帮助我理解了内部部分。所以,我正试图遵循同样的方法来学习流。

这并不罕见。对于其他技术,有许多研讨会遵循这种方法,例如函数式javascript研讨会,大多数练习要求实现现有方法,例如:map()filter()减少()call()bind()等...

选择的答案:目前我认为Miguel Gamboa的答案是我的选择,而不是Tagir Valeev的答案,因为后者不允许实现findAny()findFirst(),而无需通过dataSrcforeach()完全遍历整个元素。然而,我认为Tagir Valeev的答案在一些中间操作的简洁实现和性能方面还有其他优点,因为基于foreach()的方法减少了迭代代码的开销,该迭代代码中介了对数据结构内部的访问,正如Brian Goetz在其答案的第2点所引用的那样


共有3个答案

孔宇
2023-03-14

首先,我得说我热爱Lambdas和Stream APIs的设计。在JDK的实现也很棒,性能很高。而且我也不确定你学习实现/自己做的目的好不好。但是我确实在我的开源库AbacusUtil中实现了Stream API,包括顺序和并行。以下是github: Stream的源代码。与JDK的实施情况相比,我不能说它有多好。但就个人而言,我认为实现是非常直接和简单的。而且性能也很高。

披露:我是AbacusUtil的开发者。

毛正浩
2023-03-14

使用函数式编程并利用Java 8的默认方法,我们可以实现一个简单明了的查询API延迟计算解决方案。例如,在下面的type Queryable中检查如何轻松实现< code>map()和< code>forEach()方法,然后可以像这样使用它:

List<String> data = Arrays.asList("functional", "super", "formula");
Queryable.of(data) // <=> data.stream().
     .map(String::length)
     .forEach(System.out::println);

如果您将Queryable.of(dataSrc)调用替换为dataSrc.stream(),您将得到相同的结果。下面的示例说明了map()foreach()方法的实现。在Queryable存储库中检查完整的解决方案和更详细的描述。

使用@srborlong an注释更新。已从foreach(消费者)更改foreach签名

@FunctionalInterface
public interface Queryable<T>{

  abstract boolean tryAdvance(Consumer<? super T> action); // <=> Spliterator::tryAdvance

  static <T> boolean truth(Consumer<T> c, T item){
    c.accept(item);
    return true;
  }

  public static <T> Queryable<T> of(Iterable<T> data) {
    final Iterator<T> dataSrc = data.iterator();
    return action -> dataSrc.hasNext() && truth(action, dataSrc.next());
  }

  public default void forEach(Consumer<? super T> action) {
    while (tryAdvance(action)) { }
  }

  public default <R> Queryable<R> map(Function<T, R> mapper) {
    return action -> tryAdvance(item -> action.accept(mapper.apply(item)));
  }
}

隆礼骞
2023-03-14
匿名用户

在没有短路支持的情况下,很容易实现无状态操作的子集。你只需要关心始终坚持内部迭代。基本构造块是< code>forEach操作,它可以对每个输入元素执行给定的操作。< code>forEach方法的主体是唯一在不同阶段发生变化的东西。因此,我们可以用抽象的< code>forEach方法创建抽象类,或者接受一个实际上是< code>forEach主体的函数。我将坚持第二种方法:

public final class MyStream<T> {
    private final Consumer<Consumer<T>> action;

    public MyStream(Consumer<Consumer<T>> action) {
        this.action = action;
    }

    public void forEach(Consumer<T> cons) {
        action.accept(cons);
    }
}

现在让我们创建一些简单的源:

public static <T> MyStream<T> of(Iterable<T> elements) {
    // just redirect to Iterable::forEach
    return new MyStream<>(elements::forEach);
}

@SafeVarargs
public static <T> MyStream<T> of(T... elements) {
    return of(Arrays.asList(elements));
}

public static MyStream<Integer> range(int from, int to) {
    return new MyStream<>(cons -> {
        for(int i=from; i<to; i++) cons.accept(i);
    });
}

现在是中间操作。他们只需要调整通过行动接收的消费者来执行其他操作:

public <U> MyStream<U> map(Function<T, U> mapper) {
    return new MyStream<>(cons -> forEach(e -> cons.accept(mapper.apply(e))));
}

public MyStream<T> filter(Predicate<T> pred) {
    return new MyStream<>(cons -> forEach(e -> {
        if(pred.test(e))
            cons.accept(e);
    }));
}

public <U> MyStream<U> flatMap(Function<T, MyStream<U>> mapper) {
    return new MyStream<>(cons -> forEach(e -> mapper.apply(e).forEach(cons)));
}

public MyStream<T> peek(Consumer<T> action) {
    return new MyStream<>(cons -> forEach(e -> {
        action.accept(e);
        cons.accept(e);
    }));
}

public MyStream<T> skip(long n) {
    return new MyStream<>(cons -> {
        long[] count = {0};
        forEach(e -> {
            if(++count[0] > n)
                cons.accept(e);
        });
    });
}

现在,让我们使用 forEach 创建一些终端操作:

public T reduce(T identity, BinaryOperator<T> op) {
    class Box {
        T val = identity;
    }
    Box b = new Box();
    forEach(e -> b.val = op.apply(b.val, e));
    return b.val;
}

public Optional<T> reduce(BinaryOperator<T> op) {
    class Box {
        boolean isPresent;
        T val;
    }
    Box b = new Box();
    forEach(e -> {
        if(b.isPresent) b.val = op.apply(b.val, e);
        else {
            b.val = e;
            b.isPresent = true;
        }
    });
    return b.isPresent ? Optional.empty() : Optional.of(b.val);
}

public long count() {
    return map(e -> 1L).reduce(0L, Long::sum);
}

public Optional<T> maxBy(Comparator<T> cmp) {
    return reduce(BinaryOperator.maxBy(cmp));
}

public Optional<T> minBy(Comparator<T> cmp) {
    return reduce(BinaryOperator.minBy(cmp));
}

现在我们有了我们的小溪。让我们尝试一下:

System.out.println(MyStream.of(1,2,3,4,5).map(x -> x*2)
                           .reduce(0, Integer::sum));
// 30

System.out.println(MyStream.of("a", "stream", "of", "some", "strings")
                           .flatMap(x -> MyStream.of(", ", x))
                           .skip(1).reduce("", String::concat));
// a, stream, of, some, strings

System.out.println(MyStream.range(0, 100)
                           .filter(x -> x % 3 == 0).count());
// 34

等等。这样的实现非常简单,但非常接近实际Stream API中正在发生的事情。当然,当您添加短路、并行流、原始专业化和更有状态的操作时,事情会变得更加复杂。

请注意,与流 API 不同,此 MyStream 可以重复使用多次:

MyStream<Integer> range = range(0, 10);
range.forEach(System.out::println);
range.forEach(System.out::println); // works perfectly

 类似资料:
  • 本文向大家介绍kafka如何实现延迟队列?相关面试题,主要包含被问及kafka如何实现延迟队列?时的应答技巧和注意事项,需要的朋友参考一下 Kafka并没有使用JDK自带的Timer或者DelayQueue来实现延迟的功能,而是基于时间轮自定义了一个用于实现延迟功能的定时器(SystemTimer)。JDK的Timer和DelayQueue插入和删除操作的平均时间复杂度为O(nlog(n)),并不

  • 是否有可能拥有一个java。util。可选,仅在需要时才进行评估? 我需要将一个传递给一个方法(我无法更改的API),并且这个方法可能会也可能不会使用该的值。由于该值是由一个繁重的操作计算的,因此我想仅在需要时(如果需要)计算该值,例如调用、、等。 类似

  • 问题内容: 我正在尝试用Java做某事,而我需要一些东西在while循环中等待/延迟几秒钟。 我想构建一个步进音序器,并且对Java还是陌生的。有什么建议么? 问题答案: If you want to pause then use java.util.concurrent.TimeUnit: TimeUnit.SECONDS.sleep(1); TimeUnit.MINUTES.sleep(1);

  • 本文向大家介绍Webpack 实现 AngularJS 的延迟加载,包括了Webpack 实现 AngularJS 的延迟加载的使用技巧和注意事项,需要的朋友参考一下 随着你的单页应用扩大,其下载时间也越来越长。这对提高用户体验不会有好处(提示:但用户体验正是我们开发单页应用的原因)。更多的代码意味着更大的文件,直到代码压缩已经不能满足你的需求,你唯一能为你的用户做的就是不要再让他一次性下载整个应

  • 假设: 让我们假设数据插入DynamoDB的速率是巨大的。 上下文: 在DynamoDB表上启用流,这将触发lambda。lambda读取流记录,并在弹性搜索中对记录进行索引。 问题陈述: 在将记录插入DynamoDB的时间和通过流式记录触发lambda的时间之间存在延迟。这种延迟或滞后不断增加,并且与插入DynamoDB的数据量成正比。 如何找到滞后的位置?是流没有立即触发lambda吗?还是因

  • 本文向大家介绍jQuery延迟执行的实现方法,包括了jQuery延迟执行的实现方法的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了jQuery延迟执行的实现方法。分享给大家供大家参考,具体如下: 做一个控件时碰到的一些无法同步处理的事件,可以用这样的延迟方法。 更多关于jQuery相关内容感兴趣的读者可查看本站专题:《jQuery日期与时间操作技巧总结》、《jQuery切换特效与技巧总结》