当前位置: 首页 > 知识库问答 >
问题:

非阻塞定时器任务和Akka演员

鄢飞鸾
2023-03-14

请注意:虽然我更喜欢使用Akka的JavaAPI的解决方案(这就是我正在使用的),但我对任何工作解决方案都很满意,并且可能会想出如何将基于Scala的答案翻译成Java。

我有一个Akka应用程序,有很多演员,其中两个是FizzBuzzFizzactor可以接受两种类型的消息:

  • StartNewTimerWG;和
  • ResetAllTimer任务

Buzz执行器简单地接受一条DoItNow消息。这些参与者和系统其他部分之间的消息流如下:

  1. 任何东西(其他参与者,甚至参与者系统之外的事件驱动组件)都可以随时向Fizz参与者发送StartNewTimerWG消息
  2. 每次Fizz参与者收到一条StartNewTimerWG消息,它都会创建并启动一个新的异步/非阻塞计时器,尝试运行8秒。如果计时器到达终点(8秒),则向Buzz演员发送一条DoItNow消息
  3. Fizz参与者可以接受任意数量的并发StartNewTimerWG消息,因此,可以同时“管理”潜在的多个计时器,每个计时器都指向8秒的神奇数字。因此,如果其他20个参与者在几秒钟内向Fizz参与者发送StartNewTimerWG消息,那么Fizz参与者将同时“管理”20个非阻塞、独立的计时器。当这20个定时器中的每一个达到各自的8秒持续时间时,它们会向Buzz演员发送20条独立的DoItNow消息
  4. Fizz执行器收到一条ResetAllTimer任务消息时,任何当前正在进行中的计时器都将被中断/取消(以便它们停止倒计时到8秒的持续时间,从而阻止它们发送一个DoItNow消息到Buzz)。因此,借用我们上面的例子,如果在时间t=1t=3之间,Fizz参与者收到20条StartNewTimerWG消息,那么在t=10可能是14个他们各自的定时器已经过去并触发了DoItNow消息,也许6条消息仍在进行中。如果在那个确切的时刻Fizz收到了一条ResetAllTimer任务消息,它将阻止这6个定时器运行和触发消息,因此在这个例子中Buzz只会收到14条DoItNow消息

我知道Java8API(sans-Akka)主张扩展TimerTask并将这些任务提交给Timer\scheduleAtFixedRate方法,但我不确定这是否与Akka冲突,或者是否有更好的方法用Akka API实现此功能。我迄今为止最好的尝试是:

// Groovy pseudo-code
class MyTimerTask extends TimerTask {
  @Inject
  ActorRef buzz

  @Override
  void run() {
    // No op!
  }

  void completeTask() {
    buzz.tell(new DoItNow(), null)
  }
}

class Fizz extends UntypedAbstractActor {
  @Inject
  Timer timer

  @Override
  void onReceive(Object message) {
    if(message in StartNewTimerTask) {
      timer.scheduleAtFixedRate(new MyTimerTask(), 0, 8 * 1000)
    } else if(message in ResetAllTimerTasks) {
      timer.cancel()
    }
  }
}

class Buzz extends UntypedAbstractActor {
  @Override
  void onReceive(Object message) {
    if(message in DoItNow) {
      // Do something super cool now...
    }
  }
}

但是,我认为我没有正确地管理计时器,也没有充分利用Akka调度器/计时器API的潜力。有什么想法吗?

共有1个答案

司空宗清
2023-03-14

考虑避开Java的TimerAPI,支持刚刚随Akka 2.5.4发布的新演员计时器功能。参与者定时器允许参与者使用一个或多个与其生命周期相关联的内部定时器来调度周期性消息。要在Java中访问此功能,只需将Fizz演员更改为扩展AbstractActorBackTimers

下面的示例在Scala中(在Scala中,混合在计时器特征中):

object Fizz {
  private case object SendToBuzz
}

class Fizz(buzz: ActorRef) extends Actor with Timers {
  import Fizz._

  def receive = {
    case StartNewTimerTask =>
      val uuid = java.util.UUID.randomUUID
      timers.startPeriodicTimer(uuid, SendToBuzz, 8.seconds)
    case ResetAllTimerTasks =>
      timers.cancelAll()
    case SendToBuzz =>
      buzz ! DoItNow
  }
}
  • Fizzactor处理StartNewTimerTask消息时,它启动一个新的计时器,该计时器将每隔八秒向self发送SendToBuzz消息(即Fizzactor)
  • Fizz处理SendToBuzz消息时,它向Buzz参与者发送DoItNow消息
  • Fizz处理ResetAllTimerTasks消息时,它会取消所有计时器
  • 如果Fizz重新启动或停止,则其所有计时器都将自动取消
 类似资料:
  • 假设我们有: 线程1,包含actor A、B和C。 包含执行元y的线程2。 包含演员Z的线程3。 演员A和B正在监听演员Y的消息。 然后,参与者C向参与者Z发出阻塞请求。 我包含了Actor Y,以允许它在Z处理来自C的请求时发送消息。 所有线程都在不同的物理核心上--它们并行运行。

  • 现在我们已经知道了Java NIO里面那些非阻塞特性是怎么工作的,但是要设计一个非阻塞的服务仍旧比较困难。非阻塞IO相对传统的阻塞IO给开发者带来了更多的挑战。在本节非阻塞服务的讲解中,我们一起来讨论这些会面临的主要挑战,同时也会给出一些潜在的解决方案。 查找关于设计非阻塞服务的相关资料是比较难的,本文提出的解决方案也只能是基于笔者个人的工作经验,构思。如果你有其他的解决方案或者是更好的点子,那么

  • 主要内容:1 非阻塞服务器-GitHub仓库,2 无阻塞IO管道,3 非阻塞与阻塞IO管道,4 基本的无阻塞IO管道设计,5 读取部分消息,6 存储部分消息,7 编写部分消息,8 总结,9 服务器线程模型即使你了解了Java NIO非阻塞功能如何工作(Selector,Channel, Buffer等),设计一个无阻塞服务器仍然很难。与阻塞IO相比,非阻塞IO包含多个挑战。这份非阻塞服务器教程将讨论非阻塞服务器的主要挑战,并为它们描述一些潜在的解决方案。 本教程中描述的思想是围绕Java NIO

  • 我完全混淆了,,。 哪个是阻塞,哪个不是? 我的意思是如果我使用父进程是否等待子进程返回/才继续执行。 如何影响这些调用?

  • java.util.concurrent.CompletionException:Akka.Pattern.AskTimeoutException:收件人[Actor[akka:/web_server/user/MyActor#-769383443]]已终止。发送者[null]发送了类型为“com.data.model.request”的消息。 所以我重写了方法,在那里添加了一个log语句。 现在

  • 非阻塞 IO 仅对在 Servlet 和 Filter(2.3.3.3节定义的,“异步处理”)中的异步请求处理和升级处理(2.3.3.5节定义的,“升级处理”)有效。否则,当调用 ServletInputStream.setReadListener 或ServletOutputStream.setWriteListener 方法时将抛出IllegalStateException。为了支持在 Ser