在Apache Spark中,Dispatcher是Spark Master进程的一个组件,用于接收和处理来自客户端应用程序的请求,例如提交应用程序、杀死应用程序等。Dispatcher作为Spark Master进程的一部分,是处理客户端请求的主要入口点。当客户端应用程序需要与Spark Master交互时,它们会将请求发送到Dispatcher,Dispatcher负责将请求路由到相应的Spark Master组件进行处理。
Dispatcher的主要作用是:
1、接收和处理来自客户端应用程序的请求:客户端应用程序可以通过向Spark Master发送请求来与Spark集群交互,例如提交应用程序、查询集群状态、杀死应用程序等。Dispatcher负责接收这些请求,并将它们分配给相应的Spark Master组件进行处理。
2、维护请求队列:当Dispatcher接收到请求后,它会将请求放入队列中,并按照请求类型和优先级进行排序。这样做可以保证请求得到及时处理,并且避免了并发请求的竞争。
3、路由请求到相应的Spark Master组件:Dispatcher会根据请求类型和优先级将请求路由到相应的Spark Master组件进行处理。例如,如果请求是提交应用程序的请求,Dispatcher会将其发送到应用程序管理器进行处理;如果请求是杀死应用程序的请求,Dispatcher会将其发送到应用程序管理器或任务调度器进行处理。
总的来说,Dispatcher是Spark Master进程的一个关键组件,负责协调和处理来自客户端应用程序的请求,使得Spark集群可以高效地处理任务并提供服务。
Dispatcher是在NettyRpcEnv实例化的时候创建的。
private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
在Dispatcher类中,有以下几个参数需要关注。
private val endpoints: ConcurrentMap[String, EndpointData] = new ConcurrentHashMap[String, EndpointData]
private val endpointRefs: ConcurrentMap[RpcEndpoint, RpcEndpointRef] = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]
// Track the receivers whose inboxes may contain messages.
private val receivers = new LinkedBlockingQueue[EndpointData]
/** Message loop used for dispatching messages. */
private class MessageLoop extends Runnable {
override def run(): Unit = {
try {
while (true) {
try {
val data = receivers.take()
if (data == PoisonPill) {
// Put PoisonPill back so that other MessageLoops can see it.
receivers.offer(PoisonPill)
return
}
data.inbox.process(Dispatcher.this)
} catch {
case NonFatal(e) => logError(e.getMessage, e)
}
}
} catch {
case ie: InterruptedException => // exit
}
}
}
private class EndpointData(
val name: String,
val endpoint: RpcEndpoint,
val ref: NettyRpcEndpointRef) {
val inbox = new Inbox(ref, endpoint)
}
从代码中,可以看到 receivers 是一个Queue,用来传递消息,传递的是具有inbox(inbox中存放的是message队列,以及一条OnStart()的消息)的EndpointData。而MessageLoop是一个线程,一直不断的 receivers 队列中取消息并处理。
那消息又是如何处理的呢, 我们来翻下简化后的代码,这里就是调用inbox中的endpoint的onStart()方法。
/**
* Process stored messages.
*/
def process(dispatcher: Dispatcher): Unit = {
var message: InboxMessage = null
inbox.synchronized {
message = messages.poll()
}
while (true) {
safelyCall(endpoint) {
message match {
case OnStart =>
endpoint.onStart()
}
}
}
}
至此,我们知道,Dispatcher就是源源不断的从Queue中取消息,并调用其他endpoint的不同方法(这里只介绍了Onstart()方法,有兴趣的同学可以翻下源码。)