我正在尝试在Scala上实现AKKA http和一些Actor。我用Akka创建了一个web应用程序。但我在一个或两个不同的路由上有这个错误/16。(显然是随机的):
服务器无法及时响应您的请求。请稍后再试!
你能给我解释一下为什么和怎么修吗?我对Akka真的是新手。
主类:
object WebServer extends App {
implicit val system = ActorSystem("app-1")
implicit val materializer = ActorMaterializer()
// needed for the future flatMap/onComplete in the end
implicit val executionContext = system.dispatcher
val routes = SessionRoute.route
val bindingFuture = Http().bindAndHandle(routes, ipServer, configApplication.getInt("spray.can.client.proxy.http.port"))
println("serv http launch")
StdIn.readLine
bindingFuture
.flatMap(_.unbind()) // trigger unbinding from the port
.onComplete(_ => {
cluster.close()
system.terminate()
})
bindingFuture.onFailure {
case ex: Exception =>
println(ex, "Failed to bind to {}:{}!", ipServer, configApplication.getInt("spray.can.client.proxy.http.port"))
}
}
object SessionRoute extends TokenValidator {
implicit val formats = org.json4s.DefaultFormats
val sessionHandler = WebServer.system.actorOf(SessionHandler.props(), "SessionHandler")
implicit val timeout = Timeout(60.seconds)
val route: Route = get {
authenticated(doAuthPublisher) { app =>
getActiveUserPublisher
}
}
def getActiveUserPublisher =
path("session" / JavaUUID / "active_user") { publisher_id =>
parameters('date_start.as[Long], 'date_end.as[Long]) {
(date_start, date_end) => {
onSuccess(sessionHandler ? SessionActiveUser(SessionRequest(publisher_id, date_start, date_end, null))) {
case response: Answer =>
complete(StatusCodes.OK, response.result)
case _ =>
complete(StatusCodes.InternalServerError, "Error on the page")
}
}
}
}
}
object SessionHandler {
def props(): Props = {
Props(classOf[SessionHandler])
}
}
class SessionService(implicit actorSystem: ActorSystem) extends toolService {
def activeUser(sessionRequest: SessionRequest): Map[String, Any] = {
....
}
}
class SessionHandler extends Actor with ActorLogging {
implicit val system = ActorSystem("session")
implicit val formats = org.json4s.DefaultFormats
def receive: Receive = {
case request: SessionActiveUser =>
sender() ! Answer(Serialization.write(new SessionService().activeUser(request.sessionRequest)))
}}
final case class Answer(result: String)
case class SessionActiveUser(sessionRequest: SessionRequest)
case class SessionRequest(publisher_id: UUID = null, date_start: Long, date_end: Long, app_id: String = null)
akka {
loglevel = INFO
stdout-loglevel = INFO
loggers = ["akka.event.slf4j.Slf4jLogger"]
default-dispatcher {
fork-join-executor {
parallelism-min = 8
}
}
// event-handlers = ["akka.event.slf4j.Slf4jLogger"]
}
您看到的错误是由于路由
无法在配置的请求超时内生成响应造成的。如果您还没有显式设置,则默认为20秒。有关请求超时的更多信息,请参阅此处。
关于发生这种情况的原因,您能详细说明activeuser
函数中发生了什么吗?那里有什么重大的阻塞吗?如果是这样,您的所有传入请求都将按顺序排列,并根据activeuser
进行阻止,最终导致请求超时终止您的请求。
可能的解决办法有:
对于我正在构建的新akka应用程序,我有一个设计挑战。问题/挑战是:在客户端,我制作了一个简单的actor,它发送一个请求,然后使用been()来等待正确的服务器应答,当然还包括一条超时消息,以防我在正确的时间内没有得到应答。然而,有趣的是在服务器端。这里我有以下结构: 参与者A(配置为循环路由器)此路由器正在接收来自客户端的所有请求。 参与者A然后将消息转发给参与者A1、A2。。。Ax都是在演员
例如,我有两个演员——一个家长演员和一个孩子演员。当父级收到消息时,它会产生消息中指定的尽可能多的子角色。如何测试此功能?有没有一种方法可以模拟上下文,或者其他一些方法来检查参与者的创建是否正确和数量是否正确? 更新:基于@Tim answer的解决方案 更改类别: 测试:
参与者可以使用API或命令行添加到参与者库中。 在你开始之前 在你执行这些步骤之前,你在业务网络定义中必须建模一个参与者,并将其部署为业务网络。 下面的过程显示了一个使用以下数字财产范例业务网络定义的参与者模型的示例:digitalproperty-network 请注意:如果你使用composer participant add命令添加参与者,请确保参与者的JSON陈述包裹在单引号中。 name
我有一个AKKA模型,有一个主管演员,他创造了许多儿童演员。子参与者将处理一个事件并将消息发送到另一个服务(例如Kafka topic)。 目前,我有一个静态共享类,它在子参与者之间共享以发送消息,但在参与者模型中,我认为最好使用参与者来实现这一目的。 我想知道我如何才能创造一个演员,儿童演员可以分享它。如果supervisor actor创建了MessagePublisher actor,那么孩
composer participant add命令将参与者的新实例添加到参与者库中。查看“ 添加参与者 ”任务,了解使用此命令或API的演练。 data选项必须包含一个代表要添加的参与者的序列化JSON串,并且必须用单引号包裹。 句法 composer participant add composer participant add [options] Participant options
概念 A Participant(参与者)是业务网络中的行为人(actor)。参与者可能是一个组织的分支。参与者可以创建资产,并与其他参与者交换资产。参与者通过提交交易来处理资产。 参与者拥有一组Identity文档,可以用来证明参与者的身份。例如,一个人可能有一个或多个以下身份文档证明他们是谁: 护照 驾驶执照 指纹 视网膜扫描 SSL证书 在Hyperledger Composer中,参与者可