当前位置: 首页 > 工具软件 > dispatcher > 使用案例 >

线程调度(Dispatcher)

丘飞
2023-12-01

       在前面的章节中,我们已经了解到Actor的基本知识点,例如创建方式、生命周期、消息通信、监控、容错。关于actor通信,大家都很熟悉,通过tell和ask就可以实现,但是,现在我想跟大家谈论一下actor的消息是如何进行调度的。

     Dispatcher

       在Akka中,为了保证消息处理的及时性和线程的使用效率,Dispatcher对线程池做了一些协调工作。简单来说,dispatcher就相当于一个消息控制中心,负责对消息进行分发和派送。由此可见,合理运用Dispatcher可以提升系统的吞吐量。

       在实际项目中,我们可能会遇到许多不同情况的任务,但大致可以分为两种:1.耗时长或长时间阻塞。2.耗时短,不会造成阻塞。针对不同的情况,在Actor系统中,我们可以选择不同的dispatcher进行处理,从而做到线程资源隔离。

     Executor

       在具体讲解Dispatcher使用方式之前,我们先了解一下它的核心内容-线程池(Executor),它提供了执行异步任务的策略,分为两种:

线程池

描述

thread-pool-executor

基于普通的线程池,它有一个工作队列(存储任务),当线程空闲时会从队列中获取任务并执行。

fork-join-executor

基于工作窃取的线程池(采用分而治之原理),它把大的任务拆分成小的任务然后并行执行,最后合并并结果,当某线程的任务队列没有任务时,会主动从其它线程的队列中获取任务。fork-join性能更佳,Akka默认选项。

     Akka文件配置

       这里先插入一个小知识点,我们先简单学习一下Akka的文件配置和加载。在默认情况下,程序会加载classpath下application.conf、application.json、application.properties文件。这里使用application.properties示例:

akka.log-config-on-start=on

       这里log-config-on-start配置成on,表示启动时(即调用ActorSystem.create()方法)会打印配置信息。

       这是单文件的配置方式,还可以进行多文件配置。在大型项目中,配置文件可能需要拆分成多个文件,在Akka中,我们可以include的方式引入其它配置文件。现在我们有配置文件application.properties,system.properties文件,例如:

       system.properties    

include "application"

akka.loglevel=debug

       这里system.properties引入了application配置文件,并增加了loglevel配置。在创建ActorSystem时,我们可以显示加载配置文件,例如:

 ActorSystem system = ActorSystem.create("system", ConfigFactory.load("system"));

     Dispatcher配置

       在Actor系统中,ActorSystem会采用一个使用fork-join-executor的dispatcher默认配置。当然,如果我们想要自定义配置,也不是很复杂。下面,定义一个config-fork-join-dispatcher,如下:

config-fork-join-dispatcher{
    type=Dispatcher
    executor="fork-join-executor"
    fork-join-executor{
        parallelism-min=3
        parallelism-factor=3.0
        parallelism-max=15
    }
    throughput=1
}

     关于上述所使用到的参数,详细信息如下:

配置项

描述

config-fork-join-dispatcher

自定义dispatcher名称,Actor通过该名称配置dispatcher。

type

类型,除了dispatcher,还有PinnedDispatcher,CallingThreadDispatcher。

executor

选择异步执行任务策略,也就是使用何种类型的线程池。

parallelism-min

最小并发线程数。

parallelism-factor

并发因子,用于计算最大线程数,最大线程数=处理器个数*并发因子。在这里并发因子为3,假如处理器个数为4,最大线程数则是12。

parallelism-max

最大线程数,但是在实际运行过程中,最大线程数=min(parallelism-max,处理器个数*并发因子),也就是最小值的那个。

throughput

表明某个线程在放回线程池之前所能处理的消息数,这里配置为1,表明尽可能公平的分配消息

     thread-pool-executor的配置与fork-join-executor极其相似:

config-thread-pool-dispatcher{
    type=Dispatcher
    executor="thread-pool-executor"
    thread-pool-executor{
        #最小线程并发数
        core-pool-size-min=3
        #并发因子
        core-pool-size-factor=3.0
        #最大线程并发数
        core-pool-size-max=15
    }
    throughput=1
}

       关于thread-pool和fork-join的配置,到此为止。上面关于dispatcher的配置,我们放到system.conf文件里,现在我们创建一个Actor,使用一下它们:

public class ConfigActor extends AbstractActor {
    public static void main(String[] args) {
        //加载system配置文件
        ActorSystem system = ActorSystem.create("system", ConfigFactory.load("system"));
        ActorRef configActor = system.actorOf(Props.create(ConfigActor.class).withDispatcher("config-fork-join-dispatcher"), "configActor");
        configActor.tell("hello", ActorRef.noSender());

    }

    @Override
    public Receive createReceive() {
        return receiveBuilder().matchAny(other -> {
            System.out.println( other + " " + Thread.currentThread().getName());
        }).build();
    }
}

      运行结果:

hello system-config-fork-join-dispatcher-5

      从结果中,我们可以看出配置的Dispatcher已经生效,这里采用withDispatcher("config-fork-join-dispatcher")。使用dispatcher还有一种简单的方式,那就是基于配置文件,例如:

akka.actor.deployment{
   /configActor{
        dispatcher=config-fork-join-dispatcher
   }
}

     PinnedDispatcher

        PinnedDispatcher是另外一种Dispatcher类型,它会为每个Actor提供只有一个线程的线程池,该线程池Actor独有。使用方式如下:

        PinnedDispatcher的配置:

config-pinned-dispatcher{

     executor="thread-pool-executor"

     type=PinnedDispatcher

}

       在这里,我们不需要再添加诸如上述的并发因子、最小线程数等参数,因为它会为每个Actor创建独有的线程池。

       我们修改上面的示例,来模拟一下耗时的操作:

       修改createReceive()方法:

 public Receive createReceive() {
        return receiveBuilder().matchAny(other -> {
            System.out.println( other + " " + Thread.currentThread().getName());
            //沉睡4s,模拟耗时的操作
            Thread.sleep(4000);
        }).build();
    }

       然后创建大量的Actor,并给这些Actor发送消息:

for(int i=0;i<50;i++){
            ActorRef configActor = system.actorOf(Props.create(ConfigActor.class).withDispatcher("config-pinned-dispatcher"), "configActor"+i);
            configActor.tell("hello", ActorRef.noSender());
        }

       运行示例,大家应该可以发现,即使我们这里模拟了耗时的操作,但是消息发送并没有延迟,证明了PinnedDispatcher会给每个Actor创建一个独立的线程池,Actor之间并不会相互影响。虽然PinnedDispatcher可以提升系统的吞吐量,但是大量的创建独有线程池可能会耗尽服务器的资源,所以大家在使用PinnedDispatcher之前,先对该耗时操作的并发量做好评估。

     总结      

       上述,我们讲解了Actor消息之间是如何进行分发的,在系统中,当我们没有对Dispatcher进行配置时,系统也会采用默认的Dispatcher和性能最佳的fork-join-executor进行线程调度。当我们想提高Actor系统的吞吐量时,不妨根据系统情况对线程调度做最好的配置。

 类似资料: