我最近在学习Akka演员。我在Actor里读了调度员的文件。我很好奇一个演员身上的阻隔操作。文档中的最后一个主题描述了如何解决问题。并且我正在尝试重现文档中的示例实验。
下面是我的代码:
package dispatcher
import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory
object Main extends App{
var config = ConfigFactory.parseString(
"""
|my-dispatcher{
|type = Dispatcher
|
|executor = "fork-join-executor"
|
|fork-join-executor{
|fixed-pool-size = 32
|}
|throughput = 1
|}
""".stripMargin)
// val system = ActorSystem("block", ConfigFactory.load("/Users/jiexray/IdeaProjects/ActorDemo/application.conf"))
val system = ActorSystem("block")
val actor1 = system.actorOf(Props(new BlockingFutureActor()))
val actor2 = system.actorOf(Props(new PrintActor()))
for(i <- 1 to 1000){
actor1 ! i
actor2 ! i
}
}
package dispatcher
import akka.actor.Actor
import scala.concurrent.{ExecutionContext, Future}
class BlockingFutureActor extends Actor{
override def receive: Receive = {
case i: Int =>
Thread.sleep(5000)
implicit val excutionContext: ExecutionContext = context.dispatcher
Future {
Thread.sleep(5000)
println(s"Blocking future finished ${i}")
}
}
}
package dispatcher
import akka.actor.Actor
class PrintActor extends Actor{
override def receive: Receive = {
case i: Int =>
println(s"PrintActor: ${i}")
}
}
我只需创建一个actorSystem
,其中包含默认的调度器,所有的参与者都依赖于这些调度器。BlockingFutuReactor
具有封装在Future
中的阻塞操作。printactor
只是立即打印一个数字。
> PrintActor: 44
> PrintActor: 45
默认的调度程序是否会为阻塞操作预留一些调度程序?这样即使有这么多请求调度器的阻塞操作,系统也能处理消息。
Akka文件中的实验是否可以复制?我的配置有问题吗。
谢谢你的建议。最好的祝愿。
您之所以在BlockingFutuReactor
中看到所有1000条打印语句之前看到PrintActor
中的任何打印语句,是因为BlockingFutuReactor
的Receive
块中的第一个Thread.Sleep
调用。这个thread.sleep
是您的代码与正式文档中的示例之间的关键区别:
override def receive: Receive = {
case i: Int =>
Thread.sleep(5000) // <----- this call is not in the example in the official docs
implicit val excutionContext: ExecutionContext = context.dispatcher
Future {
...
}
}
记住,参与者一次处理一个消息。thread.sleep(5000)
基本上模拟了一条至少需要五秒钟处理的消息。blockingfuturetor
在处理完当前消息之前不会处理另一个消息,即使它的邮箱中有数百条消息。当BlockingFutuReactor
正在处理值1
的第一个Int
消息时,PrintActor
已经完成了对发送给它的所有1000条消息的处理。为了更清楚地说明这一点,让我们添加一个println
语句:
override def receive: Receive = {
case i: Int =>
println(s"Entering BlockingFutureActor's receive: $i") // <-----
Thread.sleep(5000)
implicit val excutionContext: ExecutionContext = context.dispatcher
Future {
...
}
}
运行程序时的输出示例:
Entering BlockingFutureActor's receive: 1
PrintActor: 1
PrintActor: 2
PrintActor: 3
...
PrintActor: 1000
Entering BlockingFutureActor's receive: 2
Entering BlockingFutureActor's receive: 3
Blocking future finished 1
...
正如您所看到的,当blockingfuturetor
实际开始处理消息2
时,printactor
已经翻转了所有1000条消息。
如果删除第一个线程.sleep
,那么您将更快地看到消息从BlockingFutuReactor
的邮箱出列,因为工作被“委托”给了Future
。创建future
后,执行元将从其邮箱中获取下一条消息,而无需等待future
完成。下面是没有第一个线程的示例输出。sleep
(每次运行时不会完全相同):
Entering BlockingFutureActor's receive: 1
PrintActor: 1
PrintActor: 2
...
PrintActor: 84
PrintActor: 85
Entering BlockingFutureActor's receive: 2
Entering BlockingFutureActor's receive: 3
Entering BlockingFutureActor's receive: 4
Entering BlockingFutureActor's receive: 5
PrintActor: 86
PrintActor: 87
...
我们有一个使用Spring Framework在Tomcat中运行的Web应用程序。我们需要为循环操作添加一些计划作业。为此,我们遇到了Quartz Scheduler,并遵循了使用Quartz with Spring配置作业的教程,并按预期计划并运行了作业。 所以我们有一些任务是在应用程序启动时安排的。现在我们希望用户手动运行作业并更改作业的触发器,但是我们需要将这些更改持久化到数据库中。因此,
问题内容: 我正在使用Retrofit为异步网络调用返回rxjava Observable。 我发现自己在重复以下调用: 似乎我一直在订阅IO线程,并在Android主线程上进行观察。这似乎是我发现的所有资源都倡导的最佳实践。也许除了长时间运行的计算外,我不太了解何时要偏离此模式。 有没有一种方法可以通过默认subscriptionOn和observeOn线程来删除此样板? 这是rxjava插件的
我正在使用改型为我的异步网络调用返回rxjava Observable。 我发现自己重复以下调用: 似乎我总是在IO线程上订阅,在Android主线程上观察。这似乎是我找到的所有资源都提倡的最佳实践。也许除了长时间运行的计算之外,我不太明白我们什么时候会想要偏离这种模式。 有没有办法通过默认subscribeOn和observeOn线程来删除这个样板文件? 这是rxjava插件的用例吗?(我找不到
问题内容: 阻塞VM的整体性能更好,因为同步,线程生成和恢复等待值的阻塞客户端都不会浪费时间。因此,如果您愿意不时接受更高的延迟,则阻塞VM是一个不错的选择。尤其是如果交换很少发生并且大多数经常访问的数据恰好适合您的内存。 这是Redis的默认模式(这是唯一的前进模式,我相信现在2.6中已弃用VM),让OS处理分页(如果需要)。我的理解是正确的,启动/启动将需要一些时间才能变得“热”。当在具有16
本文向大家介绍在React中怎么阻止事件的默认行为?相关面试题,主要包含被问及在React中怎么阻止事件的默认行为?时的应答技巧和注意事项,需要的朋友参考一下 event.preventDefault();阻止浏览器默认行为, 例如标签不跳转 event.stopPropagation();阻止冒泡; 例如上级点击事件不生效
下面的代码片段执行两个线程,一个是每秒记录一次的简单计时器,第二个是执行remainder操作的无限循环: 这给出了以下结果: