我正在用Akka打字,但我无法查看官方文件(https://doc.akka.io/docs/akka/current/typed/actors.html#actors),我发现它非常简短,即如何在Actor类型中配置调度程序。
下面是我的代码示例
private int actorTimeout = Integer.parseInt(getProperty("environment.actor.timeout", "10"));
@Autowired
private AkkaTypedDAO akkaTypedDAO;
private ActorSystem<AkkaTypedDTO> system;
@PostConstruct
private void initActor() {
system = ActorSystem.create(akkaTypedDAO.daoWithSupervisor, "AkkaTypedDAO");
}
private final Behavior<CommandAkkaTyped> service = Actor.immutable((ctx, msg) -> {
sendToDAO(msg.id).thenApply(either -> {
msg.replyTo.tell(either);
return either;
});
return Actor.same();
});
public final Behavior<CommandAkkaTyped> serviceWithSupervisor = Actor.supervise(service).onFailure(Exception.class, restart());
private CompletionStage<Either<ConnectorErrorVO, EntityDaoDTO>> sendToDAO(MUSIn<AkkaTypedPayLoad> id) {
return AskPattern.ask(system,
(ActorRef<Either<ConnectorErrorVO, EntityDaoDTO>> replyTo) -> new AkkaTypedDTO(new EntityDaoDTO(musIn), replyTo),
new Timeout(actorTimeout, TimeUnit.SECONDS), system.scheduler());
}
当我创建ActorSystem时,如何为我的A配置调度程序ctor.immutable?
您使用的是过时的Akka-Typed版本(有关Akka-Typed API历史的更多信息,请参阅此处)。根据Akka打字文档的当前版本(截至本文撰写之日,2.5.14):
要在生成参与者时指定调度程序,请使用DispatcherSelector。如果未指定,则参与者将使用默认调度程序,有关详细信息,请参阅默认调度程序。
public static final Behavior<Start> main =
Behaviors.setup(context -> {
final String dispatcherPath = "akka.actor.default-blocking-io-dispatcher";
Props props = DispatcherSelector.fromConfig(dispatcherPath);
final ActorRef<HelloWorld.Greet> greeter =
context.spawn(HelloWorld.greeter, "greeter", props);
return Behaviors.receiveMessage(msg -> {
ActorRef<HelloWorld.Greeted> replyTo =
context.spawn(HelloWorldBot.bot(0, 3), msg.name);
greeter.tell(new HelloWorld.Greet(msg.name, replyTo));
return Behaviors.same();
});
});
让我们假设一个使用Akka Typed实现的应用程序有一个持久执行元。这个持久执行元作为其操作的一部分创建了瞬态(或非持久)子执行元,每个子执行元都有一个唯一的ID,这些ID是持久状态的一部分。持久执行元还需要一些与其子级通信的方式,但我们不希望持久化子级的,因为它们实际上不是状态的一部分。在恢复时,持久参与者应该基于恢复的状态重新创建它的子级。这听起来并不像是一个很不寻常的用例,我正在试图弄清楚
我很想知道调整大小,或者在本例中增加单个节点系统上的actor池中actor的数量是否真的会影响性能。 我有一个带超线程的四核系统。在任何给定的点上,系统可以运行8个线程。假设执行元执行的大多数操作都是CPU绑定的,那么将池中的执行元数量从20个增加到40个会有什么收获呢?
我正在使用Scala/Akka编写一个TCP客户端服务器程序。服务器端的一些参与者需要处理来自客户端的TCP消息。我使用了(复制)代码,基本上解析接收到的TCP消息,在接收到分隔符时,消息被发送给其他人。 由于不止一个actor使用此逻辑,所以我在baseTCP actor中对其进行了抽象,并从该actor继承了其他actor。我想在这个基本actor中添加一些常见的代码,比如处理bound/co
我经常发现自己使用一个“主”角色,为子任务创建许多子角色。当子任务完成时,主角也应该停止自己。所以当时,我观察子角色并停止主角色context.children.is。 我经常使用这种模式,但因为我从未读过这方面的文章。我不确定,这是一个好主意还是失败的演员有问题。。。? 我已经读过Akka 2中的关机模式,但是这种方法在Java中似乎比我的解决方案更复杂? 以下是我针对具有两个子任务的主要参与者
[04/27/2014 18:09:05.518][ReadScheduler-Akka.actor.Default-Dispatcher-3][Akka://ReadScheduler/User/Collector]从参与者[Akka://ReadScheduler/User/Executor#2127791644]到参与者[Akka://ReadScheduler/User/Collector
java.util.concurrent.CompletionException:Akka.Pattern.AskTimeoutException:收件人[Actor[akka:/web_server/user/MyActor#-769383443]]已终止。发送者[null]发送了类型为“com.data.model.request”的消息。 所以我重写了方法,在那里添加了一个log语句。 现在