当前位置: 首页 > 知识库问答 >
问题:

为什么阿卡流吞噬我的例外?

储修谨
2023-03-14

为什么会有例外

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source

object TestExceptionHandling {
  def main(args: Array[String]): Unit = {
    implicit val actorSystem = ActorSystem()
    implicit val materializer = ActorMaterializer()(defaultActorSystem)

    Source(List(1, 2, 3)).map { i =>
      if (i == 2) {
        throw new RuntimeException("Please, don't swallow me!")
      } else {
        i
      }
    }.runForeach { i =>
      println(s"Received $i")
    }
  }
}

静默忽略?我可以看到流在打印收到1后停止,但没有记录任何内容。请注意,问题不是一般的日志配置,因为如果我在我的application.conf文件中设置akka.log-config-on-start=on,我会看到很多输出。

共有3个答案

萧和平
2023-03-14

我对Akka Streams吞下我的异常有不同的问题。我将在这里发布它,因为这是Google的顶级结果。

在这样的情况下,其中sourceSource[ByteString, Any]

source.runWith(StreamConverters.fromOutputStream(() => outputStream))

这将返回一个未来[IOResult]。如果对输出流的写入失败(例如,源失败),那么未来仍将返回成功。在这种情况下,您实际上必须检查IOResult是否存在错误:

source.runWith(StreamConverters.fromOutputStream(() => output)).
      map(ior => {
        if (!ior.wasSuccessful) 
          throw new RuntimeException(ior.getError)
      })

这样做的结果将是一个失败的未来,只有正确的例外。

阎懿轩
2023-03-14

当我开始使用akk streams时,我也有类似的问题<代码>监督。Decider有帮助,但并不总是如此。

不幸的是,它没有捕获ActionPublisher中抛出的异常。我看到它被处理了,ActorPublisher。调用了onError,但它没有到达监督。决策者。它使用文档中提供的简单流。

如果我使用Sink.actorRef,错误也不会到达参与者。

为了实验,我尝试了以下样本

val stream = Source(0 to 5).map(100 / _)
stream.runWith(Sink.actorSubscriber(props))

在这种情况下,异常被决策者捕获,但从未到达参与者订阅者。

总的来说,我认为这是不一致的行为。我不能使用一种机制来处理流中的错误。

我最初的SO问题:自定义监督。决定器不捕获ActorPublisher产生的异常

这里是akka问题,它被跟踪:https://github.com/akka/akka/issues/18359

令狐经武
2023-03-14

我现在正在使用一个自定义的监控。Decider确保正确记录异常,可以这样设置:

val decider: Supervision.Decider = { e =>
  logger.error("Unhandled exception in stream", e)
  Supervision.Stop
}

implicit val actorSystem = ActorSystem()
val materializerSettings = ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider)
implicit val materializer = ActorMaterializer(materializerSettings)(actorSystem)

此外,正如Vikor Klang所指出的,在上面给出的示例中,也可以通过

Source(List(1, 2, 3)).map { i =>
  if (i == 2) {
    throw new RuntimeException("Please, don't swallow me!")
  } else {
    i
  }
}.runForeach { i =>
  println(s"Received $i")
}.onComplete {
  case Success(_) =>
    println("Done")
  case Failure(e) =>
    println(s"Failed with $e")
}

然而,请注意,这种方法不会帮助您解决问题

Source(List(1, 2, 3)).map { i =>
  if (i == 2) {
    throw new RuntimeException("Please, don't swallow me!")
  } else {
    i
  }
}.to(Sink.foreach { i =>
  println(s"Received $i")
}).run()

由于run()返回Unit

 类似资料:
  • 问题内容: 我有这样的字符串。 我想将其拆分以获取以下输出。\ $是转义的$,因此应将其保留在输出中。 但是,当我做 ÿ得到。 从我得到的正则表达式匹配a $和i $然后删除。关于如何找回角色的任何想法吗? 谢谢 问题答案: 使用零宽度匹配断言: 正则表达式本质上是 它使用负向后看来断言没有在前。 也可以看看 Regular-expressions.info/解决方法 断言分裂的更多示例 简单的句

  • 我很困惑,无法理解为什么不应该吞掉InterruptedException。 IBM的文章说 当阻塞方法检测到中断并抛出InterruptedException时,它将清除中断状态。如果捕捉到InterruptedException,但无法重新抛出它,则应保留中断发生的证据,以便调用堆栈上更高的代码能够获悉中断,并在需要时对其做出响应 还请解释一下上面的规则?

  • 为什么Mockito会吞噬堆栈痕迹?例如,如果我有一个 和一个测试,例如 抛出的异常看起来总是 (这里提供的示例只是一个简化--我要处理更多的间接、类等等。我不能让Mockito吞噬部分模拟堆栈跟踪的关键部分……)

  • 问题内容: 请帮忙。将来自扫描仪的提示显示到控制台后,出现此错误: 问题是来自printf吗?我检查了格式,它看起来正确。 这是程序: 问题答案: 答:因为您的格式说明符与方法中使用的输入参数不匹配。 使用而不是将值用作格式说明符 另外需要额外的逃脱角色 最后,删除不必要的点字符 阅读:Formatter javadoc

  • 外网下载 oss 资源,每个 GB 要 0.5 元,太可怕了 https://www.aliyun.com/price/product?spm=a2c4g.109686.0.0.2cc... 钱被谁转走了?三大运营商?

  • 问题内容: 我有一个设置为启动Java活动(称为MyJavaActivity)的android应用,这反过来又会启动NativeActivity。NativeActivity完成后,它将返回到MyJavaActivity。 我还有一个Java单例类(称为MyJavaSingleton),我希望在应用程序的整个生命周期中都将其保留在内存中。我从NativeActivity(使用JNI)中设置了一些单