当前位置: 首页 > 工具软件 > dispatcher > 使用案例 >

Spark3源码精读 之 Dispatcher

洪伟彦
2023-12-01

Dispatcher 介绍(chatgpt)

在Apache Spark中,Dispatcher是Spark Master进程的一个组件,用于接收和处理来自客户端应用程序的请求,例如提交应用程序、杀死应用程序等。Dispatcher作为Spark Master进程的一部分,是处理客户端请求的主要入口点。当客户端应用程序需要与Spark Master交互时,它们会将请求发送到Dispatcher,Dispatcher负责将请求路由到相应的Spark Master组件进行处理。

Dispatcher的主要作用是:
1、接收和处理来自客户端应用程序的请求:客户端应用程序可以通过向Spark Master发送请求来与Spark集群交互,例如提交应用程序、查询集群状态、杀死应用程序等。Dispatcher负责接收这些请求,并将它们分配给相应的Spark Master组件进行处理。
2、维护请求队列:当Dispatcher接收到请求后,它会将请求放入队列中,并按照请求类型和优先级进行排序。这样做可以保证请求得到及时处理,并且避免了并发请求的竞争。
3、路由请求到相应的Spark Master组件:Dispatcher会根据请求类型和优先级将请求路由到相应的Spark Master组件进行处理。例如,如果请求是提交应用程序的请求,Dispatcher会将其发送到应用程序管理器进行处理;如果请求是杀死应用程序的请求,Dispatcher会将其发送到应用程序管理器或任务调度器进行处理。

总的来说,Dispatcher是Spark Master进程的一个关键组件,负责协调和处理来自客户端应用程序的请求,使得Spark集群可以高效地处理任务并提供服务。

Dispatcher 初始化代码

Dispatcher是在NettyRpcEnv实例化的时候创建的。

private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)

在Dispatcher类中,有以下几个参数需要关注。

private val endpoints: ConcurrentMap[String, EndpointData] = new ConcurrentHashMap[String, EndpointData]

private val endpointRefs: ConcurrentMap[RpcEndpoint, RpcEndpointRef] = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]

// Track the receivers whose inboxes may contain messages.
private val receivers = new LinkedBlockingQueue[EndpointData]

/** Message loop used for dispatching messages. */
private class MessageLoop extends Runnable {
  override def run(): Unit = {
    try {
      while (true) {
        try {
          val data = receivers.take()
          if (data == PoisonPill) {
            // Put PoisonPill back so that other MessageLoops can see it.
            receivers.offer(PoisonPill)
            return
          }
          data.inbox.process(Dispatcher.this)
        } catch {
          case NonFatal(e) => logError(e.getMessage, e)
        }
      }
    } catch {
      case ie: InterruptedException => // exit
    }
  }
}

private class EndpointData(
    val name: String,
    val endpoint: RpcEndpoint,
    val ref: NettyRpcEndpointRef) {
  val inbox = new Inbox(ref, endpoint)
}

从代码中,可以看到 receivers 是一个Queue,用来传递消息,传递的是具有inbox(inbox中存放的是message队列,以及一条OnStart()的消息)的EndpointData。而MessageLoop是一个线程,一直不断的 receivers 队列中取消息并处理。
那消息又是如何处理的呢, 我们来翻下简化后的代码,这里就是调用inbox中的endpoint的onStart()方法。

/**
 * Process stored messages.
 */
def process(dispatcher: Dispatcher): Unit = {
  var message: InboxMessage = null
  inbox.synchronized {
    message = messages.poll()
  }
  while (true) {
    safelyCall(endpoint) {
      message match {
        case OnStart =>
          endpoint.onStart()
      }
    }
  }
}

至此,我们知道,Dispatcher就是源源不断的从Queue中取消息,并调用其他endpoint的不同方法(这里只介绍了Onstart()方法,有兴趣的同学可以翻下源码。)

 类似资料: