当前位置: 首页 > 知识库问答 >
问题:

当Java8 Stream抛出RuntimeExcure时,预期的行为是什么?

江同化
2023-03-14

在流处理过程中遇到运行时异常时,流处理是否应该中止?它应该先结束吗?是否应在流上重新调用异常。关闭()?异常是按原样重新生成还是已包装?的JavaDoc和java包。util。小溪对此无话可说。

我发现有关Stackoverflow的所有问题似乎都集中在如何从函数接口中包装一个已检查的异常,以使其代码能够编译。事实上,互联网上的博客帖子和类似文章都关注相同的警告。这与我无关。

根据我自己的经验,一旦抛出运行时异常,序列流的处理就会中止,并且该异常会按原样重新启动。只有当异常是由客户端线程引发时,这对于并行流是相同的。

然而,这里的示例代码表明,如果异常是在并行流处理期间由“工作线程”(=与调用终端操作的线程不同)引发的,那么该异常将永远丢失,流处理将完成。

示例代码将首先并行运行IntStream。然后,一个“正常的”并行运行。

这个例子将表明,

1) IntStream在遇到运行时异常时中止并行处理没有问题。将重新引发异常,并将其包装在另一个RuntimeException中。

2) 播放效果不太好。事实上,客户端线程永远不会看到抛出的RuntimeException的跟踪。流不仅完成了处理;将处理比指定的limit()更多的元素!

在示例代码中,使用IntStream.range()生成IntStream。普通的Stream没有范围的概念,而是由1: s组成,但是调用Stream.limit()将流限制在10亿个元素。

这是另一个转折点。生成IntStream的示例代码如下所示:

IntStream.range(0, 1_000_000_000).parallel().forEach(..)

将其更改为生成的流,就像代码中的第二个示例一样:

IntStream.generate(() -> 1).limit(1_000_000_000).parallel().forEach(..)

此IntStream的结果是相同的:异常被包装并重试,处理中止。但是,第二个流现在也将包装并重新发送异常,并且不会处理超过限制的元素!因此:更改第一个流的生成方式会对第二个流的行为产生副作用。对我来说,这很奇怪。

ForkJoinPool的JavaDoc。invoke()和ForkJoinTask表示异常被重新调用,这是我从并行流中所期望的。

我在处理取自Collection.stream()的并行流中的元素时遇到了这个“问题”。并行()(我还没有验证Collection.parallelStream()的行为,但它应该是相同的)。发生的情况是,当所有其他线程成功完成流时,一个“工作线程”崩溃,然后静静地消失了。我的应用程序使用将异常写入日志文件的默认异常处理程序。但甚至没有创建这个日志文件。线程和他的异常就这样消失了。因为我需要在运行时异常被捕获时中止,一种选择是编写代码将此异常泄漏给其他工作人员,使他们不愿意继续,如果异常已被任何其他线程抛出。当然,这并不能保证流实现仅仅继续生成试图完成流的新线程。因此,我可能最终不会使用并行流,而是使用线程池/执行器进行“正常”并发编程。

这表明运行时异常丢失的问题并不孤立于流生成的流。使用流生成()或流。limit()。底线是我很想知道。。这是预期的行为吗?


共有1个答案

王弘和
2023-03-14

关于异常报告,这两个流的行为没有区别,问题是您将两个测试一个接一个地放到一个方法中,并让它们访问共享数据结构。

有一种微妙的、可能没有充分记录(如果有意)的行为:当流操作异常完成时,它不会等待所有并发操作的完成。

因此,当您捕获第一个流操作的异常时,仍然有一些线程在运行并访问您的共享数据。因此,当您重置您的原子布尔时,这些属于第一个作业的线程之一将读取false值,将其转换为true,打印消息并抛出一个异常,该异常丢失,作为流操作已经破例完成。此外,其中一些线程会在您重置计数器后提高计数器,这就是为什么它的数量比第二个作业允许的要高。您的第二个作业不会异常完成,因为属于第二个作业的所有线程将从原子布尔中读取一个true值。

有一些方法可以发现这一点。

当您删除第一个流操作时,第二个流操作将按预期异常完成。另外,插入语句

ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.DAYS);

在这两个流操作之间,将修复该问题,因为它将等待所有线程的完成。

然而,更干净的解决方案是让两个流操作使用它们自己的计数器和标志。

也就是说,有一个微妙的、依赖于实现的差异,如果您只交换这两个操作,就会导致问题消失。IntStream。range操作生成一个大小已知的流,允许将其拆分为并发任务,这些任务本质上知道要处理多少个元素。这允许在上述例外情况下放弃这些任务。另一方面,将generate返回的无限流与limit组合不会生成大小合适的流(尽管这是可能的)。由于此类流被视为具有未知大小,因此子任务必须在计数器上同步,以确保遵守限制。这导致子任务(有时)完成,即使在例外情况下也是如此。但如前所述,这是实施细节的副作用,而不是有意等待完成。由于它是关于并发性的,如果您多次运行它,结果可能会有所不同。

 类似资料:
  • 我在Spring Boot项目中使用Resilience4J调用REST客户机,如下所示: 看到示例中包含了一个回退方法,我决定添加它,尽管我并不真的想调用不同的方法,我只想再次调用我的原始方法。 不管怎样,我指定了一个回落: 现在,我看到回退方法被重试,但是每次都会抛出HttpServerErrorException,这意味着使用者将收到一个异常作为对其调用的响应。 谢谢

  • 问题内容: 我正在尝试解析这样的JSON字符串 进入对象列表。 这是我正在使用的对象类。 但这让我着迷 有什么想法我应该如何解决? 问题答案: 问题是你要告诉你具有你类型的对象。你不知道 你有一系列类型的对象。你不能只是尝试像这样投射结果并期望它神奇地工作;) 《用户指南》介绍了如何处理此问题: https://github.com/google/gson/blob/master/UserGuid

  • 我有一个Spring/JPA配置,其中Hibernate作为持久性提供者。但是,我不明白为什么在没有打开事务的情况下对以下DAO代码调用save()时没有抛出TransactionRequiredException(DAO/服务中没有@Transactional): 正如预期的那样,实体没有保存,但为什么没有引发异常?持久化的javadoc表示,持久化()应该抛出一个“Transaction必需的

  • 问题内容: 我们正在重用一个使用spring java- config(使用@Configuration)定义其bean的项目,并且在一个此类中有一个init方法。 这里的预期行为是什么?何时调用此方法?关于豆子,那就是。即,此方法的行为是否完全像配置类是Bean一样(实际上是一个吗?) 我们观察到的是,根据操作系统的不同,可以在初始化进入配置类的Bean之前调用它,从而最终导致不完全依赖项的工作

  • 我有以下代码... 我得到... 我也试过 字符串的示例为 更新 存储的过程定义...

  • 我期望这段代码(在对细化类型使用模式匹配后调用匿名类的方法) 要打印 (以及未经检查的警告)。 我知道由于类型擦除,匹配总是成功的,但这不应该导致问题,因为的运行时类型(甚至考虑擦除)应该是