当前位置: 首页 > 知识库问答 >
问题:

Akka:等待多条消息

窦弘义
2023-03-14

Hi akka古鲁们:)你能在这一次指导我吗?

我要做的是-演员A向演员B要消息,然后等一个回来。但是,不知何故,演员B给A的不是一条信息,而是其中的4条信息。Afuture正确完成,但rest消息中有3条被算作死信。为什么?这样对吗?我是说,演员A有一个合适的处理人,那为什么信都死了?:-(

[INFO][11/22/2013 22:00:38.975][ForkJoinPool-2-worker-7][akka://actors/user/a]获得结果pong[INFO][11/22/2013 22:00:38.976][actors-akka.actor.default-dispatcher-4][akka://actors/deadletters]消息[java.lang.String]未从参与者[akka://actors/user/b#-759739990]传递到参与者[akka://actors/deadletters]。[1]遇到的死信。可以使用配置设置'akka.log-dead-letters'和'akka.log-dead-letters-dider-shutdown'关闭或调整此日志记录。...同一条信息再重复2次...

请看一下代码。

package head_thrash

import akka.actor._
import akka.util.Timeout
import scala.concurrent.duration._
import akka.pattern.ask
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object Main extends App {
  val system = ActorSystem("actors")

  val a = system.actorOf(Props[A], "a")
  val b = system.actorOf(Props[B], "b")

  a ! "ping"
  system.awaitTermination()
}

class A extends Actor with ActorLogging {

  implicit val timeout = Timeout(5.seconds)

  def receive = {
    case "ping" => {
      val b = context.actorSelection("../b")
      val future: Future[String] = ask(b, "ping").mapTo[String]
      future.onSuccess {
        case result: String ⇒ {
          log.info("Got result " + result) // <-- got result pong here, that's okay
        }
      }
    }
    case "pong" => {
      log.info("hmmm...")
    }
  }
}

class B extends Actor with ActorLogging {
  def receive = {
    case "ping" => {
      sender ! "pong"
      sender ! "pong" // <-- dead letter!
      sender ! "pong" // <-- dead letter!
      sender ! "pong" // <-- dead letter!
    }
  }
}

那真让我迷惑。现在你可以问--嘿伙计,为什么你需要B发很多信息?好吧,这是更复杂的情况的部分-A向B要消息。B答案。然后A等待来自B的另一个消息,这里的棘手部分是在未来完成后等待--我只是无法下定决心使模型适合于Akka基础。

但是现在,我怎样才能正确处理所有4条消息,而不是死信呢?谢谢:-d

共有2个答案

谷善
2023-03-14

如果不等待消息,您会发现代码更容易编写。当你不按照时间或步骤顺序来考虑你的应用程序时,演员的工作效果最好。让它能够处理回调。但是使用!不是?。如果需要,可以使用benge或an FSM来显示这两种状态。

这不是你问的问题,但这是你想知道的。避免执行器的命令式可以防止大多数此类错误的发生。

汪栋
2023-03-14

您的问题是执行元B没有回答执行元a。如果我们阅读ask模式的文档,我们会发现ask创建一个临时的一次性执行元以接收对消息的答复,并用它完成一个Scala.concurrent.Future

这个临时actor根本不处理“pong”消息,他只是在等待任何答案,然后您将其转换为字符串的未来。

如果你想解决这个问题,你必须修改你的actor B,这样它首先回答临时的“Ask actor”,然后直接向actor A发送消息。

class B extends Actor with ActorLogging {
  def receive = {
    case "ping" => {
      sender ! "pong"  //the sender is the temp ask actor
      val a = context.actorSelection("../a") // get a ref on actor A
      a ! "pong"
      a ! "pong"
      a ! "pong"
    }
  }
}

这不是真的干净,但现在我希望你明白是怎么回事。

 类似资料:
  • 我如何获得第二条信息?

  • 我正在使用hiredis C库连接到redis服务器。我不知道在订阅新消息后如何等待新消息。 我的代码如下所示: 现在如何告诉雇佣者在频道上等待消息?

  • 我第一次做硒测试。在主页上,我调用了一些AJAX,我希望Selenium等待元素加载完成。我不确定它是否有效,但我只是键入selenium,waitForCondition可以选择。 无论我选择什么,它总是返回“false”。我现在连等待条件都不工作吗? 我如何测试它是否有效?在这些代码中我做错了什么? 如果由自己的类实现,则返回“true” isElementPresent(By.xpath(“

  • 问题内容: 我正在使用hiredis C库连接到redis服务器。我无法弄清楚订阅新消息后如何等待新消息。 我的代码如下所示: 现在如何告诉hiredis在频道上等待消息? 问题答案: 您无需告诉hiredis您需要在通道上等待:事件循环将仅在先前已注册的Redis连接上等待。 这是一个完整的示例: 您可以通过使用以下命令发布内容来对其进行测试: event_base_dispatch函数是实际启

  • 我的实验应用程序非常简单,尝试使用Actor和Akka可以做什么。 JVM启动后,它创建了一个带有几个普通角色的角色系统,即JMS消费者(akka.camel.Consumer)和JMS生产者(akka.camel.Producer)。它在演员和JMS制作人之间传递一些信息- 我时不时地遇到奇怪的行为:似乎时不时地,应该发送到JMS服务器的第一条消息不知何故丢失了。通过查看我的应用程序日志,我可以

  • 问题内容: 如何更改以下代码,以触发两个异步操作并有机会同时运行? 我需要做这样的事情吗? 问题答案: TL; DR 不要在获得承诺的问题中使用模式,而是分别等待它们;而是使用(至少现在): 虽然您的解决方案 确实 并行运行这两个操作,但如果两个诺言都被拒绝,它就无法正确处理拒绝。 细节: 您的解决方案并行运行它们,但始终等待第一个完成,然后再等待第二个。 如果您只想启动它们,并行运行它们,并获得