当前位置: 首页 > 知识库问答 >
问题:

使用akka-http实现web socker服务器时,堆内存不断增加

颜新
2023-03-14
  def decodeService: Flow[Message, Message, _] = { 
    Flow[Message].map { 
      case BinaryMessage.Strict(encodeMsg) => 
        try {
          WebServer.getWorkerActor ! QueueWork(protoMsg(this, encodeMsg))
        } catch {
          case e: Exception => {
            println("[ERROR] failed to send BinaryMessage.Strict: " + e)
            TextMessage("[ERROR] failed receiving BinaryMessage.Strict")
          }
        }
        TextMessage("[INFO] BinaryMessage.Strict")

      case BinaryMessage.Streamed(streamedMsg) => {
        implicit val system = ActorSystem("DecoderSystem")
        implicit val materializer = ActorMaterializer()
        val streamedMsgFuture: Future[Seq[ByteString]] = streamedMsg.runWith(Sink.seq)
        streamedMsgFuture.onComplete { completedStream =>
          var completeBytestring = new ByteStringBuilder()
          //I'm sure there's a better way to do this.. but hey, it's O(n)
          completedStream.foreach { x => 
            x.foreach { y => 
              completeBytestring ++= y  
            }
          }
          try {
              WebServer.getWorkerActor ! QueueWork(protoMsg(this, completeBytestring.result()))
          } catch {
            case e: Exception => {
              println("[ERROR] failed to send BinaryMessage.Streamed: " + e)
              TextMessage("[ERROR] failed receiving BinaryMessage.Streamed")
            }
          } finally {
            completeBytestring.clear()
          }
        }
        TextMessage("[INFO] BinaryMessage.Streamed")
      }

      case TextMessage.Strict(txt) => TextMessage("Succesfully receive text message")
      case _ => TextMessage("Message type unsupported")
    }
  }

我是否错误地使用了流/接收器/源?如何冲洗溪流?

谢谢

共有1个答案

颜修明
2023-03-14

好吧,最突出的问题是,您为接收到的每个流消息创建一个全新的actorSystemActorSystem就像您的参与者的线程池;您希望尽可能少地创建它们,理想情况下,整个应用程序只创建一个。您不仅为每个消息创建它们,而且不关闭它们--在ActorSystem中配置的所有分派程序,以及它所持有的所有资源都将永远挂起。当然,如果您接收到大量的流式消息,您的内存使用量将会增加。

由于您使用了akka-http,因此您必须有一个ActorSystem,在此调用http().bind*。您需要在decodeservice方法中使其可访问。此外,您计算组合字节流的方式在我看来似乎过于复杂。考虑这样写:

def decodeService: Flow[Message, Message, _] = Flow[Message].mapAsync(4) {
  case m: BinaryMessage.Strict =>
    Future.successful(m)
  case BinaryMessage.Streamed(streamMsg) =>
    streamMsg.runReduce(_ ++ _).map(BinaryMessage.Strict)
  case m =>
    Future.successful(m)
}.map {
  case BinaryMessage.Strict(encodeMsg) =>
    try {
      WebServer.getWorkerActor ! QueueWork(protoMsg(this, encodeMsg))
      TextMessage("[INFO] BinaryMessage.Strict")
    } catch {
      case NonFatal(e) =>
        println("[ERROR] failed to send BinaryMessage.Strict: " + e)
        TextMessage("[ERROR] failed receiving BinaryMessage.Strict")
    }
  case TextMessage.Strict(txt) => TextMessage("Succesfully receive text message")
  case _ => TextMessage("Message type unsupported")
}

在这里,我们首先将所有二进制消息转换为binarymessage.strict,然后像在原始代码中一样处理它们。注意,您必须在try块内写入确认消息,否则即使出现异常,也会返回成功消息。此外,如果您决定根本不处理文本消息,代码可能会变得更加简单:

def decodeService: Flow[Message, Message, _] = Flow[Message]
  .filterNot(_.isText)
  .mapAsync(4) {
    case BinaryMessage.Strict(binary) =>
      Future.successful(binary)
    case BinaryMessage.Stream(binaryStream) =>
      binaryStream.runReduce(_ ++ _)
  .map { encodeMsg =>
    try {
      WebServer.getWorkerActor ! QueueWork(protoMsg(this, encodeMsg))
      TextMessage("[INFO] BinaryMessage.Strict")
    } catch {
      case NonFatal(e) =>
        println("[ERROR] failed to send BinaryMessage.Strict: " + e)
        TextMessage("[ERROR] failed receiving BinaryMessage.Strict")
    }
  }
 类似资料:
  • 编辑:我已经更新了日志片段,以显示dispatcher线程数超出了我的预期。 编辑#2:以下是健康检查路由代码:

  • 我有一个基于AKKA-HTTP的服务,它是用Scala编写的。此服务用作API调用的代理。它使用https://doc.akka.io/docs/akka-http/current/client-side/host-level.html创建一个用于调用API的主机连接池 我想了解这种“之”字形模式的原因,即使服务上没有流量,并且主机池中的连接由于空闲超时而终止。 此外,我还想知道,是否只有在达到一

  • 这是我认为我应该用于这种方法的布局:并且为了适应404路由,还可以使用。现在,如果我的Akka流知识对我有用的话,我需要使用来处理这样的事情,然而,这就是我被困住的地方。 在中,我可以为不同的endpoint进行简单的映射和flatMap,但在流中,这意味着将流划分为多个流,我不太确定该如何进行。我想过使用UnzipWith和Options或通用广播。 如能在这方面提供任何帮助,将不胜感激。 如果

  • 我已经将我的微服务部署到AWS服务器上的docker容器中,该容器使用Akka-HTTP(https://github.com/theiterators/akka-http-microservice)和Scala编写。一旦我将服务部署到AWS服务器上,我就会面临内存泄漏问题和性能问题。 如果它正确地清除了未使用的内存/资源,我们就可以避免这种情况。JVM应该使用垃圾收集器自行处理内存使用情况。但一

  • 关闭连接可以通过取消来自服务器逻辑的传入连接流(例如,将其下游连接到sink.canceled并将其上游连接到source.empty)。还可以通过取消IncomingConnection源连接来关闭服务器的套接字。 但考虑到和会在协商新连接时设置一次,我不清楚如何做到这一点: