当前位置: 首页 > 知识库问答 >
问题:

异步终止Akka-Http Web套接字连接

皇甫敏达
2023-03-14
lazy val authSuccessMessage = Source.fromFuture(someApiCall)

lazy val messageFlow = requestResponseFlow
    .merge(updateBroadcastEventSource)

lazy val handler = codec
  .atop(authGate(authSuccessMessage))
  .join(messageFlow)

handleWebSocketMessages {
  handler
}

共有1个答案

束俊英
2023-03-14

找到了如何使用killswitch来完成此操作。

更新版本

旧版本的问题是,当由堆栈中较高的bidiflow阶段(如我的authgate)触发时,它似乎无法工作。我不确定确切的原因,但是将关闭建模为bidiflow本身,放在堆栈的更高位置,解决了这个问题。

val shutoffPromise = Promise[Option[OutgoingWebsocketEvent]]()

/**
 * Shutoff valve for the connection. It is triggered when `shutoffPromise`
 * completes, and sends a final optional termination message if that
 * promise resolves with one.
 */
val shutoffBidi = {
  val terminationMessageSource = Source
    .maybe[OutgoingWebsocketEvent]
    .mapMaterializedValue(_.completeWith(shutoffPromise.future))

  val terminationMessageBidi = BidiFlow.fromFlows(
    Flow[IncomingWebsocketEventOrAuthorize],
    Flow[OutgoingWebsocketEvent].merge(terminationMessageSource)
  )

  val terminator = BidiFlow
    .fromGraph(KillSwitches.singleBidi[IncomingWebsocketEventOrAuthorize, OutgoingWebsocketEvent])
    .mapMaterializedValue { killSwitch =>
      shutoffPromise.future.foreach { _ => println("Shutting down connection"); killSwitch.shutdown() }
    }

  terminationMessageBidi.atop(terminator)
}
val handler = codec
  .atop(shutoffBidi)
  .atop(authGate(authSuccessMessage))
  .join(messageFlow)
val shutoffPromise = Promise[Option[OutgoingWebsocketEvent]]()

/**
 * Shutoff valve for the flow of outgoing messages. It is triggered when
 * `shutoffPromise` completes, and sends a final optional termination
 * message if that promise resolves with one.
 */
val shutoffFlow = {
  val terminationMessageSource = Source
    .maybe[OutgoingWebsocketEvent]
    .mapMaterializedValue(_.completeWith(shutoffPromise.future))

  Flow
    .fromGraph(KillSwitches.single[OutgoingWebsocketEvent])
    .mapMaterializedValue { killSwitch =>
      shutoffPromise.future.foreach(_ => killSwitch.shutdown())
    }
    .merge(terminationMessageSource)
}
val handler = codec
  .atop(authGate(authSuccessMessage))
  .join(messageFlow via shutoffFlow)
 类似资料:
  • 一些背景: 我有一个使用3G或4G数据连接的Android应用程序。但它也连接到一个wifi热点,以便在热点设备和应用程序之间传输一些数据。 我想做的是:创建一个到这个wifi热点的套接字连接,并通过这个套接字发送/接收数据。 我添加了以下代码,以确保我们在创建套接字时使用wifi热点wifi(否则它有时最终会使用数据连接): 这似乎没问题。但是,我在尝试连接到远程服务器地址时发现问题。下面是我正

  • 开始使用akka-streams,我想构建一个简单的示例。在chrome中,使用web套接字插件,我可以通过并发送2个命令,简单地连接到这样的流https://blockchain.info/api/apiwebsocket 将在chromes web socket插件窗口中传输结果。 我试图在akka流中实现相同的功能,但面临一些问题: 执行了2个命令,但我实际上没有获得流输出 同一命令执行两次

  • 问题内容: 我正在尝试使用H2OR内的库在具有7000万行和25个数字特征的训练集上使用随机森林分类模型。总文件大小为5.6 GB。 验证文件的大小为1 GB。 我的系统上有16 GB RAM和8核CPU。 系统成功读取了H2O对象中的两个文件。 然后,我给出以下命令来构建模型: 但是几分钟(不生成任何树)后,出现以下错误: “ .h2o.doSafeREST中的错误(conn = conn,h2

  • 问题内容: 我正在用MySQL建立一个网站。我将TOAD用于MySQL,突然出现错误,无法连接数据库: “太多联系” Toad for MySQL中有什么方法可以查看现有连接以杀死它们或简单地将所有连接全部关闭? 问题答案: 不, 没有内置的MySQL命令 。有多种工具和脚本支持它,您可以手动终止某些连接或重新启动服务器(但这会比较慢)。 使用查看所有连接,进程ID是你想杀死。 您可以编辑超时设置

  • 问题内容: 是否可以在不关闭整个服务器的情况下终止服务器的Websocket连接?如果是这样,我该如何实现呢? 注意:我使用NodeJS作为后端和’ws’websocket模块。 问题答案: 如果要踢所有客户端而不关闭服务器,则可以执行以下操作: 如果您要特别寻找一个,也可以进行过滤。如果您要踢客户端作为连接逻辑的一部分(即,它发送错误的数据等),则可以执行以下操作: 和一个基本的客户 你会得到:

  • 我有一个应用程序,其中DB连接由Spring配置创建(然后传递到hibernate)。以下是spring文件中的默认参数。 然而,在特定的环境中,Oracle DB连接会在一段时间后被网络设置关闭。我们最初认为关闭连接是因为连接处于空闲状态。我们添加了validationQuery,以便定期在服务器上发布查询,这样连接就不会空闲。 即使在上述设置之后,连接仍然会关闭 还观察到,当应用程序空闲2-3