Item item = items.parallelStream()
.map(i -> i.doSomethingExpensive())
.filter(predicate)
.findAny()
.orElse(null);
private static void log(String msg) {
private static void log(String msg) {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");
System.out.println(sdf.format(new Date()) + " " + msg);
}
Random random = new Random();
List<Integer> nums = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
Optional<Integer> num = nums.parallelStream()
.map(n -> {
long delay = Math.abs(random.nextLong()) % 10000;
log("Waiting on " + n + " for " + delay + " ms");
try { Thread.sleep(delay); }
catch (InterruptedException e) { System.err.println("Interruption error"); }
return n * n;
})
.filter(n -> n < 30)
.peek(n -> log("Found match: " + n))
.findAny();
log("First match: " + num);
14:52:27.061 Waiting on 9 for 2271 ms
14:52:27.061 Waiting on 2 for 1124 ms
14:52:27.061 Waiting on 13 for 547 ms
14:52:27.061 Waiting on 4 for 517 ms
14:52:27.061 Waiting on 1 for 1210 ms
14:52:27.061 Waiting on 6 for 2646 ms
14:52:27.061 Waiting on 0 for 4393 ms
14:52:27.061 Waiting on 12 for 5520 ms
14:52:27.581 Found match: 16
14:52:27.582 Waiting on 3 for 5365 ms
14:52:28.188 Found match: 4
14:52:28.275 Found match: 1
14:52:31.457 Found match: 0
14:52:32.950 Found match: 9
14:52:32.951 First match: Optional[0]
一旦找到匹配项(在本例中为16),findany()
不会立即返回,而是阻塞,直到剩余的线程完成。在这种情况下,调用方在找到匹配项后返回之前将额外等待5秒。
相反,它似乎要等待map方法完成大部分元素后才能返回。
这是不正确的。
当谈到已经在处理的元素时,它将等待所有元素的完成,因为Stream API允许并发处理本质上不是线程安全的数据结构。它必须确保在从终端操作返回之前已经完成了所有潜在的并发访问。
List<Integer> nums = IntStream.range(0, 200).boxed().collect(Collectors.toList());
Optional<Integer> num = nums.parallelStream()
.map(n -> {
long delay = ThreadLocalRandom.current().nextInt(10_000);
log("Waiting on " + n + " for " + delay + " ms");
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(delay));
return n * n;
})
.filter(n -> n < 40_000)
.peek(n -> log("Found match: " + n))
.findAny();
log("First match: " + num);
static <T,R> Callable<R> mapAndfilter(T t, Function<T,R> f, Predicate<? super R> p) {
return () -> {
R r = f.apply(t);
if(!p.test(r)) throw new NoSuchElementException();
return r;
};
}
不幸的是,只有用值完成或异常完成的选项,因此我们必须对不匹配的元素使用异常。
然后我们可以用它
ExecutorService es = ForkJoinPool.commonPool();
Integer result = es.invokeAny(IntStream.range(0, 100)
.mapToObj(i -> mapAndfilter(i,
n -> {
long delay = ThreadLocalRandom.current().nextInt(10_000);
log("Waiting on " + n + " for " + delay + " ms");
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(delay));
return n * n;
},
n -> n < 10_000))
.collect(Collectors.toList()));
log("result: "+result);
它不仅会取消挂起的任务,还会返回而不等待它们完成。
问题内容: 我有以下代码利用多处理程序来遍历大列表并找到匹配项。在任何一个进程中找到匹配项后,如何使所有进程停止?我已经看到了一些示例,但是我似乎都不适合我在这里所做的事情。 谢谢你的时间。 更新1: 我已经实现了@ShadowRanger的出色方法中建议的更改,并且几乎可以按照我想要的方式工作。因此,我添加了一些日志记录以指示进度,并在其中放置一个“测试”键以进行匹配。我希望能够独立于num_p
问题内容: 我正在尝试执行此查询- 但是我总是会出错 我认为如果用户不存在,则创建新用户。怎么了? 问题答案: 我认为,如果用户不存在,则Grant用法会创建新用户。怎么了? 在以前的mysql版本中就是这种情况。如果您想以新方式进行操作,请参见此答案。如果您想修改系统设置以使其像以前一样工作,请继续阅读。但是,请注意,这是不推荐使用的行为,因为链接文档指出: NO_AUTO_CREATE_USE
有没有一种简单的方法来使用Java8流来实现这一点呢?例如,给定上面的数组和谓词将result设置为“2”。 如果可以获得下一个值,那么在匹配另一个谓词(如)之前,还可以获得下一个N个值或所有值的列表/数组。
我有一个迷宫,我必须用递归来解。迷宫必须在找到开放路径的地方放置一个X(我的代码就是这样做的)。它必须这样做,直到使用递归调用到达出口为止(我的代码就是这样做的,下面描述的除外)。它还必须在到达死胡同的地方放置一个O,将操作系统拉回到“正确”路径,然后沿着新路径继续求解(我的代码就是这样做的)。 然而,一旦到达迷宫的末端,它就必须求解一个新的迷宫(原始迷宫,转置)。我的问题如下: 一旦我到达迷宫的
样品https://github.com/spring-projects/spring-integration-samples/tree/master/basic/tcp-client-server非常适合构建TCP服务器应用程序。它很简单,可以在JVM上运行。它不需要任何应用服务器。 示例使用命令行输入来运行程序。我希望服务器仅从套接字端口而不是通过命令行接受数据。如果删除命令行输入,主线程将完
问题内容: 该程序在九次打印后完成: 如何停止进程(例如eclipse中的Java进程),因为它在9秒的时间限制后没有停止? 问题答案: 您遇到的问题是,取消哔声任务后,调度程序会保留活动线程。 如果存在活动的非守护程序线程,则JVM保持活动状态。 它使该线程保持不变的原因是您已在此行中告诉它这样做: 请注意以下文档: -即使在空闲状态下要保留在池中的线程数。 因此,您有两种可能的方法来导致J