我有一个Play framework 2应用程序,可以接收数据并通过WebSockets将其发送到多个客户机。我使用Akka actors来处理WebSockets,就像本文档中一样。我还有一个WebSocketRouter
类,它扩展了UntypedActor
并包含路由逻辑(决定将系统接收的数据传递给哪些客户端)。我知道我可以使用Akka的路由器
功能,但这不是我目前的问题。问题是我必须存储所有活动客户端的列表。现在,我将它存储在WebSocketRouter
类的静态列表中。这是编写概念验证原型的最快方法,但它不是线程安全的,似乎也不是“Akka方法”。下面是一个简化的代码示例:
WebSocketController:
//This controller handles the creation of WebSockets.
public class WebSocketController extends Controller {
public static WebSocket<String> index() {
return WebSocket.withActor(new F.Function<ActorRef, Props>() {
public Props apply(ActorRef out) throws Throwable {
return MessageSender.props(out);
}
});
}
}
//Hold a reference to the auto-created Actor that handles WebSockets
//and also registers and unregisters itself in the router.
public class MessageSender extends UntypedActor {
public static Props props(ActorRef out) {
return Props.create(MessageSender.class, out);
}
private final ActorRef out;
public MessageSender(ActorRef out) {
this.out = out;
}
@Override
public void preStart() {
WebSocketRouter.addSender(getSelf());
}
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof String) {
out.tell(message, getSelf());
}
else {
unhandled(message);
}
}
public void postStop() {
WebSocketRouter.removeSender(getSelf());
}
}
public class WebSocketRouter extends UntypedActor {
private static ArrayList<ActorRef> senders;
static {
senders = new ArrayList<>();
}
public static void addSender(ActorRef actorRef){
senders.add(actorRef);
}
public static void removeSender(ActorRef actorRef){
senders.remove(actorRef);
}
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof String) {
for (ActorRef sender : senders) {
sender.tell(message, getSelf());
}
}
}
}
什么是最好的方法来解决我的问题,最适合Akka的意识形态?
与其拥有对参与者的静态引用(WebSocketRouter
),为什么不想出一些消息来发送它呢?这样,行动者就可以以一致的方式保持自己的内部状态。通过消息进行状态更改是参与者模型的主要好处之一。
在我进入代码之前,我很抱歉,如果这不是100%准确的,我只使用了Scala版本的Akka,并基于对Akka文档的快速扫描。
所以在你的例子中,我会定义几个对象来表示join/leave······
public class JoinMessage { }
public class ExitMessage { }
public class MessageSender extends UntypedActor {
public static Props props(ActorRef out) {
return Props.create(MessageSender.class, out);
}
private final ActorRef out;
private final ActorRef router;
public MessageSender(ActorRef out) {
this.out = out;
this.router= getContext().actorSelection("/Path/To/WebSocketRouter");
}
@Override
public void preStart() {
router.tell(new JoinMessage(), getSelf());
}
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof String) {
out.tell(message, getSelf());
} else {
unhandled(message);
}
}
}
public class WebSocketRouter extends UntypedActor {
private final Set<ActorRef> senders = new HashSet<>();
private void addSender(ActorRef actorRef){
senders.add(actorRef);
}
private void removeSender(ActorRef actorRef){
senders.remove(actorRef);
}
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof JoinMessage) {
addSender(sender);
getContext().watch(sender); // Watch sender so we can detect when they die.
} else if (message instanceof Terminated) {
// One of our watched senders has died.
removeSender(sender);
} else if (message instanceof String) {
for (ActorRef sender : senders) {
sender.tell(message, getSelf());
}
}
}
}
同样,这段代码是为了让您了解如何通过利用参与者模型来完成这项任务。对不起,如果Java不是100%准确,但希望您能遵循我的意图。
问题内容: 我有一个不是actor的java对象,它使用actorSelection(Path)从一个actor系统中选择actor。系统中可能不存在所选参与者。 在Java Api中,ActorSelection不存在ask(),因此我无法向actor选择发送和标识消息并使用响应的发送者。 我试图通过演员选择将消息发送给演员,然后对死信做出反应来解决该问题。但是我没有任何死信。 如何通过Acto
我正在使用Scala/Akka编写一个TCP客户端服务器程序。服务器端的一些参与者需要处理来自客户端的TCP消息。我使用了(复制)代码,基本上解析接收到的TCP消息,在接收到分隔符时,消息被发送给其他人。 由于不止一个actor使用此逻辑,所以我在baseTCP actor中对其进行了抽象,并从该actor继承了其他actor。我想在这个基本actor中添加一些常见的代码,比如处理bound/co
Akka的文件警告: 在actor内部使用未来的回调时,如onComplete、onSuccess和onFailure,您需要小心避免关闭包含actor的引用,即不要从回调中调用方法或访问封闭actor上的可变状态 在我看来,如果我能让想要访问可变状态的未来在同一调度程序上运行,该调度程序安排了处理actor消息的线程的相互排除,那么这个问题就可以避免。这可能吗?(为什么不呢? 提供的没有绑定到参
我很想知道调整大小,或者在本例中增加单个节点系统上的actor池中actor的数量是否真的会影响性能。 我有一个带超线程的四核系统。在任何给定的点上,系统可以运行8个线程。假设执行元执行的大多数操作都是CPU绑定的,那么将池中的执行元数量从20个增加到40个会有什么收获呢?
我正在运行Akka2.0.2微内核,希望为不受信任的远程参与者实现一个身份验证方案。 首先想到的是设置一个身份验证执行元,该执行元在身份验证成功时返回对工作执行元的引用。 也就是说,我要阻止远程参与者在没有身份验证的情况下访问我的微内核参与者系统中的参与者。 在actorOf()中不给工作执行元一个名字是不够的,因为它会得到一个容易猜测的自动生成的名字。有没有一种方法可以禁用Actor的远程查找,
我正在将现有应用程序从Akka Classic移植到Akka Typed。最初,您可以使用上下文获取对参与者的引用。actorSelection()。resolveOne() 我知道在Akka Type中不再支持这一点,我们应该使用来注册演员以供发现。 但是,我只想将消息发送到本地参与者,即存在于集群中每个节点上的本地单例。我有它的本地路径,但没有对它的直接引用。这是因为它是由Akka管理系统创建