我正在通过在WebSocketClientFlow上遵循doc来尝试客户端websocket。
import akka.actor.ActorSystem
import akka.Done
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._
import scala.concurrent.Future
object WebSocketClientFlow {
def main(args: Array[String]) = {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
import system.dispatcher
// Future[Done] is the materialized value of Sink.foreach,
// emitted when the stream completes
val incoming: Sink[Message, Future[Done]] =
Sink.foreach[Message] {
case message: TextMessage.Strict =>
println(message.text)
}
// send this as a message over the WebSocket
val outgoing = Source.single(TextMessage("hello world!"))
// flow to use (note: not re-usable!)
val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://echo.websocket.org"))
// the materialized value is a tuple with
// upgradeResponse is a Future[WebSocketUpgradeResponse] that
// completes or fails when the connection succeeds or fails
// and closed is a Future[Done] with the stream completion from the incoming sink
val (upgradeResponse, closed) =
outgoing
.viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
.toMat(incoming)(Keep.both) // also keep the Future[Done]
.run()
// just like a regular http request we can access response status which is available via upgrade.response.status
// status code 101 (Switching Protocols) indicates that server support WebSockets
val connected = upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
}
}
// in a real application you would not side effect here
connected.onComplete(println)
closed.foreach(_ => println("closed"))
}
}
您可以创建一个基于执行元的源,并通过建立的websocket连接发送新消息。
val req = WebSocketRequest(uri = "ws://127.0.0.1/ws")
val webSocketFlow = Http().webSocketClientFlow(req)
val messageSource: Source[Message, ActorRef] =
Source.actorRef[TextMessage.Strict](bufferSize = 10, OverflowStrategy.fail)
val messageSink: Sink[Message, NotUsed] =
Flow[Message]
.map(message => println(s"Received text message: [$message]"))
.to(Sink.ignore)
val ((ws, upgradeResponse), closed) =
messageSource
.viaMat(webSocketFlow)(Keep.both)
.toMat(messageSink)(Keep.both)
.run()
val connected = upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
}
}
ws ! TextMessage.Strict("Hello World")
ws ! TextMessage.Strict("Hi")
ws ! TextMessage.Strict("Yay!")
`
> 构造函数接受InetSocketAddress和actorref。InetSocketAddress是有意义的(我假设这是目的地),但是actorref是什么?这是我第一次使用akka,但据我了解,ActorRef是另一个演员的引用。既然我的TCP客户端是一个参与者,并且我希望这个TCP参与者与TCP服务器通信,而不是与另一个参与者通信,为什么我要给它一个参与者引用? 伙伴对象中的道具功能是用
我尝试将Spring与websocket一起使用。我从本教程开始调查。 在我的侧客户端,我有类似的东西来初始化到服务器的连接: 它工作得很好,在我的控制器中,我可以在下面的类中执行我的过程: 现在我想做的是让一个线程向监听“/主题/问候”的客户端发送消息。我这样写Runnable类: 这样完成了我的控制器: 该方法采用光电控制器。fireGreeting按我的要求调用,但客户端没有发生任何事情。有
我过去成功地使用过Akka Streams,但是,我现在很难理解为什么Akka HTTP中定义了客户端Websocket流,并按照文档中显示的方式工作。 由于WebSocket连接允许全双工通信,我希望这样的连接在Akka HTTP中由两个独立的流表示,一个用于传入流量,一个用于传出流量。事实上,文件说明了以下内容: WebSocket由两个消息流组成[…] 它还指出,传入消息由表示,传出消息由表
问题内容: 我正在使用Java开发RESTful Web服务。如果出现问题,我需要一种将错误消息发送给客户端的好方法。 根据Javadoc的规定, “由于message参数含义不明确 ,因此不建议使用 ”。 有没有设置状态消息或响应的“ 原因短语 ”的首选方法?该方法不这样做。 编辑:为了澄清,我想修改HTTP状态行,即不是主体内容。具体来说,我想发送类似的回复。 问题答案: 我认为任何RESTf
我可以找到许多例子,说明如何让akka-http服务器轻松封送由case类表示的响应实体,只需混合使用SprayJSONSupport/DefaultJSONProtocol,在隐式作用域中提供jsonFormat并在路由DSL中使用complete指令。整齐!
我有一个工作的WebSocket示例,其中客户端从服务器接收消息。 我不确定当客户端连接时,我应该如何向客户端发送旧消息。 示例: 每个客户端在连接时提供其名称 服务器响应“[名称] 刚刚连接”(对所有客户端) 任何新客户端都不会收到这些消息 我想知道客户端是否有任何方法可以接收旧消息(所有消息或过去 5 分钟内的消息都可以接受)。 我怀疑我可能需要自己捕获这些信息,将其存储在某个地方(如数据库)