当前位置: 首页 > 知识库问答 >
问题:

使用参与者作为Websocket客户端流的源

王飞虎
2023-03-14

我目前有一个简单的TextMessage Source,它向我的Websocket客户端流发送消息,如下所示:

     val incoming: Sink[Message, Future[Done]] =
      Sink.foreach[Message] {
        case message: TextMessage.Strict =>
          println(message.text)
        case _ =>
      }

    // send this as a message over the WebSocket
    val outgoing: Source[TextMessage.Strict, NotUsed] = Source
      .combine(
        Source.single(
          TextMessage(
            """{"auth":"APIKEY-123"}"""
          )
        ),
        Source.single(
          TextMessage(
            """{"topic":"topic123"}"""
          )
        ),
        Source.never
      )(Merge(_))
      .throttle(1, 1.second)

    val webSocketFlow = Http().webSocketClientFlow(
      WebSocketRequest("wss://socket.polygon.io/stocks")
    )

    val (upgradeResponse, closed) =
      outgoing
        .viaMat(webSocketFlow)(
          Keep.right
        ) // keep the materialized Future[WebSocketUpgradeResponse]
        .toMat(incoming)(Keep.both) // also keep the Future[Done]
        .run()

    val connected = upgradeResponse.flatMap { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Future.successful(Done)
      } else {
        throw new RuntimeException(
          s"Connection failed: ${upgrade.response.status}"
        )
      }
    }

因此,我目前有一个Source类型的源代码[TextMessage.Strict,NotUsed],但我想使用注释掉的代码,其中我有一个ActorRef作为我的源代码。

我试过这个:

  val actorSource: Source[Any, ActorRef] = Source.actorRef(
  completionMatcher = { case Done =>
    CompletionStrategy.immediately
  },
  failureMatcher = PartialFunction.empty,
  bufferSize = 100,
  overflowStrategy = OverflowStrategy.dropHead
)

val actorRef: ActorRef = actorSource.to(Sink.foreach(println)).run()
actorRef !  """{"auth":"APIKEY-123"}"""

val webSocketFlow = Http().webSocketClientFlow(
  WebSocketRequest("wss://socket.polygon.io/stocks")
)

val (upgradeResponse, closed) =
  actorSource
    .viaMat(webSocketFlow)(
      Keep.right
    ) // keep the materialized Future[WebSocketUpgradeResponse]
    .toMat(incoming)(Keep.both) // also keep the Future[Done]
    .run()

因此,当我使用ActorRef作为我的源时,我很难尝试将其放入图中。我得到了这个编译时错误:

类型不匹配;[错误]找到:akka。流动scaladsl。Flow[akka.http.scaladsl.model.ws.Message,akka.http.scaladsl.model.ws.Message,scala.concurrent.Future[akka.http.scaladsl.model.ws.WebSocketUpgradeResponse][error]必需:akka。流动图[akka.stream.FlowShape[String,?],?][错误]
。viaMat(webSocketFlow)(

:我希望一个参与者作为我的源,也作为我的接收器,即将流产生的所有消息作为接收器传递给另一个参与者。

有人能解释一下我目前在将Actor作为源并试图将其添加到我的流/图中时犯了什么错误吗?

更新

这是我现在拥有的代码:

def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    import system.dispatcher

    val incoming: Sink[Message, Future[Done]] =
      Sink.foreach[Message] {
        case message: TextMessage.Strict =>
          println(message.text)
        case _ =>
        // ignore other message types
      }

    val actorSource = Source.actorRef[String](
      completionMatcher = { case Done =>
        CompletionStrategy.immediately
      },
      failureMatcher = PartialFunction.empty,
      bufferSize = 100,
      overflowStrategy = OverflowStrategy.dropHead
    )

    val webSocketFlow = Http().webSocketClientFlow(
      WebSocketRequest("wss://socket.polygon.io/stocks")
    )

    // the materialized value is a tuple with
    // upgradeResponse is a Future[WebSocketUpgradeResponse] that
    // completes or fails when the connection succeeds or fails
    // and closed is a Future[Done] with the stream completion from the incoming sink
    val ((sendActor, upgradeResponse), closed) =
      actorSource
        .viaMat(webSocketFlow)(
          Keep.both
        ) // keep the materialized Future[WebSocketUpgradeResponse]
        .toMat(incoming)(Keep.both) // also keep the Future[Done]
        .run()

    // just like a regular http request we can access response status which is available via upgrade.response.status
    // status code 101 (Switching Protocols) indicates that server support WebSockets
    val connected = upgradeResponse.flatMap { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Future.successful(Done)
      } else {
        throw new RuntimeException(
          s"Connection failed: ${upgrade.response.status}"
        )
      }
    }

    sendActor ! TextMessage("""{"auth":"APIKEY-123"}""")
    sendActor ! TextMessage("""{"topic":"topic123"}""")

    //in a real application you would not side effect here
    connected.onComplete(println)
    closed.foreach(_ => println("closed"))
  }

我收到以下编译错误:

[错误]匿名函数的参数类型必须完全已知。(SLS 8.5)[错误]预期类型为:?[错误]
完成匹配={case Done=

共有1个答案

贺立果
2023-03-14

您的编译器错误是由您的actorSource没有输出消息,而是String引起的(该错误不应该是您在代码中遇到的错误,也许您尝试将其更改为源[String,ActorRef]?):由于webSocketFlow仅处理消息s,因此它只能附加到消息的源。

所以我建议如下:

val immediateCompletion: PartialFunction[Any, CompletionStrategy] = {
  case Done => CompletionStrategy.immediately
}

val actorSource = Source.actorRef[Message](
  completionMatcher = immediateCompletion,
  failureMatcher = PartialFunction.empty,
  bufferSize = 100,
  overflowStrategy = OverflowStrategy.dropHead
)

val webSocketFlow = Http().webSocketClientFlow(
  WebSocketRequest("wss://socket.polygon.io/stocks")
)

val ((sendActor, upgradeResponse), closed) =
  actorSource
    .viaMat(webSocketFlow)(Keep.both)  // keep both the actor and the upgradeResponse
    .toMat(incoming)(Keep.both)  // ...and also keep the closed
    .run()

sendActor ! TextMessage("""{"auth":"APIKEY-123"}""")
sendActor ! TextMessage("""{"topic":"topic123"}""")

 类似资料:
  • 我过去成功地使用过Akka Streams,但是,我现在很难理解为什么Akka HTTP中定义了客户端Websocket流,并按照文档中显示的方式工作。 由于WebSocket连接允许全双工通信,我希望这样的连接在Akka HTTP中由两个独立的流表示,一个用于传入流量,一个用于传出流量。事实上,文件说明了以下内容: WebSocket由两个消息流组成[…] 它还指出,传入消息由表示,传出消息由表

  • 我已经设置了一个WebSocket服务器使用超文本传输协议-kit,应该接受Web套接字连接。它是超文本传输协议-kit留档中显示的基本示例。 问题是:如何创建连接到它的Clojure客户端? 客户端可以是任何Clojure超文本传输协议库,我真的不介意。我已经了解了Javascript客户端,我相信Java有一个可以从Clojure中使用的API。但是我要找的是一个Clojure库,它支持客户端

  • 我有点困惑如何创建超文本传输协议-kit WebSocket客户端。在网站上有一个如何创建WebSocket服务器的例子,创建客户端可能是微不足道的,但我似乎不能得到它的权利。有没有不关注Javascript客户端的例子? P. S.在这个问题中进行了一些讨论,但仅举了一些例子,并指示超文本传输协议-kit现在支持WebSocket客户端。

  • Hyperf 提供了对 WebSocket Client 的封装,可基于 hyperf/websocket-client 组件对 WebSocket Server 进行访问; 安装 composer require hyperf/websocket-client 使用 组件提供了一个 Hyperf\WebSocketClient\ClientFactory 来创建客户端对象 Hyperf\Web

  • 我试图使用Python Twisted Authobhan websocket客户端打开客户端(每台机器有60K端口限制)的并发websocket连接。但是我无法使用下面的代码打开不超过20K的连接: 我在一个循环中使用了“reactor.connecttcp”,使用Twisted打开并发websocket连接是否正确? 让我知道。

  • Git 为开发者提供了如此优秀的体验,许多人已经找到了在他们的工作站上使用 Git 的方法,即使他们团队其余的人使用的是完全不同的 VCS。 有许多这种可用的适配器,它们被叫做 “桥接”。 下面我们将要介绍几个很可能会在实际中用到的桥接。 Git 与 Subversion 很大一部分开源项目与相当多的企业项目使用 Subversion 来管理它们的源代码。 而且在大多数时间里,它已经是开源项目VC