Akka: actor name [c1] is not unique!

苏畅
2023-12-01

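While working with ODL (OpenDaylight), we suddenly ran into the failure in the title. Taken literally, the message says that two actors with the same name exist (more precisely, that the parent already has a child registered under that name). Our design never allows duplicate actor names, so we could essentially rule out a developer having deliberately defined two actors with the same name. Below is a reproducible test that demonstrates the problem.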

Test code. The parent actor:

package com.zte.sunquan.deom.ofo;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import java.util.Optional;

/**
 * Created by sunquan on 2018/3/29.
 */
public class ParentActor extends AbstractActor {
    private String name;
    private ActorRef c1;
    public ParentActor(String name) {
        this.name = name;
    }

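    public ParentActor() {
        // Both children are created here, so they are created again on every restart as well.
        c1 = getContext().actorOf(ChildActor.props("c1"), "c1");
        ActorRef c2 = getContext().actorOf(ChildActor.props("c2"), "c2");
    }

    public static Props props() {
        return Props.create(ParentActor.class);
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .matchEquals("killc1", p -> {
                    // Deliberately wrong order, to reproduce the problem: the new "c1" is
                    // created while the old "c1" is still registered, so actorOf throws
                    // InvalidActorNameException. Stopping c1 first would not help either,
                    // because the name is released asynchronously.
                    getContext().actorOf(ChildActor.props("c1"), "c1");
                    getContext().stop(c1);
                })
                .matchAny(p -> System.out.println("parent:" + p))
                .build();
    }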

    @Override
    public void preStart() throws Exception {
        super.preStart();
        System.out.println("parent pre start");
    }

    @Override
    public void postStop() throws Exception {
        super.postStop();
        System.out.println("parent pre stop");
    }

    @Override
    public void preRestart(Throwable reason, Optional<Object> message) throws Exception {
        super.preRestart(reason, message);
        System.out.println("parent pre reStart2");
    }

    @Override
    public void postRestart(Throwable reason) throws Exception {
        super.postRestart(reason);
        System.out.println("parent pre reStart1");
    }
}

The child actor:

package com.zte.sunquan.deom.ofo;

import akka.actor.AbstractActor;
import akka.actor.Props;
import java.util.Optional;

/**
 * Created by sunquan on 2018/3/29.
 */
public class ChildActor extends AbstractActor {
    private String name;

    public ChildActor() {

    }

    public ChildActor(String name) {
        this.name = name;
    }

    public static Props props(String name) {
        return Props.create(ChildActor.class,name);
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .matchAny(p -> System.out.println("child:" + p))
                .build();
    }

    @Override
    public void preStart() throws Exception {
        super.preStart();
        System.out.println("child " + name + " pre start");
    }

    @Override
    public void postStop() throws Exception {
        super.postStop();
        System.out.println("child " + name + " pre stop");
    }

    @Override
    public void preRestart(Throwable reason, Optional<Object> message) throws Exception {
        super.preRestart(reason, message);
        System.out.println("child " + name + " pre reStart1");
    }

    @Override
    public void postRestart(Throwable reason) throws Exception {
        super.postRestart(reason);
        System.out.println("child " + name + " pre reStart2");
    }
}

The test case:

package com.zte.sunquan.deom.ofo;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import org.junit.Test;

/**
 * Created by sunquan on 2018/3/29.
 */
public class PCTest {
    static ActorSystem _system = ActorSystem.create("mysystem");

    @Test
    public void test() throws InterruptedException {
        _system.actorOf(ParentActor.props(), "parent");
        // actorFor is deprecated in newer Akka versions; actorSelection is its replacement.
        ActorRef parent = _system.actorFor("akka://mysystem/user/parent");
        Thread.sleep(3000);
        ActorRef c1 = _system.actorFor("akka://mysystem/user/parent/c1");
        ActorRef c2 = _system.actorFor("akka://mysystem/user/parent/c2");
        parent.tell("p1", ActorRef.noSender());
        c1.tell("c1", ActorRef.noSender());
        c2.tell("c2", ActorRef.noSender());
        // ParentActor creates its child "c1" again before stopping the old one: name clash.
        parent.tell("killc1", ActorRef.noSender());
        // A separate top-level actor /user/c1; its path differs, so it does not clash with /user/parent/c1.
        _system.actorOf(ChildActor.props("c1"), "c1");
        Thread.sleep(5000);
        _system.terminate();
    }

}

Output:


parent pre start
child c2 pre start
child c1 pre start
child:c2
child:c1
parent:p1
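child c1 pre start        ===========the test's top-level /user/c1 starting; ParentActor's actorOf("c1") in the killc1 handler threw before creating anything
parent pre stop           ===========the name clash made the killc1 handler throw, so ParentActor is restarted; its default preRestart stops the children and calls postStop
parent pre reStart2
child c2 pre stop
child c1 pre stop
child c1 pre start
child c2 pre start
parent pre start
parent pre reStart1       ============the new ParentActor instance has created fresh c1 and c2; postRestart calls preStart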
[ERROR] [03/29/2018 14:08:05.983] [mysystem-akka.actor.default-dispatcher-6] [akka://mysystem/user/parent] actor name [c1] is not unique!
akka.actor.InvalidActorNameException: actor name [c1] is not unique!
	at akka.actor.dungeon.ChildrenContainer$NormalChildrenContainer.reserve(ChildrenContainer.scala:129)
	at akka.actor.dungeon.Children$class.reserveChild(Children.scala:134)
	at akka.actor.ActorCell.reserveChild(ActorCell.scala:370)
	at akka.actor.dungeon.Children$class.makeChild(Children.scala:272)
	at akka.actor.dungeon.Children$class.actorOf(Children.scala:44)
	at akka.actor.ActorCell.actorOf(ActorCell.scala:370)
	at com.zte.sunquan.deom.ofo.ParentActor.lambda$createReceive$0(ParentActor.java:31)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:514)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:132)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527)
	at akka.actor.ActorCell.invoke(ActorCell.scala:496)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

[INFO] [03/29/2018 14:08:10.968] [Thread-0] [CoordinatedShutdown(akka://mysystem)] Starting coordinated shutdown from JVM shutdown hook
child c1 pre stop
child c1 pre stop
child c2 pre stop
parent pre stop
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8 -Duser.language=en -Duser.country=US

Process finished with exit code 0

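In the handler for the killc1 message, ParentActor creates a child named c1 again while the existing child c1 is still registered, so actorOf throws InvalidActorNameException. Because the exception is thrown inside ParentActor, it is handled by the supervision strategy of ParentActor's own supervisor (here the /user guardian with the default strategy), which restarts ParentActor and, with it, replaces its children.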

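During the restart, the old ParentActor instance first runs preRestart, whose default implementation stops all children and calls postStop. Once those children have terminated, a new ParentActor instance is created, and its constructor creates c1 and c2 again; finally postRestart runs, which by default calls preStart. The stack trace shows that the exception was thrown from the killc1 lambda in createReceive, and it is logged only once: the restart itself does not clash, because the old children are gone before the new instance creates them again. The final "pre stop" lines are printed when the actor system shuts down.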
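To isolate the mechanism outside Akka, here is a toy model in plain Java. The class ChildNameRegistry and its nested exception are hypothetical stand-ins, not Akka API; they only mimic what the stack trace shows, namely that actorOf first reserves the child name (ChildrenContainer.reserve) and fails synchronously while that name is still registered.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Toy model (hypothetical, not Akka code) of a parent's child-name bookkeeping.
 * In Akka the corresponding check is ChildrenContainer.reserve.
 */
public class ChildNameRegistry {
    /** Simplified stand-in for akka.actor.InvalidActorNameException. */
    public static class InvalidActorNameException extends RuntimeException {
        public InvalidActorNameException(String message) {
            super(message);
        }
    }

    private final Set<String> children = new HashSet<>();

    /** Reserves a name for a new child; fails at once if the name is still registered. */
    public void reserve(String name) {
        if (!children.add(name)) {
            throw new InvalidActorNameException("actor name [" + name + "] is not unique!");
        }
    }

    /** Frees the name; in Akka this happens when the parent processes the child's termination. */
    public void release(String name) {
        children.remove(name);
    }

    public boolean isRegistered(String name) {
        return children.contains(name);
    }

    public static void main(String[] args) {
        ChildNameRegistry parent = new ChildNameRegistry();
        parent.reserve("c1");                          // constructor: actorOf(..., "c1")
        try {
            parent.reserve("c1");                      // "killc1": actorOf before the old c1 is gone
        } catch (InvalidActorNameException e) {
            System.out.println(e.getMessage());        // actor name [c1] is not unique!
        }
        parent.release("c1");                          // old c1 fully terminated
        parent.reserve("c1");                          // now succeeds
        System.out.println(parent.isRegistered("c1")); // true
    }
}
```

In the model, calling stop does not free anything by itself; only release does, and every reserve of the same name fails until then.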

 

How to avoid this exception:

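1. Before creating an actor, make sure that the previous actor with the same name has been stopped completely. For this purpose we wrote a utility class that stops an actor and waits until it has really terminated: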

package com.zte.sunquan.deom.ofo;

import java.util.concurrent.TimeUnit;

import akka.actor.ActorRef;
import akka.actor.TypedActorExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @Author 10184538
 * @Date 2019/9/19 19:41
 **/
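public class TypedActorUtils {
    private static final Logger LOG = LoggerFactory.getLogger(TypedActorUtils.class);
    private static final int RETRY_TIMES = 25;

    /**
     * Stops the typed actor behind resolver, then polls once per second (at most RETRY_TIMES times)
     * until it has terminated. This blocks the calling thread, so do not call it from inside the
     * parent actor's own message handler (see point 2 below).
     */
    public static void stop(TypedActorExtension typedActorExtension, Object resolver) {
        //Bug Fix:stop resolve and avoid (akka.actor.InvalidActorNameException)
        int times = RETRY_TIMES;
        boolean isStop = false;
        if (resolver == null || typedActorExtension == null) {
            LOG.info("resolver or typedActorExtension must not be null.");
            return;
        }
        typedActorExtension.stop(resolver);
        while (times-- > 0) {
            try {
                TimeUnit.SECONDS.sleep(1);
                ActorRef actorRefFor = typedActorExtension.getActorRefFor(resolver);
                // No ActorRef, or an already terminated one: treat the actor as stopped.
                // Note that isTerminated() can turn true shortly before the parent frees the name.
                if (actorRefFor == null || actorRefFor.isTerminated()) {
                    isStop = true;
                    break;
                }
            } catch (InterruptedException e) {
                LOG.error("Stop actor {} exception.", resolver.toString(), e);
                // Restore the interrupt flag and stop waiting; otherwise every further sleep() throws at once.
                Thread.currentThread().interrupt();
                break;
            } catch (NullPointerException e) {
                // Defensive: treat an NPE while looking up the actor as "already stopped".
                isStop = true;
                break;
            }
        }
        if (isStop) {
            LOG.trace("resolver:{} stop success", resolver);
        } else {
            LOG.debug("resolver:{} stop failed", resolver);
        }
    }
}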

Usage:

TypedActorUtils.stop(TypedActor.get(actorSystem), deviceDataBroker);

2. Background on when an actor name can be reused:
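The official documentation and its accompanying diagram do not say quite the same thing. The documentation states that the actor path can be reused once the supervisor has received the Terminated message, whereas the lifecycle diagram splits what happens after an actor is stopped into three steps: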

1. stop actor

2. notify watchers

3. allow to reuse path

Details:
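Looking at the Akka source: when a child actor receives the Terminate system message (akka.actor.ActorCell), it enters the terminate flow (akka.actor.dungeon.FaultHandling). This flow first stops all of the child's own children and then stops the child itself; from then on actorRef.isTerminated returns true. Afterwards, in finishTerminate(), the child calls parent.sendSystemMessage(DeathWatchNotification) to tell the parent to remove it from its children. This happens asynchronously, and only once the parent has done so can the path be reused. At the same time, all watchers are notified that the child has been stopped.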
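The order described above can be mimicked with a small single-threaded model. Parent, Child and their fields are hypothetical and not Akka code: terminate sets the flag that isTerminated() would report and only queues a notification for the parent, and the name leaves the parent's children only when the parent processes that queue.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

/** Toy model (hypothetical, not Akka code) of the stop sequence described above. */
public class TerminationOrderModel {
    static class Parent {
        final Set<String> childNames = new HashSet<>();
        // Pending DeathWatchNotification-like system messages from stopped children.
        final Queue<String> deathWatchNotifications = new ArrayDeque<>();

        /** The parent handles queued notifications on its next mailbox turn. */
        void processSystemMessages() {
            String name;
            while ((name = deathWatchNotifications.poll()) != null) {
                childNames.remove(name);   // only now can the path be reused
            }
        }
    }

    static class Child {
        final String name;
        final Parent parent;
        boolean terminated;   // what actorRef.isTerminated() would report

        Child(String name, Parent parent) {
            this.name = name;
            this.parent = parent;
            parent.childNames.add(name);
        }

        /** terminate(): stop own children (none here) and self, then finishTerminate(). */
        void terminate() {
            terminated = true;                         // isTerminated() is already true ...
            parent.deathWatchNotifications.add(name);  // ... but the parent is only notified
        }
    }

    public static void main(String[] args) {
        Parent parent = new Parent();
        Child c1 = new Child("c1", parent);
        c1.terminate();
        System.out.println(c1.terminated + " " + parent.childNames.contains("c1")); // true true
        parent.processSystemMessages();
        System.out.println(parent.childNames.contains("c1"));                       // false
    }
}
```

In this model there is a window in which isTerminated() is already true but the name is still taken, which is why a loop that only polls isTerminated(), like the one in TypedActorUtils.stop, can return slightly before the name is actually free.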
 

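Failure analysis: we stop a child actor and re-create it from inside the parent actor. Stopping the child involves notifying the parent so that it can remove the child, but that notification is only processed after the parent has finished the message it is currently handling, which is the very stop-and-recreate step. While the handler runs, the name is therefore still taken, and if the handler blocks waiting for the child to disappear, the wait can never succeed: in effect, a deadlock.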
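The situation can be sketched with a single-threaded mailbox. BusyParentModel is hypothetical, not Akka code; it only copies one property of Akka's Mailbox: system messages, such as the child's termination notification, are processed between ordinary messages, never in the middle of one. A handler that stops c1 and then waits for its name therefore never sees it released.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

/**
 * Toy single-threaded mailbox (hypothetical, not Akka code). It handles one
 * ordinary message at a time and processes system messages (here: child
 * termination notifications) only between ordinary messages.
 */
public class BusyParentModel {
    final Set<String> childNames = new HashSet<>();
    final Queue<String> systemMessages = new ArrayDeque<>();
    final Queue<Runnable> messages = new ArrayDeque<>();
    boolean nameFreedInsideHandler;

    BusyParentModel() {
        childNames.add("c1");
    }

    /** The child stops on its own; its termination notification is queued for the parent. */
    void stopChild(String name) {
        systemMessages.add(name);
    }

    private void processSystemMessages() {
        String dead;
        while ((dead = systemMessages.poll()) != null) {
            childNames.remove(dead);   // the name becomes reusable only here
        }
    }

    /** Drains the mailbox: system messages, then one ordinary message, and so on. */
    void run() {
        processSystemMessages();
        Runnable next;
        while ((next = messages.poll()) != null) {
            next.run();
            processSystemMessages();
        }
    }

    public static void main(String[] args) {
        BusyParentModel parent = new BusyParentModel();
        // "killc1" handler: stop c1, then wait (bounded, like RETRY_TIMES) for its name.
        parent.messages.add(() -> {
            parent.stopChild("c1");
            int attempts = 0;
            while (parent.childNames.contains("c1") && attempts < 25) {
                attempts++;   // nothing else runs on this mailbox, so waiting cannot help
            }
            parent.nameFreedInsideHandler = !parent.childNames.contains("c1");
        });
        parent.run();
        System.out.println("freed inside handler: " + parent.nameFreedInsideHandler);  // false
        System.out.println("freed after handler: " + !parent.childNames.contains("c1")); // true
    }
}
```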
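For this reason, we moved the stopping and re-creation of all child actors into an asynchronous task, which frees the parent actor. Reference code: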

ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
    LOG.info("Create master source provider for node {}", nodeId);
    // When this node becomes master, stop ClusteredDeviceSourcesResolver first.
    // masterSourceProvider itself is stopped before being created again in TypedActorUtils.createActor
    // (a further helper in TypedActorUtils that is not shown above).
    // Blocking in TypedActorUtils.stop is acceptable here: this runs on the executor thread, not on the parent actor.
    TypedActorUtils.stop(TypedActor.get(actorSystem), resolver);

    synchronized (object) {
        Callable<MasterSourceProvider> callable = new Callable<MasterSourceProvider>() {
            @Override
            public MasterSourceProvider call() throws Exception {
                return TypedActor.get(cachedContext).typedActorOf(
                        new TypedProps<>(MasterSourceProvider.class,
                                new Creator<MasterSourceProviderImpl>() {
                                    @Override
                                    public MasterSourceProviderImpl create() throws Exception {
                                        return new MasterSourceProviderImpl(schemaRepo, sourceIds, actorSystem, topologyId, nodeId);
                                    }
                                }), "masterSourceProvider");
            }
        };
        masterSourceProvider = TypedActorUtils.createActor(masterSourceProvider, callable, TypedActor.get(actorSystem));
    }
});
// Let the worker thread exit once the task is done instead of leaking one thread per call.
executor.shutdown();
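A toy version with real threads illustrates why the asynchronous variant works. OffloadedRecreateModel and its fields are hypothetical, not Akka or ODL code: parentMailbox stands in for the parent actor's mailbox and worker for the single-thread executor in the reference code. The handler returns right after handing the work off, so the parent's thread stays free to process the termination notification while the worker waits for the name and then re-creates the child.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * Toy model (hypothetical, not Akka code) of the asynchronous fix: the parent's
 * handler hands "stop, wait, re-create" to another thread and returns at once.
 */
public class OffloadedRecreateModel {
    // Names currently registered as children of the parent.
    final Set<String> childNames = ConcurrentHashMap.newKeySet();
    // Stands in for the parent actor's mailbox: one thread, one task at a time.
    final ExecutorService parentMailbox = Executors.newSingleThreadExecutor();
    // Stands in for the single-thread executor in the reference code.
    final ExecutorService worker = Executors.newSingleThreadExecutor();

    /** The child stops; its termination notification is queued on the parent's mailbox. */
    void stopChild(String name) {
        parentMailbox.submit(() -> childNames.remove(name));
    }

    /** Reserves a name, failing like actorOf if it is still taken. */
    void reserve(String name) {
        if (!childNames.add(name)) {
            throw new IllegalStateException("actor name [" + name + "] is not unique!");
        }
    }

    /** "killc1" handler, run on parentMailbox; it does not block that thread. */
    Future<Boolean> onKill(String name) {
        return worker.submit(() -> {
            stopChild(name);
            for (int i = 0; i < 25; i++) {          // bounded, like RETRY_TIMES
                if (!childNames.contains(name)) {
                    reserve(name);                   // re-create only once the name is free
                    return true;
                }
                TimeUnit.MILLISECONDS.sleep(50);
            }
            return false;                            // gave up: the name was never freed
        });
    }

    /** Registers c1, handles "killc1" on the parent's mailbox, reports whether c1 was re-created. */
    static boolean killAndRecreate() throws Exception {
        OffloadedRecreateModel parent = new OffloadedRecreateModel();
        try {
            parent.reserve("c1");
            Future<Future<Boolean>> handled = parent.parentMailbox.submit(() -> parent.onKill("c1"));
            return handled.get().get(5, TimeUnit.SECONDS) && parent.childNames.contains("c1");
        } finally {
            parent.worker.shutdown();
            parent.parentMailbox.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("c1 re-created: " + killAndRecreate());
    }
}
```

If the polling loop ran directly on parentMailbox instead, the removal task would stay queued behind it until the loop gave up, which is the failure described in the analysis above.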