我目前有一个简单的TextMessage Source,它向我的Websocket客户端流发送消息,如下所示:
val incoming: Sink[Message, Future[Done]] =
Sink.foreach[Message] {
case message: TextMessage.Strict =>
println(message.text)
case _ =>
}
// send this as a message over the WebSocket
val outgoing: Source[TextMessage.Strict, NotUsed] = Source
.combine(
Source.single(
TextMessage(
"""{"auth":"APIKEY-123"}"""
)
),
Source.single(
TextMessage(
"""{"topic":"topic123"}"""
)
),
Source.never
)(Merge(_))
.throttle(1, 1.second)
val webSocketFlow = Http().webSocketClientFlow(
WebSocketRequest("wss://socket.polygon.io/stocks")
)
val (upgradeResponse, closed) =
outgoing
.viaMat(webSocketFlow)(
Keep.right
) // keep the materialized Future[WebSocketUpgradeResponse]
.toMat(incoming)(Keep.both) // also keep the Future[Done]
.run()
val connected = upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(
s"Connection failed: ${upgrade.response.status}"
)
}
}
因此,我目前有一个Source类型的源代码[TextMessage.Strict,NotUsed],但我想使用注释掉的代码,其中我有一个ActorRef作为我的源代码。
我试过这个:
val actorSource: Source[Any, ActorRef] = Source.actorRef(
completionMatcher = { case Done =>
CompletionStrategy.immediately
},
failureMatcher = PartialFunction.empty,
bufferSize = 100,
overflowStrategy = OverflowStrategy.dropHead
)
val actorRef: ActorRef = actorSource.to(Sink.foreach(println)).run()
actorRef ! """{"auth":"APIKEY-123"}"""
val webSocketFlow = Http().webSocketClientFlow(
WebSocketRequest("wss://socket.polygon.io/stocks")
)
val (upgradeResponse, closed) =
actorSource
.viaMat(webSocketFlow)(
Keep.right
) // keep the materialized Future[WebSocketUpgradeResponse]
.toMat(incoming)(Keep.both) // also keep the Future[Done]
.run()
因此,当我使用ActorRef作为我的源时,我很难尝试将其放入图中。我得到了这个编译时错误:
类型不匹配;[错误]找到:akka。流动scaladsl。Flow[akka.http.scaladsl.model.ws.Message,akka.http.scaladsl.model.ws.Message,scala.concurrent.Future[akka.http.scaladsl.model.ws.WebSocketUpgradeResponse][error]必需:akka。流动图[akka.stream.FlowShape[String,?],?][错误]
。viaMat(webSocketFlow)(
注:我希望一个参与者作为我的源,也作为我的接收器,即将流产生的所有消息作为接收器传递给另一个参与者。
有人能解释一下我目前在将Actor作为源并试图将其添加到我的流/图中时犯了什么错误吗?
更新
这是我现在拥有的代码:
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem()
import system.dispatcher
val incoming: Sink[Message, Future[Done]] =
Sink.foreach[Message] {
case message: TextMessage.Strict =>
println(message.text)
case _ =>
// ignore other message types
}
val actorSource = Source.actorRef[String](
completionMatcher = { case Done =>
CompletionStrategy.immediately
},
failureMatcher = PartialFunction.empty,
bufferSize = 100,
overflowStrategy = OverflowStrategy.dropHead
)
val webSocketFlow = Http().webSocketClientFlow(
WebSocketRequest("wss://socket.polygon.io/stocks")
)
// the materialized value is a tuple with
// upgradeResponse is a Future[WebSocketUpgradeResponse] that
// completes or fails when the connection succeeds or fails
// and closed is a Future[Done] with the stream completion from the incoming sink
val ((sendActor, upgradeResponse), closed) =
actorSource
.viaMat(webSocketFlow)(
Keep.both
) // keep the materialized Future[WebSocketUpgradeResponse]
.toMat(incoming)(Keep.both) // also keep the Future[Done]
.run()
// just like a regular http request we can access response status which is available via upgrade.response.status
// status code 101 (Switching Protocols) indicates that server support WebSockets
val connected = upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(
s"Connection failed: ${upgrade.response.status}"
)
}
}
sendActor ! TextMessage("""{"auth":"APIKEY-123"}""")
sendActor ! TextMessage("""{"topic":"topic123"}""")
//in a real application you would not side effect here
connected.onComplete(println)
closed.foreach(_ => println("closed"))
}
我收到以下编译错误:
[错误]匿名函数的参数类型必须完全已知。(SLS 8.5)[错误]预期类型为:?[错误]
完成匹配={case Done=
您的编译器错误是由您的actorSource
没有输出消息,而是
String
引起的(该错误不应该是您在代码中遇到的错误,也许您尝试将其更改为源[String,ActorRef]
?):由于webSocketFlow
仅处理消息
s,因此它只能附加到消息
的源。
所以我建议如下:
val immediateCompletion: PartialFunction[Any, CompletionStrategy] = {
case Done => CompletionStrategy.immediately
}
val actorSource = Source.actorRef[Message](
completionMatcher = immediateCompletion,
failureMatcher = PartialFunction.empty,
bufferSize = 100,
overflowStrategy = OverflowStrategy.dropHead
)
val webSocketFlow = Http().webSocketClientFlow(
WebSocketRequest("wss://socket.polygon.io/stocks")
)
val ((sendActor, upgradeResponse), closed) =
actorSource
.viaMat(webSocketFlow)(Keep.both) // keep both the actor and the upgradeResponse
.toMat(incoming)(Keep.both) // ...and also keep the closed
.run()
sendActor ! TextMessage("""{"auth":"APIKEY-123"}""")
sendActor ! TextMessage("""{"topic":"topic123"}""")
我过去成功地使用过Akka Streams,但是,我现在很难理解为什么Akka HTTP中定义了客户端Websocket流,并按照文档中显示的方式工作。 由于WebSocket连接允许全双工通信,我希望这样的连接在Akka HTTP中由两个独立的流表示,一个用于传入流量,一个用于传出流量。事实上,文件说明了以下内容: WebSocket由两个消息流组成[…] 它还指出,传入消息由表示,传出消息由表
我已经设置了一个WebSocket服务器使用超文本传输协议-kit,应该接受Web套接字连接。它是超文本传输协议-kit留档中显示的基本示例。 问题是:如何创建连接到它的Clojure客户端? 客户端可以是任何Clojure超文本传输协议库,我真的不介意。我已经了解了Javascript客户端,我相信Java有一个可以从Clojure中使用的API。但是我要找的是一个Clojure库,它支持客户端
我有点困惑如何创建超文本传输协议-kit WebSocket客户端。在网站上有一个如何创建WebSocket服务器的例子,创建客户端可能是微不足道的,但我似乎不能得到它的权利。有没有不关注Javascript客户端的例子? P. S.在这个问题中进行了一些讨论,但仅举了一些例子,并指示超文本传输协议-kit现在支持WebSocket客户端。
Hyperf 提供了对 WebSocket Client 的封装,可基于 hyperf/websocket-client 组件对 WebSocket Server 进行访问; 安装 composer require hyperf/websocket-client 使用 组件提供了一个 Hyperf\WebSocketClient\ClientFactory 来创建客户端对象 Hyperf\Web
我试图使用Python Twisted Authobhan websocket客户端打开客户端(每台机器有60K端口限制)的并发websocket连接。但是我无法使用下面的代码打开不超过20K的连接: 我在一个循环中使用了“reactor.connecttcp”,使用Twisted打开并发websocket连接是否正确? 让我知道。
Git 为开发者提供了如此优秀的体验,许多人已经找到了在他们的工作站上使用 Git 的方法,即使他们团队其余的人使用的是完全不同的 VCS。 有许多这种可用的适配器,它们被叫做 “桥接”。 下面我们将要介绍几个很可能会在实际中用到的桥接。 Git 与 Subversion 很大一部分开源项目与相当多的企业项目使用 Subversion 来管理它们的源代码。 而且在大多数时间里,它已经是开源项目VC