我有一个restful API,它接收一组JSON消息,这些消息将被转换为单独的Avro消息,然后发送到Kafka。在路由中,我调用3个不同的actor:1)一个actor出去,从磁盘中检索Avro模式;2)然后循环访问JSON消息数组,并将其与第二个actor中的Avro模式进行比较。如果任何消息没有验证,那么我需要返回一个响应给API的调用者并停止处理。3)循环访问数组并传递到第三个actor中,该actor接收JSON对象,将其转换为Avro消息并发送到Kafka主题。
post {
entity(as[JsObject]) { membersObj =>
requestContext =>
val membersJson = membersObj.fields("messages").convertTo[JsArray].elements
val messageService = actorRefFactory.actorOf(Props(new MessageProcessingServicev2()))
val avroService = actorRefFactory.actorOf(Props(new AvroSchemaService()))
val validationService = actorRefFactory.actorOf(Props(new JSONMessageValidationService()))
implicit val timeout = Timeout(5 seconds)
val future = avroService ? AvroSchema.MemberSchema(requestContext)
val memberSchema:Schema = Await.result(future, timeout.duration).asInstanceOf[Schema]
for (member <- membersJson) validationService ! ValidationService.MemberValidation(member.asJsObject, memberSchema, requestContext)
for (member <- membersJson) (messageService ! MessageProcessingv2.ProcessMember(member.asJsObject, topicName, memberSchema, requestContext))
我浏览了很多关于这个主题的博客/书籍/幻灯片,但不确定最好的方法是什么。我已经使用Scala/Akka大约两个月了,基本上自学了我需要的部分。因此,经验丰富的Scala/Akka/Spray开发人员在这方面的任何见解都是非常值得赞赏的。我的一个想法是把3个演员包装成一个“主”演员,让每个演员都成为那个演员的孩子,并试图像那样接近它。
由于您正在使用异步处理(!
),您无法控制邮件发送后的处理。您需要使用ask(?
),它将返回您可以使用的将来值。
但我有个更好的主意。您可以将消息从第一个演员发送到第二个演员。您可以将消息发送给第三个参与者,而不是将结果返回给第一个参与者,以便继续计算。
示范喷洒服务(路线): Redis执行元(Mock还没有实际的Redis客户端)
我很难理解Akka中的演员,以及一个线索如何与一个演员相关联。 让我们以Fridge Actor和Person Actor向Fridge Actor引用发送GetFoodMessage为例。假设不变性受到尊重。 这些消息是在不同的线程中“同时”处理,还是在队列中一个接一个地处理? 线程产卵是否完全由库管理并从actor的概念中抽象出来? 参与者引用是参与者的实例吗? 当我阻止一个演员(和他的孩子)
我很想知道调整大小,或者在本例中增加单个节点系统上的actor池中actor的数量是否真的会影响性能。 我有一个带超线程的四核系统。在任何给定的点上,系统可以运行8个线程。假设执行元执行的大多数操作都是CPU绑定的,那么将池中的执行元数量从20个增加到40个会有什么收获呢?
我经常发现自己使用一个“主”角色,为子任务创建许多子角色。当子任务完成时,主角也应该停止自己。所以当时,我观察子角色并停止主角色context.children.is。 我经常使用这种模式,但因为我从未读过这方面的文章。我不确定,这是一个好主意还是失败的演员有问题。。。? 我已经读过Akka 2中的关机模式,但是这种方法在Java中似乎比我的解决方案更复杂? 以下是我针对具有两个子任务的主要参与者
让我们假设一个使用Akka Typed实现的应用程序有一个持久执行元。这个持久执行元作为其操作的一部分创建了瞬态(或非持久)子执行元,每个子执行元都有一个唯一的ID,这些ID是持久状态的一部分。持久执行元还需要一些与其子级通信的方式,但我们不希望持久化子级的,因为它们实际上不是状态的一部分。在恢复时,持久参与者应该基于恢复的状态重新创建它的子级。这听起来并不像是一个很不寻常的用例,我正在试图弄清楚
我正在使用Scala/Akka编写一个TCP客户端服务器程序。服务器端的一些参与者需要处理来自客户端的TCP消息。我使用了(复制)代码,基本上解析接收到的TCP消息,在接收到分隔符时,消息被发送给其他人。 由于不止一个actor使用此逻辑,所以我在baseTCP actor中对其进行了抽象,并从该actor继承了其他actor。我想在这个基本actor中添加一些常见的代码,比如处理bound/co