当前位置: 首页 > 知识库问答 >
问题:

在Akka修改的hello world--为什么有这么多丢失的消息和死信?

苗征
2023-03-14

完全初学者这里在Akka2.2,尝试一个修改版本的Hello世界。修改是我创建1000个演员而不是一个。运行它,我不会返回所有1000个hello worlds,并在几条消息中重复这样的错误跟踪:

[07/23/2013 22:22:45.924][Main-Akka.actor.default-dispatcher-2][Akka://main/user/app/$si]从参与者[Akka://main/user/app#1478796310]到参与者[Akka://main/user/app/$si#-1309206213]的消息[net.clementlevallois.akkatest.greeter$msg]未送达。遇到1封死信。可以通过配置设置'Akka.log-dead-letters'和'Akka.log-dead-letters-dhider-shutdown'关闭或调整此日志记录。

这两个类:

public class App extends UntypedActor {

    @Override
    public void preStart() {
        // create the greeter actors
        List<ActorRef> actorRefs = new ArrayList();
        for (int i = 0; i < 1000; i++) {
            final ActorRef greeter = getContext().actorOf(Props.create(Greeter.class, i));
            this.getContext().watch(greeter);
            actorRefs.add(greeter);
        }
        // tell it to perform the greeting
        for (ActorRef actorRef : actorRefs) {
            actorRef.tell(Greeter.Msg.GREET, getSelf());
        }
    }

    @Override
    public void onReceive(Object msg) {

        if (msg instanceof DeadLetter) {
            System.out.println(msg);
        } else if (msg == Greeter.Msg.DONE) {
            // when the greeter is done, stop this actor and with it the application
            getContext().stop(getSelf());
        } else {
            unhandled(msg);
        }
    }

}



public class Greeter extends UntypedActor {

    String input;

    public static enum Msg {
        GREET, DONE;
    }

    public Greeter(int i) {
        input = String.valueOf(i);
    }

    @Override
    public void onReceive(Object msg) {
        if (msg == Msg.GREET) {
            System.out.println("Hello World! " + input);
            getSender().tell(Msg.DONE, getSelf());
        } else {
            unhandled(msg);
        }
    }
}

我希望在调试方面得到任何帮助,以及在这里创建参与者列表是否合适的问题上得到建议。

共有1个答案

周朗
2023-03-14

正如@Noah敏锐地指出的,当应用程序参与者收到done响应时,它正在停止自己。现在,stop只是阻止参与者重新引用更多的消息,但它仍然会处理停止之前邮箱中的内容,因此可能会处理超过1条消息。一旦这个参与者停止,通过getself传递给另一个参与者的actorref现在指向deadletter,这就是为什么您开始看到消息被路由到deadletter。如果您真的想对此进行全面测试,那么每次在appactor中收到响应时,都要减量一个计数器(从1000开始)。当它达到0时,您可以安全地停止它:

public class App extends UntypedActor {
  private int counter = 1000;

  @Override
  public void preStart() {
    // create the greeter actors
    List<ActorRef> actorRefs = new ArrayList();
    for (int i = 0; i < counter; i++) {
        final ActorRef greeter = getContext().actorOf(Props.create(Greeter.class, i));
        this.getContext().watch(greeter);
        actorRefs.add(greeter);
    }
    // tell it to perform the greeting
    for (ActorRef actorRef : actorRefs) {
        actorRef.tell(Greeter.Msg.GREET, getSelf());
    }
  }

  @Override
  public void onReceive(Object msg) {

    if (msg instanceof DeadLetter) {
        System.out.println(msg);
    } else if (msg == Greeter.Msg.DONE) {
        if (--counter <= 0){
          getContext().stop(getSelf());
        }

    } else {
        unhandled(msg);
    }
  }

}
 类似资料:
  • 我编写了一个非常简单的Flink流媒体作业,它使用从Kafka获取数据。 这工作得很好,每当我在Kafka上将某些内容放入主题时,它都会被我的Flink作业接收并处理。现在我试图看看如果我的Flink作业由于某种原因不在线会发生什么。所以我关闭了flink作业并继续向Kafka发送消息。然后我再次开始我的Flink作业,并期望它会处理同时发送的消息。 然而,我得到了以下信息: 因此,它基本上忽略了

  • 我错过了什么? AMQ版本5.13.2 Java 1.8.0\u 74 Windows 10 给定一个简单的测试用例,传输两条Object消息,一条带有数据,另一条是数据结束标记。只有数据结束标记被接收。 队列在作业开始时创建,并在作业完成后销毁。 如果我运行更多的事务,我会看到大约50%的接收率。 日志清楚地显示接收器在第一条消息被放入队列之前就已启动,两条消息都被放入队列,但实际上只有第二条消

  • 本文向大家介绍RabbitMQ 怎么避免消息丢失?相关面试题,主要包含被问及RabbitMQ 怎么避免消息丢失?时的应答技巧和注意事项,需要的朋友参考一下 把消息持久化磁盘,保证服务器重启消息不丢失。 每个集群中至少有一个物理磁盘,保证消息落入磁盘。

  • 所以我和我的Kafka消费者之间有了一些恼人的矛盾。我使用“Kafka节点”为我的项目。我创造了一个话题。在一个使用者组中通过2台服务器创建了2个使用者。自动提交设置为false。对于我的消费者获得的每一个mesaage,他们会启动一个异步进程,该进程可能需要1~20秒,当进程完成时,消费者会提交偏移量。我的问题是:在一个senarios中,消费者1得到一个消息,需要20秒来处理。在过程中间,他得

  • 我试图实现一个A/B测试机制,使用Akka演员(,) 我把它设计成: 1 A/B测试根参与者 当A/B测试参与者收到消息时,它将选择一个变体并将消息转发给它。 然而,直到现在,我还没有设法让根演员产生它的孩子。这是我的密码。 运行此示例将打印: 但仅此而已,好像第31行()永远不会返回... 我是否错过了一些关于儿童演员繁殖的事情?

  • 我有一个视频文件,与此视频流: 流#0:0:视频:h264(主)(h264/0x34363248),yuv420p(电视,bt709,渐进式),1920x1080[SAR 1:1 DAR 16:9],4204kb/s,59.94fps,59.94tbr,59.94tbn,59.94tbc 我可以通过运行获得PTS信息: 我一行一行地得到每帧的PTS: 现在我需要将视频编码到H264,并且之后能够获