当前位置: 首页 > 知识库问答 >
问题:

如何停止平行流一旦找到任何匹配?

葛成济
2023-03-14
Item item = items.parallelStream()
  .map(i -> i.doSomethingExpensive())
  .filter(predicate)
  .findAny()
  .orElse(null);
private static void log(String msg) {
    private static void log(String msg) {
    SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");
    System.out.println(sdf.format(new Date()) + " " + msg);
}

Random random = new Random();
List<Integer> nums = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
Optional<Integer> num = nums.parallelStream()
  .map(n -> {
    long delay = Math.abs(random.nextLong()) % 10000;
    log("Waiting on " + n + " for " + delay + " ms");
    try { Thread.sleep(delay); }
    catch (InterruptedException e) { System.err.println("Interruption error"); }
    return n * n;
  })
  .filter(n -> n < 30)
  .peek(n -> log("Found match: " + n))
  .findAny();

log("First match: " + num);
14:52:27.061 Waiting on 9 for 2271 ms
14:52:27.061 Waiting on 2 for 1124 ms
14:52:27.061 Waiting on 13 for 547 ms
14:52:27.061 Waiting on 4 for 517 ms
14:52:27.061 Waiting on 1 for 1210 ms
14:52:27.061 Waiting on 6 for 2646 ms
14:52:27.061 Waiting on 0 for 4393 ms
14:52:27.061 Waiting on 12 for 5520 ms
14:52:27.581 Found match: 16
14:52:27.582 Waiting on 3 for 5365 ms
14:52:28.188 Found match: 4
14:52:28.275 Found match: 1
14:52:31.457 Found match: 0
14:52:32.950 Found match: 9
14:52:32.951 First match: Optional[0]

一旦找到匹配项(在本例中为16),findany()不会立即返回,而是阻塞,直到剩余的线程完成。在这种情况下,调用方在找到匹配项后返回之前将额外等待5秒。

共有1个答案

尹赞
2023-03-14

相反,它似乎要等待map方法完成大部分元素后才能返回。

这是不正确的。

当谈到已经在处理的元素时,它将等待所有元素的完成,因为Stream API允许并发处理本质上不是线程安全的数据结构。它必须确保在从终端操作返回之前已经完成了所有潜在的并发访问。

List<Integer> nums = IntStream.range(0, 200).boxed().collect(Collectors.toList());
Optional<Integer> num = nums.parallelStream()
        .map(n -> {
            long delay = ThreadLocalRandom.current().nextInt(10_000);
            log("Waiting on " + n + " for " + delay + " ms");
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(delay));
            return n * n;
        })
        .filter(n -> n < 40_000)
        .peek(n -> log("Found match: " + n))
        .findAny();

log("First match: " + num);
static <T,R> Callable<R> mapAndfilter(T t, Function<T,R> f, Predicate<? super R> p) {
    return () -> {
        R r = f.apply(t);
        if(!p.test(r)) throw new NoSuchElementException();
        return r;
    };
}

不幸的是,只有用值完成或异常完成的选项,因此我们必须对不匹配的元素使用异常。

然后我们可以用它

ExecutorService es = ForkJoinPool.commonPool();
Integer result = es.invokeAny(IntStream.range(0, 100)
    .mapToObj(i -> mapAndfilter(i,
        n -> {
            long delay = ThreadLocalRandom.current().nextInt(10_000);
            log("Waiting on " + n + " for " + delay + " ms");
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(delay));
            return n * n;
        },
        n -> n < 10_000))
    .collect(Collectors.toList()));

log("result: "+result);

它不仅会取消挂起的任务,还会返回而不等待它们完成。

 类似资料:
  • 问题内容: 我有以下代码利用多处理程序来遍历大列表并找到匹配项。在任何一个进程中找到匹配项后,如何使所有进程停止?我已经看到了一些示例,但是我似乎都不适合我在这里所做的事情。 谢谢你的时间。 更新1: 我已经实现了@ShadowRanger的出色方法中建议的更改,并且几乎可以按照我想要的方式工作。因此,我添加了一些日志记录以指示进度,并在其中放置一个“测试”键以进行匹配。我希望能够独立于num_p

  • 问题内容: 我正在尝试执行此查询- 但是我总是会出错 我认为如果用户不存在,则创建新用户。怎么了? 问题答案: 我认为,如果用户不存在,则Grant用法会创建新用户。怎么了? 在以前的mysql版本中就是这种情况。如果您想以新方式进行操作,请参见此答案。如果您想修改系统设置以使其像以前一样工作,请继续阅读。但是,请注意,这是不推荐使用的行为,因为链接文档指出: NO_AUTO_CREATE_USE

  • 有没有一种简单的方法来使用Java8流来实现这一点呢?例如,给定上面的数组和谓词将result设置为“2”。 如果可以获得下一个值,那么在匹配另一个谓词(如)之前,还可以获得下一个N个值或所有值的列表/数组。

  • 我有一个迷宫,我必须用递归来解。迷宫必须在找到开放路径的地方放置一个X(我的代码就是这样做的)。它必须这样做,直到使用递归调用到达出口为止(我的代码就是这样做的,下面描述的除外)。它还必须在到达死胡同的地方放置一个O,将操作系统拉回到“正确”路径,然后沿着新路径继续求解(我的代码就是这样做的)。 然而,一旦到达迷宫的末端,它就必须求解一个新的迷宫(原始迷宫,转置)。我的问题如下: 一旦我到达迷宫的

  • 问题内容: 该程序在九次打印后完成: 如何停止进程(例如eclipse中的Java进程),因为它在9秒的时间限制后没有停止? 问题答案: 您遇到的问题是,取消哔声任务后,调度程序会保留活动线程。 如果存在活动的非守护程序线程,则JVM保持活动状态。 它使该线程保持不变的原因是您已在此行中告诉它这样做: 请注意以下文档: -即使在空闲状态下要保留在池中的线​​程数。 因此,您有两种可能的方法来导致J

  • 问题内容: 我有一个goroutine,它调用一个方法,并在通道上传递返回的值: 如何停止这种goroutine? 问题答案: 编辑: 在意识到您的问题是关于将值发送到goroutine中的chan之前,我匆忙编写了此答案。 下面的方法可以与上面建议的其他chan一起使用,或者利用您已经拥有的chan双向的事实,您可以只使用一个… 如果您的goroutine仅用于处理来自chan的项目,则可以使用