当前位置: 首页 > 知识库问答 >
问题:

流式多过滤器内部件

昌乐生
2023-03-14

我试图理解Java的流API的内部调用。

我有下面的代码,它有两个过滤器(中间)操作和一个终端操作。

IntStream.of(1,2,3)
    .filter(e->e%2==0)
    .filter(e->e==2)
    .forEach(e->System.out.println(e));

我看到,对于每个中间操作,都会返回一个新流,并使用重写的filter方法。一旦它命中terminal方法,流就执行过滤器。如果有两个filter操作,我看到filter()正在运行两次,而不是一次。

我想了解一个流遍历如何能够调用过滤器两次。

粘贴下面的IntPipeline代码,在Stream中为filter方法命中该代码。

@Override
public final IntStream filter(IntPredicate predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
                                    StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
            return new Sink.ChainedInt<Integer>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(int t) {
                    if (predicate.test(t)) ///line 11
                        downstream.accept(t);
                }
            };
        }
    };
}

共有1个答案

古刚洁
2023-03-14

我将尽力解释在流API的幕后发生了什么,首先你应该改变你到目前为止是如何编程的想法,试着得到这个新的想法。

所以要举一个现实世界想象工厂的例子(我指的是现实世界中的现实工厂,而不是工厂设计模式),在一个工厂中,我们有一些原材料和一些在不同阶段将原材料转化为成品的连续过程。要掌握这一概念,请参见下图:

(阶段1)原料->(阶段2)处理输入并将输出传递到下一阶段->(阶段3)处理输入并将输出传递到下一阶段->.....

因此,第一级的输出是原材料,随后的所有级对它们的输入进行一些处理并传递(例如,它可以将该级的输入转换成其他的东西,或者完全拒绝该输入,因为它的质量低),然后,它将输出移交给前面的另一级。从现在起,我们把这个连续的阶段统称为管道。

什么是处理?它可以是任何东西,例如,一个阶段可以决定将输入转换成完全不同的东西并将其传递(这正是map在Stream API中为我们提供的),另一个阶段可以基于某种条件允许输入传递(这正是filter在Stream API中所做的)。

Java Stream API做了类似于工厂的事情。每个流都是一个管道,您可以向每个管道添加另一个阶段并创建一个新的管道,因此当您编写IntStream.of(1,2,3)时,您已经创建了一个管道,它是IntStream,因此让我们分解您的代码:

IntStream intStream = IntStream.of(1,2,3)

这相当于我们厂的原料,所以它是一条只有一级的管道。然而,拥有一条只通过原料的管道是没有好处的。让我们在前面的管道上再增加一个阶段,并创建一个新的管道:

IntStream evenNumbePipeline = intStream.filter(e -> e%2==0);

请注意,这里您创建了一个新的流水线,这个流水线正好是前一个加上另一个阶段,只允许偶数通过,拒绝其他的。当您调用filter方法的以下部分代码时,准确地创建一个新管道:

@Override
public final IntStream filter(IntPredicate predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
                                    StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
            return new Sink.ChainedInt<Integer>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(int t) {
                    if (predicate.test(t)) ///line 11
                        downstream.accept(t);
                }
            };
        }
    };
}

您可以看到filter返回StateLessOp 的新实例,该实例扩展了IntPipeline如下:

abstract static class StatelessOp<E_IN> extends IntPipeline<E_IN> 

在我们的例子中,我们将stage2添加到工厂中,并在输入时告诉它,检查它是否为偶数,然后允许它传递,如果它是偶数。我们正在讨论你的问题,让我们在我们的管道中再增加一个阶段:

IntStream onlyTwo = evenNumbePipeline.filter(e -> e==2);

在这里,您创建了新的管道,该管道获得前一个管道(即evenNumbePipeline)并向该管道添加另一个阶段(evenNumbePipeline没有更改,我们创建了新的管道,其中包含evenNumbePipeline)。让我们来看看目前为止我们的管道:

raw material(stage1) -> filter even number(stage2) -> filter only 2(stage3)

把它看作是我们管道中的各个阶段的定义,而不是操作,也许我们还没有原材料,但是我们可以设计我们的工厂,这样我们以后就可以为它提供原材料。您可以看到这个管道有三个阶段,每个阶段都对前一个阶段的输出做一些事情。管道将由原材料一个一个提供(现在忘记并行流fro),所以当你提供1作为原材料到这个管道时,它会经历这些阶段。每个阶段都是Java中的新对象

让我们来看看你在问题中说的

我想了解一个流遍历如何能够调用过滤器两次。

从我们到目前为止所调查的情况来看,您认为流的遍历调用了两次filter方法,还是当我们创建管道时调用了两次filter方法?

我们两次调用这个filter方法,因为我们希望在我们的管道中有两个不同的阶段。想想一个工厂,我们两次调用filter method是因为我们在设计它的时候,希望在我们的工厂中有两个不同的滤波器级。我们还没有经过工厂阶段,我们还没有生产任何成品。

来点乐趣,出点输出吧:

onlyTwo.forEach(e -> System.out.println(e));

由于foreach是一个终端操作,它启动了我们的工厂,并为我们工厂的管道提供原材料。因此,例如1经历stage2,然后经历stage3,然后传递给foreach语句。

但是还有一个问题:当我们在设计我们的管道时,我们如何定义每个阶段做什么?

在我们的示例中,当我们设计管道时,我们调用filter方法来创建新的stage,并将filter stage应该做的事情作为参数传递给它,这个参数的名称是predicate。每个阶段中的opwrapsink方法定义了每个阶段接收到输入时准确执行的操作。因此,我们必须在创建stage时实现这个方法,所以让我们回到我们的filter方法,在这里流类为我们创建新的stage:

@Override
public final IntStream filter(IntPredicate predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
                                    StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
            return new Sink.ChainedInt<Integer>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(int t) {
                    if (predicate.test(t)) ///line 11
                        downstream.accept(t);
                }
            };
        }
    };
}

您可以看到每个阶段的opwrapsink方法返回一个sink,但是sink是什么?

为了消除这个接口的复杂性,它是一个使用者,并有一个如下所示的accept方法(它也有许多其他用于基元类型的accept方法,以避免不必要的装箱和取消装箱):

void accept(T t);

当您实现这个接口时,您应该定义您想要对将在阶段中作为输入传递的输入值做什么。您不需要在程序中实现此接口,因为stream实现中的方法已经为您完成了繁重的工作。让我们看看它在filter Case中是如何实现的:

@Override
Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
    return new Sink.ChainedInt<Integer>(sink) {
        @Override
        public void begin(long size) {
            downstream.begin(-1);
        }

        @Override
        public void accept(int t) {
            if (predicate.test(t)) ///line 11
                downstream.accept(t);
        }
    };
}

流框架为opwrapsink方法提供了下一个阶段的sink(作为调用该方法时的第二个参数),这意味着我们知道我们管道中的下一个阶段是如何完成他们的工作的(在这个sink的帮助下),但是我们应该为他们提供一个输入,很清楚,下一个阶段的输入是当前阶段的输出。我们需要产生当前级输出的另一个参数是对我们当前级的输入。

输入到当前阶段->对输入执行当前阶段的操作->将输出传递到下一个阶段(管道中的以下阶段)

因此,在accept方法中,我们有作为参数t的当前阶段的输入,我们应该对这个输入做一些事情(作为当前阶段对输入的操作),然后将它传递到下一个阶段。在我们的筛选阶段,我们需要检查到我们的阶段的输入(即T)是否传递了谓词(在我们的例子中是E%2==0),然后我们应该将它传递到下一个阶段sink。下面正是我们的accept方法所做的事情(下游正是管道中以下阶段的sink):

@Override
public void accept(int t) {
    if (predicate.test(t)) ///line 11
        downstream.accept(t);
}

在这个accept方法实现中您应该注意到的是,如果当前阶段的输入传递了一个谓词(在我们的例子中是E%2==0),那么它只会将当前阶段的输入传递到下一个阶段,如果它不传递谓词,它就不会传递它(这正是我们希望过滤阶段做的事情);

 类似资料:
  • 考虑我有以下代码 我想做的是我有一个映射,它的值是字符串列表。 这些值将被转换成整数,并将被检查,如果该值大于给定的值,如第二/内过滤器。 我可以把它分成几个小部分,因为我使用了两个过滤器,但从内部过滤器我无法得到布尔值,所以我将得到所需的流,然后使用flatmat(Collection::stream)合并,然后将其转换为set(以避免重复),因为这两个列表都包含一个公共值,即4 我需要的输出必

  • 有时您希望筛选具有多个条件的: 也可以使用复杂条件和单个: 第一种方法在可读性方面胜出,但在性能方面,什么更好呢?

  • 过滤器作为读写操作的流内容传输过程中的附加阶段. 要注意的是直到php 4.3中才加入了流过滤器, 在php 5.0对流过滤器的API设计做过较大的调整. 本章的内容遵循的是php 5的流过滤器规范. 在流上应用已有的过滤器 在一个打开的流上应用一个已有的过滤器只需要几行代码即可: php_stream *php_sample6_fopen_read_ucase(const char *path

  • 问题内容: 虽然这两个给我相同的结果,但在内部或外部安装过滤器有什么区别?在表现或幕后动作方面是否有所不同? 问题答案: 在查询中过滤 Elasticsearch将以更有效的方式执行查询和过滤,以减少结果集并尽快获得答案。这称为filtered_query 查询后过滤 首先运行查询, 然后对 结果进行过滤, 然后 再将其返回给客户端。这称为post_filter。 尽管post_filter的效率

  • 有没有一种方法可以使用一些谓词和java流来过滤内部和外部列表?例如: 是否有一种方法可以过滤列表,使得列表具有字段,并且相应的的列表也会被相应的字段过滤? 我试过: 但它似乎无法过滤内部列表。 描述性示例: 如果一个外部列表有3个具有的对象,并且3个对象中的每一个都有1个内部对象在它们的中具有,那么它应该给我一个包含3个

  • 我想要找到合适的正则表达式匹配的行数。输入是通过Java Stream插入的日志文件。我想对这个流应用多个过滤器,但每隔一段时间计算一次。