def decodeService: Flow[Message, Message, _] = {
Flow[Message].map {
case BinaryMessage.Strict(encodeMsg) =>
try {
WebServer.getWorkerActor ! QueueWork(protoMsg(this, encodeMsg))
} catch {
case e: Exception => {
println("[ERROR] failed to send BinaryMessage.Strict: " + e)
TextMessage("[ERROR] failed receiving BinaryMessage.Strict")
}
}
TextMessage("[INFO] BinaryMessage.Strict")
case BinaryMessage.Streamed(streamedMsg) => {
implicit val system = ActorSystem("DecoderSystem")
implicit val materializer = ActorMaterializer()
val streamedMsgFuture: Future[Seq[ByteString]] = streamedMsg.runWith(Sink.seq)
streamedMsgFuture.onComplete { completedStream =>
var completeBytestring = new ByteStringBuilder()
//I'm sure there's a better way to do this.. but hey, it's O(n)
completedStream.foreach { x =>
x.foreach { y =>
completeBytestring ++= y
}
}
try {
WebServer.getWorkerActor ! QueueWork(protoMsg(this, completeBytestring.result()))
} catch {
case e: Exception => {
println("[ERROR] failed to send BinaryMessage.Streamed: " + e)
TextMessage("[ERROR] failed receiving BinaryMessage.Streamed")
}
} finally {
completeBytestring.clear()
}
}
TextMessage("[INFO] BinaryMessage.Streamed")
}
case TextMessage.Strict(txt) => TextMessage("Succesfully receive text message")
case _ => TextMessage("Message type unsupported")
}
}
我是否错误地使用了流/接收器/源?如何冲洗溪流?
谢谢
好吧,最突出的问题是,您为接收到的每个流消息创建一个全新的actorSystem
。ActorSystem
就像您的参与者的线程池;您希望尽可能少地创建它们,理想情况下,整个应用程序只创建一个。您不仅为每个消息创建它们,而且不关闭它们--在ActorSystem
中配置的所有分派程序,以及它所持有的所有资源都将永远挂起。当然,如果您接收到大量的流式消息,您的内存使用量将会增加。
由于您使用了akka-http,因此您必须有一个ActorSystem
,在此调用http().bind*
。您需要在decodeservice
方法中使其可访问。此外,您计算组合字节流的方式在我看来似乎过于复杂。考虑这样写:
def decodeService: Flow[Message, Message, _] = Flow[Message].mapAsync(4) {
case m: BinaryMessage.Strict =>
Future.successful(m)
case BinaryMessage.Streamed(streamMsg) =>
streamMsg.runReduce(_ ++ _).map(BinaryMessage.Strict)
case m =>
Future.successful(m)
}.map {
case BinaryMessage.Strict(encodeMsg) =>
try {
WebServer.getWorkerActor ! QueueWork(protoMsg(this, encodeMsg))
TextMessage("[INFO] BinaryMessage.Strict")
} catch {
case NonFatal(e) =>
println("[ERROR] failed to send BinaryMessage.Strict: " + e)
TextMessage("[ERROR] failed receiving BinaryMessage.Strict")
}
case TextMessage.Strict(txt) => TextMessage("Succesfully receive text message")
case _ => TextMessage("Message type unsupported")
}
在这里,我们首先将所有二进制消息转换为binarymessage.strict
,然后像在原始代码中一样处理它们。注意,您必须在try
块内写入确认消息,否则即使出现异常,也会返回成功消息。此外,如果您决定根本不处理文本消息,代码可能会变得更加简单:
def decodeService: Flow[Message, Message, _] = Flow[Message]
.filterNot(_.isText)
.mapAsync(4) {
case BinaryMessage.Strict(binary) =>
Future.successful(binary)
case BinaryMessage.Stream(binaryStream) =>
binaryStream.runReduce(_ ++ _)
.map { encodeMsg =>
try {
WebServer.getWorkerActor ! QueueWork(protoMsg(this, encodeMsg))
TextMessage("[INFO] BinaryMessage.Strict")
} catch {
case NonFatal(e) =>
println("[ERROR] failed to send BinaryMessage.Strict: " + e)
TextMessage("[ERROR] failed receiving BinaryMessage.Strict")
}
}
编辑:我已经更新了日志片段,以显示dispatcher线程数超出了我的预期。 编辑#2:以下是健康检查路由代码:
我有一个基于AKKA-HTTP的服务,它是用Scala编写的。此服务用作API调用的代理。它使用https://doc.akka.io/docs/akka-http/current/client-side/host-level.html创建一个用于调用API的主机连接池 我想了解这种“之”字形模式的原因,即使服务上没有流量,并且主机池中的连接由于空闲超时而终止。 此外,我还想知道,是否只有在达到一
这是我认为我应该用于这种方法的布局:并且为了适应404路由,还可以使用。现在,如果我的Akka流知识对我有用的话,我需要使用来处理这样的事情,然而,这就是我被困住的地方。 在中,我可以为不同的endpoint进行简单的映射和flatMap,但在流中,这意味着将流划分为多个流,我不太确定该如何进行。我想过使用UnzipWith和Options或通用广播。 如能在这方面提供任何帮助,将不胜感激。 如果
我已经将我的微服务部署到AWS服务器上的docker容器中,该容器使用Akka-HTTP(https://github.com/theiterators/akka-http-microservice)和Scala编写。一旦我将服务部署到AWS服务器上,我就会面临内存泄漏问题和性能问题。 如果它正确地清除了未使用的内存/资源,我们就可以避免这种情况。JVM应该使用垃圾收集器自行处理内存使用情况。但一
关闭连接可以通过取消来自服务器逻辑的传入连接流(例如,将其下游连接到sink.canceled并将其上游连接到source.empty)。还可以通过取消IncomingConnection源连接来关闭服务器的套接字。 但考虑到和会在协商新连接时设置一次,我不清楚如何做到这一点: