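While using ODL, the error named in the title suddenly appeared. Taken literally, it means that two actors with the same name exist in the actor pool. Duplicate actor names are of course not allowed by design, so we can basically rule out a developer having mistakenly defined two actors with the same name. The reproducible test below illustrates the problem.
Test code: the parent actor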
package com.zte.sunquan.deom.ofo;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;

import java.util.Optional;

/**
 * Created by sunquan on 2018/3/29.
 */
public class ParentActor extends AbstractActor {
    private String name;
    private ActorRef c1;

    public ParentActor(String name) {
        this.name = name;
    }

    public ParentActor() {
        // Create the two children c1 and c2 whenever the parent is instantiated.
        c1 = getContext().actorOf(ChildActor.props("c1"), "c1");
        ActorRef c2 = getContext().actorOf(ChildActor.props("c2"), "c2");
    }

    public static Props props() {
        return Props.create(ParentActor.class);
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .matchEquals("killc1", p -> {
                    // Deliberately create c1 before the old c1 has been stopped,
                    // in order to reproduce the duplicate-name error.
                    getContext().actorOf(ChildActor.props("c1"), "c1");
                    getContext().stop(c1);
                })
                .matchAny(p -> {
                    System.out.println("parent:" + p);
                }).build();
    }

    @Override
    public void preStart() throws Exception {
        super.preStart();
        System.out.println("parent pre start");
    }

    @Override
    public void postStop() throws Exception {
        super.postStop();
        System.out.println("parent pre stop");
    }

    @Override
    public void preRestart(Throwable reason, Optional<Object> message) throws Exception {
        super.preRestart(reason, message);
        System.out.println("parent pre reStart2");
    }

    @Override
    public void postRestart(Throwable reason) throws Exception {
        super.postRestart(reason);
        System.out.println("parent pre reStart1");
    }
}
The child actor
package com.zte.sunquan.deom.ofo;

import akka.actor.AbstractActor;
import akka.actor.Props;

import java.util.Optional;

/**
 * Created by sunquan on 2018/3/29.
 */
public class ChildActor extends AbstractActor {
    private String name;

    public ChildActor() {
    }

    public ChildActor(String name) {
        this.name = name;
    }

    public static Props props(String name) {
        return Props.create(ChildActor.class, name);
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .matchAny(p -> {
                    System.out.println("child:" + p);
                })
                .build();
    }

    @Override
    public void preStart() throws Exception {
        super.preStart();
        System.out.println("child " + name + " pre start");
    }

    @Override
    public void postStop() throws Exception {
        super.postStop();
        System.out.println("child " + name + " pre stop");
    }

    @Override
    public void preRestart(Throwable reason, Optional<Object> message) throws Exception {
        super.preRestart(reason, message);
        System.out.println("child " + name + " pre reStart1");
    }

    @Override
    public void postRestart(Throwable reason) throws Exception {
        super.postRestart(reason);
        System.out.println("child " + name + " pre reStart2");
    }
}
Test case:
package com.zte.sunquan.deom.ofo;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import org.junit.Test;

/**
 * Created by sunquan on 2018/3/29.
 */
public class PCTest {
    static ActorSystem _system = ActorSystem.create("mysystem");

    @Test
    public void test() throws InterruptedException {
        _system.actorOf(ParentActor.props(), "parent");
        ActorRef parent = _system.actorFor("akka://mysystem/user/parent");
        // Give the parent time to start and create its children.
        Thread.sleep(3000);
        ActorRef c1 = _system.actorFor("akka://mysystem/user/parent/c1");
        ActorRef c2 = _system.actorFor("akka://mysystem/user/parent/c2");
        parent.tell("p1", ActorRef.noSender());
        c1.tell("c1", ActorRef.noSender());
        c2.tell("c2", ActorRef.noSender());
        // Ask the parent to re-create c1 while the old c1 is still registered.
        parent.tell("killc1", ActorRef.noSender());
        // Note: this creates a top-level actor /user/c1, separate from /user/parent/c1.
        _system.actorOf(ChildActor.props("c1"), "c1");
        Thread.sleep(5000);
        _system.terminate();
    }
}
Printed output:
parent pre start
child c2 pre start
child c1 pre start
child:c2
child:c1
parent:p1
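child c1 pre start =========== ParentActor receives killc1 and attempts to create c1 again
parent pre stop =========== a c1 with the same name still exists and has not been stopped, so the parent actor is stopped and restarted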
parent pre reStart2
child c2 pre stop
child c1 pre stop
child c1 pre start
child c2 pre start
parent pre start
parent pre reStart1 ============ the parent actor and its child actors have been restarted
[ERROR] [03/29/2018 14:08:05.983] [mysystem-akka.actor.default-dispatcher-6] [akka://mysystem/user/parent] actor name [c1] is not unique!
akka.actor.InvalidActorNameException: actor name [c1] is not unique!
at akka.actor.dungeon.ChildrenContainer$NormalChildrenContainer.reserve(ChildrenContainer.scala:129)
at akka.actor.dungeon.Children$class.reserveChild(Children.scala:134)
at akka.actor.ActorCell.reserveChild(ActorCell.scala:370)
at akka.actor.dungeon.Children$class.makeChild(Children.scala:272)
at akka.actor.dungeon.Children$class.actorOf(Children.scala:44)
at akka.actor.ActorCell.actorOf(ActorCell.scala:370)
at com.zte.sunquan.deom.ofo.ParentActor.lambda$createReceive$0(ParentActor.java:31)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at akka.actor.Actor$class.aroundReceive(Actor.scala:514)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:132)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527)
at akka.actor.ActorCell.invoke(ActorCell.scala:496)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[INFO] [03/29/2018 14:08:10.968] [Thread-0] [CoordinatedShutdown(akka://mysystem)] Starting coordinated shutdown from JVM shutdown hook
child c1 pre stop
child c1 pre stop
child c2 pre stop
parent pre stop
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8 -Duser.language=en -Duser.country=US
Process finished with exit code 0
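The handler that runs after killc1 is sent to ParentActor creates the actor c1 again while a c1 is still registered in the pool, so the names clash. The resulting exception is handled by the supervision strategy that applies to the parent actor, which triggers a restart of the parent (including its child actors).
The parent actor first calls preRestart and is then instantiated again. During instantiation it creates the actor c1 once more, after which postRestart is called, and finally its child actors are restarted. When the child actor c1 restarts, the name clashes again, so the exception is printed and all actors are terminated.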
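How to avoid this exception:
1. Before creating an actor, make sure the previous actor with that name has been stopped completely. For this purpose I wrote a utility class that stops an actor thoroughly: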
package com.zte.sunquan.deom.ofo;

import java.util.concurrent.TimeUnit;

import akka.actor.ActorRef;
import akka.actor.TypedActorExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @Author 10184538
 * @Date 2019/9/19 19:41
 **/
public class TypedActorUtils {
    private static final Logger LOG = LoggerFactory.getLogger(TypedActorUtils.class);
    private static final int RETRY_TIMES = 25;

    public static void stop(TypedActorExtension typedActorExtension, Object resolver) {
        //Bug Fix:stop resolve and avoid (akka.actor.InvalidActorNameException)
        int times = RETRY_TIMES;
        boolean isStop = false;
        if (resolver == null || typedActorExtension == null) {
            LOG.info("resolver or typedActorExtension must not be null.");
            return;
        }
        typedActorExtension.stop(resolver);
        // Poll once per second, up to RETRY_TIMES times, until the actor is gone.
        while (times-- > 0) {
            try {
                TimeUnit.SECONDS.sleep(1);
                ActorRef actorRefFor = typedActorExtension.getActorRefFor(resolver);
                if (actorRefFor == null) {
                    isStop = true;
                    break;
                }
                if (actorRefFor.isTerminated()) {
                    isStop = true;
                    break;
                }
            } catch (InterruptedException e) {
                LOG.error("Stop actor {} exception.", resolver.toString(), e);
                Thread.currentThread().interrupt();
                // Stop polling: with the interrupt flag set, every further sleep would throw immediately.
                break;
            } catch (NullPointerException e) {
                isStop = true;
                break;
            }
        }
        if (isStop) {
            LOG.trace("resolver:{} stop success", resolver);
        } else {
            LOG.debug("resolver:{} stop failed", resolver);
        }
    }
}
TypedActorUtils.stop(TypedActor.get(actorSystem), deviceDataBroker);
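The retry loop in the utility above boils down to "poll a condition a bounded number of times". A minimal plain-Java sketch of that pattern, independent of Akka, might look like the following. The class name `BoundedWait` and the method `awaitCondition` are hypothetical names introduced here for illustration only. The condition is passed in as a `BooleanSupplier`, so the caller decides what "stopped" means.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Hypothetical helper for illustration; not part of Akka or of TypedActorUtils. */
final class BoundedWait {
    private BoundedWait() {
    }

    /**
     * Polls the condition up to maxAttempts times, sleeping sleepMillis between polls.
     * Returns true as soon as the condition holds; returns false if the attempts
     * run out or the waiting thread is interrupted.
     */
    static boolean awaitCondition(BooleanSupplier done, int maxAttempts, long sleepMillis) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (done.getAsBoolean()) {
                return true;
            }
            try {
                TimeUnit.MILLISECONDS.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up instead of spinning.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        // One last check after the final sleep.
        return done.getAsBoolean();
    }
}
```

In a setup like the one above, the supplier would presumably wrap the check made inside the loop (the reference is null or reports itself as terminated), with `RETRY_TIMES` as the attempt limit and a one-second sleep.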
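2. For reference:
The official documentation and its accompanying diagram do not agree. The official documentation states that once the supervisor has received the Terminate message, the actor path can be reused.
The lifecycle diagram, on the other hand, splits what happens after an actor is stopped into three steps: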
1:stop actor
2:notify watchers
3:allow to reuse path
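In detail:
Analysing the Akka source: when the child actor receives the Terminate message (akka.actor.ActorCell), the terminate flow (akka.actor.dungeon.FaultHandling) is triggered. It stops all of the child actor's own children and then stops the actor itself; from this point on, actorRef.isTerminated returns true. Afterwards, in the finishTerminate() flow, parent.sendSystemMessage(DeathWatchNotification(...)) notifies the parent actor (asynchronously) to remove the child actor, and only then can the path be reused. At the same time, all watchers are notified that the child actor has been stopped.
Failure analysis: we stop the child actor and recreate it inside the parent actor. Stopping the child requires notifying the parent so that it removes the child, but the parent is still busy handling the stop-and-recreate itself, which is effectively a deadlock.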
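To make the timing concrete, here is a deliberately simplified model of how a parent keeps track of its children's names. It is not Akka's implementation: `ChildNameModel` and its methods are hypothetical names used only for illustration. The point it tries to capture is that stopping a child is asynchronous, so the name stays reserved until the parent has processed the child's termination notification.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified model of a parent's child-name bookkeeping; NOT Akka's real code. */
final class ChildNameModel {
    enum State { RUNNING, TERMINATING }

    private final Map<String, State> children = new HashMap<>();

    /** Models context.actorOf(props, name): fails if the name is still reserved. */
    void create(String name) {
        if (children.containsKey(name)) {
            throw new IllegalStateException("actor name [" + name + "] is not unique!");
        }
        children.put(name, State.RUNNING);
    }

    /** Models context.stop(child): only marks the child; the name stays reserved. */
    void stop(String name) {
        if (children.containsKey(name)) {
            children.put(name, State.TERMINATING);
        }
    }

    /** Models the parent processing the child's termination notification. */
    void onChildTerminated(String name) {
        children.remove(name);
    }

    /** True if a child with this name could be created right now. */
    boolean canCreate(String name) {
        return !children.containsKey(name);
    }
}
```

Under this model, both "create before stop" (what the killc1 handler does) and "stop, then create immediately" fail; only a create issued after the termination notification has been processed succeeds. The Akka documentation recommends the same ordering: watch the child and create its replacement in response to the `Terminated` message.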
For this reason, all stopping and creating of child actors is done asynchronously, which frees up the parent actor. Reference code:
Executors.newSingleThreadExecutor().submit(() -> {
+ LOG.info("Create master source provider for node {}", nodeId);
+ //when became master, we should stop ClusteredDeviceSourcesResolver
+ //We will stop masterSourceProvider before create it in TypedActorUtils.createActor
+ TypedActorUtils.stop(TypedActor.get(actorSystem), resolver);
+
+ synchronized (object) {
+ Callable<MasterSourceProvider> callable = new Callable<MasterSourceProvider>() {
+ @Override
+ public MasterSourceProvider call() throws Exception {
+ return TypedActor.get(cachedContext).typedActorOf(
+ new TypedProps<>(MasterSourceProvider.class,
+ new Creator<MasterSourceProviderImpl>() {
+ @Override
+ public MasterSourceProviderImpl create() throws Exception {
+ return new MasterSourceProviderImpl(schemaRepo, sourceIds, actorSystem, topologyId, nodeId);
+ }
+ }), "masterSourceProvider");
+ }
+ };
+ masterSourceProvider = TypedActorUtils.createActor(masterSourceProvider, callable, TypedActor.get(actorSystem));
+ }