当前位置: 首页 > 知识库问答 >
问题:

Java中的嵌套并行流执行-findAny()随机失败

赵英哲
2023-03-14
        AllDirectedPaths<Vertex, Edge> allDirectedPaths = new AllDirectedPaths<>(graph);
        List<GraphPath<Vertex, Edge>> paths = allDirectedPaths.getAllPaths(entry, exit, true, null);


        return paths.parallelStream().map(path -> path.getEdgeList().parallelStream()
                .map(edge -> {
                    Vertex source = edge.getSource();
                    Vertex target = edge.getTarget();

                    if (source.containsInstruction(method, instructionIndex)) {
                        return source;
                    } else if (target.containsInstruction(method, instructionIndex)) {
                        return target;
                    } else {
                        return null;
                    }
                }).filter(Objects::nonNull)).findAny().flatMap(Stream::findAny)
                .orElseThrow(() -> new IllegalArgumentException("Given trace refers to no vertex in graph!"));

将“find any()”流操作放在哪里有关系吗?我应该直接在过滤器操作后放置它吗?还是嵌套的并行流是问题所在?

共有1个答案

章哲彦
2023-03-14

您应该使用.Flatmap(path->...)并删除.Flatmap(stream::findany)

您的代码无法工作,因为第一个findany()返回的流始终是非空的,但可能包含null元素。

然后,当通过optional.flatmap(stream::findany)调用应用第二个findany()时,最后一个查找操作可能返回一个空的optional,其结果是内部流的null元素。

代码应该是这样的:

return paths.stream()
    .flatMap(path -> path.getEdgeList().stream()
        .map(edge -> 
             edge.getSource().containsInstruction(method, instructionIndex) ?
             edge.getSource()                                               :
             edge.getTarget().containsInstruction(method, instructionIndex) ?
             edge.getTarget()                                               :
             null)
        .filter(Objects::nonNull))
    .findAny()
    .orElseThrow(() -> new IllegalArgumentException("whatever"));

注意:为什么是并行流?在您的管道中似乎没有CPU绑定的任务。此外,并行流会产生大量的开销。它们在非常少的场景中很有用,即在流水线上有数万个元素和密集的CPU操作

编辑:正如注释中所建议的,内部流的mapfilter操作可以安全地移动到外部流。通过这种方式,可读性得到了提高,并且在性能方面没有差异:

return paths.stream()
    .flatMap(path -> path.getEdgeList().stream())
    .map(edge -> 
         edge.getSource().containsInstruction(method, instructionIndex) ?
         edge.getSource()                                               :
         edge.getTarget().containsInstruction(method, instructionIndex) ?
         edge.getTarget()                                               :
         null)
    .filter(Objects::nonNull)
    .findAny()
    .orElseThrow(() -> new IllegalArgumentException("whatever"));
 类似资料:
  • 示例2: 如果流被设置为,就像在第二个示例中那样,我可以想象内部工作者在等待外部工作队列中的线程可用时会阻塞,因为外部工作队列线程必须在内部流完成时阻塞,而默认线程池只有有限数量的线程。但是,似乎不会出现死锁: 两个流共享相同的默认线程池,但它们生成不同的工作单元。每个外部工作单元只能在该外部工作单元的所有内部单元完成之后才能完成,因为在每个并行流的末端有一个完成屏障。 如何通过共享的工作线程池来

  • 请考虑以下代码: 任务是应该并行执行的运行表的列表。当我们启动这个线程,并且它开始执行时,根据一些计算,我们需要中断(取消)所有这些任务。 中断线程只会停止执行中的一个。我们怎么对付别人?或者流不应该这样使用?或者你知道更好的解决办法?

  • 我正在尝试连接spark streaming应用程序中的DB2数据库和导致“org.apache.spark.sparkException:Task not Serializable”问题的数据库查询执行语句。请指教。下面是我有的示例代码供参考。 下面是错误日志:

  • 问题内容: 我遇到了一个奇怪的情况。我不满意,运行以下代码时出现意外结果: 没有抛出异常(即使使用),我看到的是控制台输出为 现在,很明显,此代码没有实际的生产价值,但这是一种情况的表示,其中您的代码具有未知数量的嵌套,其中每个嵌套或其中的一些嵌套将无法执行。 任何解释(以及有关如何修复的示例)将不胜感激 问题答案: 之所以不起作用,是因为在您的简单测试中,VM在所有任务完成之前就退出了。 当您致

  • 我找不到关于我们被要求进行的调查的具体答案 我看到并行流在使用少量线程时性能可能不是那么好,而且当DB在处理当前请求的同时阻止下一个请求时,它的表现显然也不是那么好 然而,我发现实现任务执行器与并行流的开销是巨大的,我们实现了一个POC,它只需要这一行代码就能满足并发需求: 而在Task Executor中,我们需要重写Runnable接口并编写一些繁琐的代码,以使Runnable不是空的,并返回

  • 我是javascript新手,这可能是一个简单的问题。。。 我在这里所做的是修改div(#box)的css样式,这取决于你所在页面的位置( 我试图实现的是向#box添加多个css更改,并每次执行一个随机更改。例如,每次从最顶端滚动通过200,div的显示高度为:绿色背景为“100px”,蓝色背景为“200px”,或红色背景为“300px”。希望这有意义。。。。