当前位置: 首页 > 知识库问答 >
问题:

Java8并行流在抛出异常时的行为如何?

林俭
2023-03-14

Java 8并行流在consuming子句中抛出异常时如何表现,例如在forEach处理中?例如,以下代码:

final AtomicBoolean throwException = new AtomicBoolean(true);
IntStream.range(0, 1000)
    .parallel()
    .forEach(i -> {
        // Throw only on one of the threads.
        if (throwException.compareAndSet(true, false)) {
            throw new RuntimeException("One of the tasks threw an exception. Index: " + i);
        });

它是否立即停止处理的元素?它是否等待已启动的元素完成?它是否等待所有的流完成?它是否在抛出异常后开始处理流元素?

什么时候回来?在异常之后立即?消费者处理完所有/部分元素后?

在并行流引发异常后,是否继续处理元素?(找到了发生这种情况的案例)。

这里有一般规则吗?

编辑(15-11-2016)

试图确定并行流是否提前返回,我发现它不是确定的:

@Test
public void testParallelStreamWithException() {
    AtomicInteger overallCount = new AtomicInteger(0);
    AtomicInteger afterExceptionCount = new AtomicInteger(0);
    AtomicBoolean throwException = new AtomicBoolean(true);

    try {
        IntStream.range(0, 1000)
            .parallel()
            .forEach(i -> {
                overallCount.incrementAndGet();
                afterExceptionCount.incrementAndGet();
                try {
                    System.out.println(i + " Sleeping...");
                    Thread.sleep(1000);
                    System.out.println(i + " After Sleeping.");
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // Throw only on one of the threads and not on main thread.
                if (!Thread.currentThread().getName().equals("main") && throwException.compareAndSet(true, false)) {
                    System.out.println("Throwing exception - " + i);
                    throw new RuntimeException("One of the tasks threw an exception. Index: " + i);
                }
            });
        Assert.fail("Should not get here.");
    }
    catch (Exception e) {
        System.out.println("Cought Exception. Resetting the afterExceptionCount to zero - 0.");
        afterExceptionCount.set(0);
    }
    System.out.println("Overall count: " + overallCount.get());
    System.out.println("After exception count: " + afterExceptionCount.get());
}

不从主线程抛出时返回较晚。这导致抛出异常后需要处理许多新元素。在我的机器上,抛出异常后处理了大约200个元素。但是,并不是所有的1000个元素都被处理。这里的规则是什么?即使抛出了异常,为什么还要处理更多的元素?

删除not()符号时提前返回,导致在主线程中引发异常。只有已经启动的元素完成了处理,没有处理新的元素。这里的情况就是早归。与以前的行为不一致。

我错过了什么?

共有1个答案

谷泽宇
2023-03-14

当异常在其中一个阶段被抛出时,它不等待其他操作完成,异常被重新抛出给调用方。ForkJoinpool就是这样处理的。

相反,findFirst(例如并行运行时)仅在所有操作完成处理后(即使在需要完成所有操作之前就知道结果)才会向调用方显示结果。

换句话说:它会提前返回,但会让所有正在运行的任务完成。

编辑回复最后一条评论

霍尔格的回答(评论中的链接)对此有很大的解释,但这里有一些细节。

1) 在杀死除主线程以外的所有线程时,您也在杀死本应由这些线程处理的所有任务。所以这个数字应该在250左右,因为有1000个任务和4个线程,我假设返回3?:

int result = ForkJoinPool.getCommonPoolParallelism();

理论上有1000个任务,有4个线程,每个线程应该处理250个任务,然后你杀死其中的3个,意味着750个任务丢失。还有250个任务要执行,ForkJoinpool将跨越3个新线程来执行这250个剩下的任务。

你可以尝试一些事情,像这样改变你的流(使流不大小):

IntStream.generate(random::nextInt).limit(1000).parallel().forEach

这一次,将有更多的操作结束,因为初始拆分索引未知,并由其他策略选择。您还可以尝试更改以下内容:

 if (!Thread.currentThread().getName().equals("main") && throwException.compareAndSet(true, false)) {

对此:

 if (!Thread.currentThread().getName().equals("main")) {

这一次,你总是会杀死除了main之外的所有线程,直到某个时候,ForkJoinpool不会创建新的线程,因为任务太小,无法分割,因此不需要其他线程。在这种情况下,完成的任务甚至更少。

2) 第二个示例是,当您实际终止主线程时,按照代码的方式,您将看不到其他线程的实际运行。更改它:

    } catch (Exception e) {
        System.out.println("Cought Exception. Resetting the afterExceptionCount to zero - 0.");
        afterExceptionCount.set(0);
    }

    // give some time for other threads to finish their work. You could play commenting and de-commenting this line to see a big difference in results. 
    TimeUnit.SECONDS.sleep(60);

    System.out.println("Overall count: " + overallCount.get());
    System.out.println("After exception count: " + afterExceptionCount.get());
 类似资料:
  • 问题内容: 最近,我接受了公司的采访,他们给了我一个编码问题。我得到了与纸牌有关的程序,其中一种方法是将纸牌洗牌。因此,我将该程序编写为: 在上面的代码中,我引发了我最怀疑的 IllegalArgumentException 。在什么情况下实际上应该抛出运行时异常?我们是否应该实际抛出运行时异常? 谢谢 问题答案: 我们是否应该实际抛出运行时异常? 是的,我们应该。运行时异常有特定的用途-它们发出

  • [信息][错误]无法执行目标org.apache.maven.plugins:maven-deploy-plugin:2.7:在项目模型上部署(default-deploy):无法部署项目:无法将项目COM传输。test:model:jar:0.1.0从/到central(http://localhost:8081/artifactory/libs-release/com/srcrea/model

  • 我有以下情况: null 请注意,许多服务访问用户存储库,因此在保存之前直接实现新逻辑是很麻烦的。 为了在保存/更新之前执行验证,而不必为每个保存/更新调用显式实现验证,我尝试使用JPA侦听器。使用和,我实现了在每次保存/更新之前执行验证的目标。但是,这种解决方案存在两个问题: 检查的异常不会在保存/更新用户的方法上强制执行,并且异常会通过侦听器自动包装为 当试图使用AOP捕获所述并解压缩原始检查

  • 我试图在lambda内部抛出一个异常,但它总是给我一个错误,表示未处理的IOException。 有人能告诉我我做错了什么吗?

  • 我拥有的现有代码: 我想丰富现有代码,并在收到一些不允许的数据时引发异常,我做了以下更改: 但由于以下原因,我的实现甚至不可编译: 您能为我的特定场景提供建议吗?

  • 在你可以捕获异常之前,一些代码必须抛出一个异常。任何代码都可能会抛出异常:您的代码,来自其他人编写的包(例如Java平台附带的包)或Java运行时环境的代码。无论是什么引发的异常,它总是通过 throw 语句抛出。 您可能已经注意到,Java平台提供了许多异常类。所有类都是Throwable类的后代,并且都允许程序区分在程序执行期间可能发生的各种类型的异常。 您还可以创建自己的异常类来表示在您编写