当前位置: 首页 > 知识库问答 >
问题:

Akka HTTP客户端websocket流的定义

程枫
2023-03-14

我过去成功地使用过Akka Streams,但是,我现在很难理解为什么Akka HTTP中定义了客户端Websocket流,并按照文档中显示的方式工作。

由于WebSocket连接允许全双工通信,我希望这样的连接在Akka HTTP中由两个独立的流表示,一个用于传入流量,一个用于传出流量。事实上,文件说明了以下内容:

WebSocket由两个消息流组成[…]

它还指出,传入消息由接收器表示,传出消息由表示。这是我的第一个困惑点——当使用两个独立的流时,你可能需要总共处理两个源和两个汇,而不是每种类型只处理一个。目前,我的猜测是,传入流的源和传出流的汇对开发人员来说并没有多大用处,因此只是“隐藏”。

然而,当把所有东西连接在一起时,它真的会让人困惑(参见上面链接的文档)。

使用singleWebSocketRequest时出现问题的部分:

val flow: Flow[Message, Message, Future[Done]] =
      Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left)

或者在使用webSocketClientFlow时使用相同的部分:

val (upgradeResponse, closed) =
      outgoing
        .viaMat(webSocketFlow)(Keep.right)
        .toMat(incoming)(Keep.both)
        .run()

这与我目前对工作流的理解相矛盾。

  • 为什么我要将传出消息的与传入消息的接收器结合起来?上面的代码看起来像是我在向自己发送消息,而不是向服务器发送消息

任何帮助,以提高我的理解是赞赏的,谢谢。

编辑:

我使用SourceSink并通过WebSocket发送数据没有问题,我只想了解为什么阶段的布线是这样完成的。

共有1个答案

金钊
2023-03-14

WebSocket确实由两个独立的流组成,只是这些流(可能)不在同一个JVM上。

您有两个对等点进行通信,一个是服务器,另一个是客户端,但从已建立的WebSocket连接来看,差异不再重要。一个数据流是对等点1向对等点2发送消息,另一个流是对等点2向对等点1发送消息,然后这两个对等点之间存在网络边界。如果您一次看一个对等点,您有对等点1从对等点2接收消息,而在第二个流中对等点1正在向对等点2发送消息。

每个对等点都有一个接收部分的接收器和一个发送部分的源。实际上,您总共有两个源和两个接收器,只是不都在同一个ActorSystem上(为了解释起见,假设两个对等点都是在Akka HTTP中实现的)。来自对等方1的源连接到对等方2的接收器,对等方2的源连接到对等方1的接收器。

因此,您编写了一个Sink来描述如何处理通过第一个流的传入消息,以及一个Source来描述如何通过第二个流发送消息。通常,您希望根据您正在接收的消息生成消息,因此您可以将这两者连接在一起并通过不同的流路由消息,这些流描述了如何响应和传入消息并生成任意数量的传出消息。Flow[Message, Message,_]并不意味着您正在将传出消息转换为传入消息,而是将传入消息转换为传出消息。

webSocketFlow是一个典型的异步边界,代表另一个对等方的流。它将传出的消息“转换”为传入的消息,方法是将它们发送给另一个对等方,并发出另一个对等方发送的任何消息。

val flow: Flow[Message, Message, Future[Done]] =
      Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left)

此流是对等方的两个流的一半:

  1. [来自其他对等方的消息]连接到printSink
  2. helloSource已连接到[发送给其他对等方的消息]

传入消息和传出消息之间没有关系,您只需打印接收到的所有消息并发送一个“hello world!”。实际上,由于源代码在一条消息之后完成,WebSocket连接也会关闭,但是如果您将源代码替换为例如source。重复,你会不断地发送(洪水,真的)“你好,世界!”无论传入消息的速率如何,都可以通过网络传输。

val (upgradeResponse, closed) =
      outgoing
        .viaMat(webSocketFlow)(Keep.right)
        .toMat(incoming)(Keep.both)
        .run()

在这里,您将从传出的中获取所有信息,这是您想要发送的消息,通过webSocketFlow将其路由,webSocketFlow通过与其他对等方通信来“转换”消息,并将每个接收到的消息生成为传入的。通常,您有一个wire协议,在该协议中,您可以将case class/pojo/dto消息编码和解码为wire格式。

val encode: Flow[T, Message, _] = ???
val decode: Flow[Message, T, _] = ???

val upgradeResponse = outgoing
  .via(encode)
  .viaMat(webSocketFlow)(Keep.right)
  .via(decode)
  .to(incoming)
  .run()

或者你可以想象某种聊天服务器(啊,websockets和chats),它可以广播和合并来自和到多个客户端的消息。这应该从任何客户端获取任何消息,并将其发送到每个客户端(仅用于演示,未经测试,可能不是您想要的实际聊天服务器):

val chatClientReceivers: Seq[Sink[Message, NotUsed]] = ???
val chatClientSenders: Seq[Source[Message, NotUsed]] = ???

// each of those receivers/senders could be paired in their own websocket compatible flow
val chatSockets: Seq[Flow[Message, Message, NotUsed]] =
  (chatClientReceivers, chatClientSenders).zipped.map(
    (outgoingSendToClient, incomingFromClient) =>
      Flow.fromSinkAndSource(outgoingSendToClient, incomingFromClient))

val toClients: Graph[SinkShape[Message], NotUsed] =
  GraphDSL.create() {implicit b =>
    import GraphDSL.Implicits._

    val broadcast = b.add(Broadcast[Message](chatClientReceivers.size))

    (broadcast.outArray, chatClientReceivers).zipped
      .foreach((bcOut, client) => bcOut ~> b.add(client).in)

    SinkShape(broadcast.in)
  }

val fromClients: Graph[SourceShape[Message], NotUsed] =
  GraphDSL.create() {implicit b =>
    import GraphDSL.Implicits._

    val merge = b.add(Merge[Message](chatClientSenders.size))

    (merge.inSeq, chatClientSenders).zipped
      .foreach((mIn, client) => b.add(client).out ~> mIn)

    SourceShape(merge.out)
  }

val upgradeResponse: Future[WebSocketUpgradeResponse] =
  Source.fromGraph(fromClients)
    .viaMat(webSocketFlow)(Keep.right)
    .to(toClients)
    .run()

希望这能有所帮助。

 类似资料:
  • 我已经设置了一个WebSocket服务器使用超文本传输协议-kit,应该接受Web套接字连接。它是超文本传输协议-kit留档中显示的基本示例。 问题是:如何创建连接到它的Clojure客户端? 客户端可以是任何Clojure超文本传输协议库,我真的不介意。我已经了解了Javascript客户端,我相信Java有一个可以从Clojure中使用的API。但是我要找的是一个Clojure库,它支持客户端

  • 我有点困惑如何创建超文本传输协议-kit WebSocket客户端。在网站上有一个如何创建WebSocket服务器的例子,创建客户端可能是微不足道的,但我似乎不能得到它的权利。有没有不关注Javascript客户端的例子? P. S.在这个问题中进行了一些讨论,但仅举了一些例子,并指示超文本传输协议-kit现在支持WebSocket客户端。

  • Hyperf 提供了对 WebSocket Client 的封装,可基于 hyperf/websocket-client 组件对 WebSocket Server 进行访问; 安装 composer require hyperf/websocket-client 使用 组件提供了一个 Hyperf\WebSocketClient\ClientFactory 来创建客户端对象 Hyperf\Web

  • WebSocketClient 结构体指针上只有如下两个可设置的字段: Header 字段 用来设置自定义的 HTTP 头信息。 MaxConcurrentRequests 方法 该方法返回最大并发请求数。 SetMaxConcurrentRequests 方法 该方法用于设置最大并发请求数,当并发请求超过该设置之后,后面的请求将会排队等待。该设置默认值为 10。

  • 我正在努力与Netty 4.0.8网络套接字客户端示例和SSL,我似乎无法将数据发送到Netty SSL网络套接字服务器示例。尽管围绕这个问题已经有很多帖子了,(我相信我已经浏览了所有的帖子),最常见的建议是在管道的开头添加一个sslHandler,但它不起作用。握手似乎是成功的,因为它也表明了一个相关的问题在这里。 我记得4.0版的情况也一样。0,但我还是设法让它工作了。然而,当我升级到4.0时

  • 协程版Http客户端的底层用纯C编写,不依赖任何第三方扩展库,拥有超高的性能。 支持Http-Chunk、Keep-Alive特性,支持form-data格式 Http协议版本为HTTP/1.1 支持升级为WebSocket客户端 gzip压缩格式支持需要依赖zlib库 客户端仅实现核心的功能,实际项目建议使用 Saber 属性 errCode 错误状态码。当connect/send/recv/c