请注意:虽然我更喜欢使用Akka的JavaAPI的解决方案(这就是我正在使用的),但我对任何工作解决方案都很满意,并且可能会想出如何将基于Scala的答案翻译成Java。
我有一个Akka应用程序,有很多演员,其中两个是Fizz
和Buzz
。Fizz
actor可以接受两种类型的消息:
StartNewTimerWG
;和ResetAllTimer任务
Buzz
执行器简单地接受一条DoItNow
消息。这些参与者和系统其他部分之间的消息流如下:
Fizz
参与者发送StartNewTimerWG
消息Fizz
参与者收到一条StartNewTimerWG
消息,它都会创建并启动一个新的异步/非阻塞计时器,尝试运行8秒。如果计时器到达终点(8秒),则向Buzz演员发送一条DoItNow
消息 Fizz
参与者可以接受任意数量的并发StartNewTimerWG
消息,因此,可以同时“管理”潜在的多个计时器,每个计时器都指向8秒的神奇数字。因此,如果其他20个参与者在几秒钟内向Fizz
参与者发送StartNewTimerWG
消息,那么Fizz
参与者将同时“管理”20个非阻塞、独立的计时器。当这20个定时器中的每一个达到各自的8秒持续时间时,它们会向Buzz演员发送20条独立的DoItNow
消息 Fizz
执行器收到一条ResetAllTimer任务
消息时,任何当前正在进行中的计时器都将被中断/取消(以便它们停止倒计时到8秒的持续时间,从而阻止它们发送一个DoItNow
消息到Buzz
)。因此,借用我们上面的例子,如果在时间t=1
和t=3
之间,Fizz
参与者收到20条StartNewTimerWG
消息,那么在t=10
可能是14个他们各自的定时器已经过去并触发了DoItNow
消息,也许6条消息仍在进行中。如果在那个确切的时刻Fizz
收到了一条ResetAllTimer任务
消息,它将阻止这6个定时器运行和触发消息,因此在这个例子中Buzz
只会收到14条DoItNow
消息我知道Java8API(sans-Akka)主张扩展TimerTask
并将这些任务提交给Timer\scheduleAtFixedRate
方法,但我不确定这是否与Akka冲突,或者是否有更好的方法用Akka API实现此功能。我迄今为止最好的尝试是:
// Groovy pseudo-code
class MyTimerTask extends TimerTask {
@Inject
ActorRef buzz
@Override
void run() {
// No op!
}
void completeTask() {
buzz.tell(new DoItNow(), null)
}
}
class Fizz extends UntypedAbstractActor {
@Inject
Timer timer
@Override
void onReceive(Object message) {
if(message in StartNewTimerTask) {
timer.scheduleAtFixedRate(new MyTimerTask(), 0, 8 * 1000)
} else if(message in ResetAllTimerTasks) {
timer.cancel()
}
}
}
class Buzz extends UntypedAbstractActor {
@Override
void onReceive(Object message) {
if(message in DoItNow) {
// Do something super cool now...
}
}
}
但是,我认为我没有正确地管理计时器,也没有充分利用Akka调度器/计时器API的潜力。有什么想法吗?
考虑避开Java的Timer
API,支持刚刚随Akka 2.5.4发布的新演员计时器功能。参与者定时器允许参与者使用一个或多个与其生命周期相关联的内部定时器来调度周期性消息。要在Java中访问此功能,只需将Fizz
演员更改为扩展AbstractActorBackTimers
。
下面的示例在Scala中(在Scala中,混合在计时器
特征中):
object Fizz {
private case object SendToBuzz
}
class Fizz(buzz: ActorRef) extends Actor with Timers {
import Fizz._
def receive = {
case StartNewTimerTask =>
val uuid = java.util.UUID.randomUUID
timers.startPeriodicTimer(uuid, SendToBuzz, 8.seconds)
case ResetAllTimerTasks =>
timers.cancelAll()
case SendToBuzz =>
buzz ! DoItNow
}
}
Fizz
actor处理StartNewTimerTask
消息时,它启动一个新的计时器,该计时器将每隔八秒向self
发送SendToBuzz
消息(即Fizz
actor)
Fizz
处理SendToBuzz
消息时,它向Buzz
参与者发送DoItNow
消息
Fizz
处理ResetAllTimerTasks
消息时,它会取消所有计时器
Fizz
重新启动或停止,则其所有计时器都将自动取消
假设我们有: 线程1,包含actor A、B和C。 包含执行元y的线程2。 包含演员Z的线程3。 演员A和B正在监听演员Y的消息。 然后,参与者C向参与者Z发出阻塞请求。 我包含了Actor Y,以允许它在Z处理来自C的请求时发送消息。 所有线程都在不同的物理核心上--它们并行运行。
现在我们已经知道了Java NIO里面那些非阻塞特性是怎么工作的,但是要设计一个非阻塞的服务仍旧比较困难。非阻塞IO相对传统的阻塞IO给开发者带来了更多的挑战。在本节非阻塞服务的讲解中,我们一起来讨论这些会面临的主要挑战,同时也会给出一些潜在的解决方案。 查找关于设计非阻塞服务的相关资料是比较难的,本文提出的解决方案也只能是基于笔者个人的工作经验,构思。如果你有其他的解决方案或者是更好的点子,那么
主要内容:1 非阻塞服务器-GitHub仓库,2 无阻塞IO管道,3 非阻塞与阻塞IO管道,4 基本的无阻塞IO管道设计,5 读取部分消息,6 存储部分消息,7 编写部分消息,8 总结,9 服务器线程模型即使你了解了Java NIO非阻塞功能如何工作(Selector,Channel, Buffer等),设计一个无阻塞服务器仍然很难。与阻塞IO相比,非阻塞IO包含多个挑战。这份非阻塞服务器教程将讨论非阻塞服务器的主要挑战,并为它们描述一些潜在的解决方案。 本教程中描述的思想是围绕Java NIO
我完全混淆了,,。 哪个是阻塞,哪个不是? 我的意思是如果我使用父进程是否等待子进程返回/才继续执行。 如何影响这些调用?
java.util.concurrent.CompletionException:Akka.Pattern.AskTimeoutException:收件人[Actor[akka:/web_server/user/MyActor#-769383443]]已终止。发送者[null]发送了类型为“com.data.model.request”的消息。 所以我重写了方法,在那里添加了一个log语句。 现在
非阻塞 IO 仅对在 Servlet 和 Filter(2.3.3.3节定义的,“异步处理”)中的异步请求处理和升级处理(2.3.3.5节定义的,“升级处理”)有效。否则,当调用 ServletInputStream.setReadListener 或ServletOutputStream.setWriteListener 方法时将抛出IllegalStateException。为了支持在 Ser