当前位置: 首页 > 知识库问答 >
问题:

非类型化的参与者(Java+Akka):重新处理未处理的消息

凌轶
2023-03-14

我正在创建一个使用Java+阿卡的演员系统。特别是,我通过提供onReceive()方法的实现来定义非类型化的参与者。

在该方法中,我通过定义接收消息时要执行的逻辑来实现参与者的行为。它可能是:

public void onReceive(Object msg) throws Exception {
  if(msg instanceof MsgType1){ ... }
  else if(msg instanceof MsgType2){ ... }
  else unhandled(msg);
}

但是,如果演员只对单一类型的味精感兴趣呢?有没有可能实现选择性接收,使参与者等待某个消息(系统自动重新排队所有其他类型的消息)???

共有1个答案

丘畅
2023-03-14

在Akka Afaik中无法使用这种“A la Erlang”消息处理模式。但是,你可以使用Stash来获得你想要的效果。

因此代码看起来如下所示

public void onReceive(Object msg) throws Exception {
    if(msg instanceof MsgType1){ ... }
    else if(msg instanceof MsgType2){ ... }
    else stash();
}

在消息处理的某个时刻,您将切换到另一个状态(可能通过调用getContext().benge)。您还可以执行unstashall()调用,以便将在此之前忽略的消息重新追加回邮箱。

 类似资料:
  • 假设我正在为Akka(类型化)执行元定义一个行为,该行为将执行并发计算,将其结果报告给生成它的执行元,然后停止。 如果我用计算的所有输入初始化这个actor,加上对它的“父级”的引用,那么它将永远不需要接收任何类型的传入消息。 我将使用创建此行为,向它传递一个执行计算的函数,然后返回。 Akka(Typed)要求我为将返回此行为的函数提供某种的结果类型。虽然我可以为分配行为将发送的结果消息的类型,

  • 我们正在应用程序中使用apache kafka streams 0.10.2.0。我们利用kafka streams拓扑将处理后的数据传递到下一个主题,直到处理结束。 此外,我们使用AWS ECS容器来部署消费者应用程序。我们观察到消费者正在拾取非常旧的消息进行处理,尽管它们已经在更早的时候处理过。这个问题在服务扩展/缩减或新部署时随机发生。我知道在消费者重新平衡时,有些消息可以重新处理。但在这种

  • 我正在构建一个颤振应用程序,我必须解析api中的一些数据,我设置了所有内容,但我收到了这个错误,我不知道为什么,我是颤振新手,任何帮助都将不胜感激。谢谢。 生成的错误 这是我的api响应示例 这就是我处理数据的方式 这是模型课

  • 我正在尝试使用注释处理器来验证注释,并且作为这项工作的一部分,我正在尝试弄清楚如何使用API来确定可执行文件的参数是否是参数化类型(例如List 除了解析ve.asType(). toString()给出的字符串之外,还有什么方法可以做到这一点吗?其中VariableElement ve是ExecutableElemente.getParameters()的一个元素?对这些类型有一个比简单的字符串

  • 问题内容: 我之前从未遇到过此错误,所以我不确定该怎么做或意味着什么 未处理的异常类型 它在以下代码中发生: 它给了我2个选项“添加抛出声明”和“使用try / catch进行环绕”。 我该怎么办,为什么? 问题答案: 这意味着您要调用的方法已使用指令声明了从类派生的异常。当以这种方式声明一个方法时,您将被迫使用一个块来处理该异常,或者将一个相同的(对于相同的异常或超类型)语句添加到您的方法声明中

  • 我对RabbitMQ很陌生,所以如果我的问题听起来很琐碎,请原谅。我想在RabbitMQ上发布消息,它将由RabbitMQ消费者处理。 我的消费者机器是一个多核机器(最好是azure上的工作者角色)。但QueueBasicConsumer一次推送一条消息。我如何编程来利用我可以同时处理多个消息的所有核心。 一种解决方案是在多个线程中打开多个通道,然后在那里处理消息。但在这种情况下,我将如何决定线程