当前位置: 首页 > 知识库问答 >
问题:

Akka流错误处理。如何知道哪一行失败了?

裴理
2023-03-14
val decider: Supervision.Decider = {
  case _: Exception => Supervision.Restart
  case _ => Supervision.Stop
}

implicit val actorSystem = ActorSystem()
implicit val actorMaterializer = ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider))

val source = Source(1 to 10)
 val flow = Flow[Int].map{x => if (x != 9) 2 * x else throw new Exception("9!")}
 val sink : Sink[Int, Future[Done]] = Sink.foreach[Int](x => println(x))
 val graph = RunnableGraph.fromGraph(GraphDSL.create(sink){implicit builder => s =>
  import GraphDSL.Implicits._
  source ~> flow ~> s.in
  ClosedShape
})
val future = graph.run()
future.onComplete{ _ =>
  actorSystem.terminate()
}
Await.result(actorSystem.whenTerminated, Duration.Inf)

但这里我没有显式地使用Actor,尽管它们是在内部使用的。是否存在流/源/汇的生命周期事件?

共有1个答案

许招
2023-03-14

只需对代码进行一个小的修改:

  val decider: Supervision.Decider = {
  case e: Exception =>
    println("Exception handled, recovering stream:" + e.getMessage)
    Supervision.Restart
  case _ => Supervision.Stop
}

如果您将有意义的消息传递给流中的异常,例如行,您可以在监管决定器中打印它们。

我使用println给出了一个快速而简短的答案,但强烈建议使用一些日志库,如scala-logging

 类似资料:
  • 在我的Jenkins管道中,我通常使用< code>post声明函数向我发送电子邮件,以防管道出现故障。 函数的简单语法如下: 在上面的电子邮件中,我还想提到管道的哪个阶段(假设管道有5到6个阶段)失败了。我该怎么做?任何帮助都非常感谢。 上述要求的扩展将是向用户提供实际的错误日志(失败阶段的错误日志),也作为失败通知电子邮件的一部分。 想法是,当用户收到来自 jenkins 的失败通知时,他应该

  • 我正在通过设置configure SSL='tlsv1.2'来验证我的SSL算法,这给了我200个响应代码。 但是,如果algirithm不是TLSV1.2,它只是抛出一个没有响应代码的原始错误,如下所示: 14:53:26.025 javax.net.ssl.SSLHandShakeException:握手期间远程主机关闭连接,URL:https://xxxx.com.au/event/coun

  • 关于这个问题的答案。对我来说,使用此代码时会发生错误。我想知道从catch块中准确地发生了哪种错误。您应该通过捕捉错误类型来实现这一点,但我在jsonObject文档中没有看到枚举的错误类型 在swift中有没有一种方法可以做到这一点,或者我如何找到一个对象抛出的异常?

  • 我在spark streaming应用程序中看到一些失败的批处理,原因是与内存相关的问题,如 无法计算拆分,找不到块输入-0-1464774108087

  • 如何对反应流管道进行错误处理。喜欢 应用程序错误处理(例如:errorChannel) 系统错误处理(使用DLQ、再处理等) 当前文档仅描述了非反应性管道的错误处理。https://docs.spring.io/spring-cloud-stream/docs/Fishtown.BUILD-SNAPSHOT/reference/htmlsingle/#_application_error_hand