当前位置: 首页 > 工具软件 > ControlPlane > 使用案例 >

Kafka请求处理模块(五):SocketServer的 Data plane 与 Control plane

蒋奇
2023-12-01

介绍完SocketServer的核心组件,下面介绍SocketServer是怎么对请求进行区分处理的。先看看SocketServer的定义。

class SocketServer(val config: KafkaConfig,
                   val metrics: Metrics,
                   val time: Time,
                   val credentialProvider: CredentialProvider)
  extends Logging with KafkaMetricsGroup with BrokerReconfigurable {
  // SocketServer实现BrokerReconfigurable trait表明SocketServer的一些参数配置是允许动态修改的
  // 即在Broker不停机的情况下修改它们
  // SocketServer的请求队列长度,由Broker端参数queued.max.requests值而定,默认值是500
  private val maxQueuedRequests = config.queuedMaxRequests

  private val logContext = new LogContext(s"[SocketServer brokerId=${config.brokerId}] ")
  this.logIdent = logContext.logPrefix

  private val memoryPoolSensor = metrics.sensor("MemoryPoolUtilization")
  private val memoryPoolDepletedPercentMetricName = metrics.metricName("MemoryPoolAvgDepletedPercent", MetricsGroup)
  private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal", MetricsGroup)
  memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
  private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
  // data-plane
  private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
  // 处理数据类请求的Acceptor线程池,每套监听器对应一个Acceptor线程
  private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
  val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time)
  // control-plane
  // 用于处理控制类请求的Processor线程
  // 注意:目前定义了专属的Processor线程而非线程池处理控制类请求
  // Control plane 的配套资源只有 1 个 Acceptor 线程 + 1 个 Processor 线程 + 1 个深度是 20 的请求队列而已。
  // 一旦你开启了 Control plane 设置,其 Processor 线程就只有 1 个,Acceptor 线程也是 1 个。另外,你要注意,它对应的 RequestChannel 里面的请求队列长度被硬编码成了 20,而不是一个可配置的值。
  // 这揭示了社区在这里所做的一个假设:即控制类请求的数量应该远远小于数据类请求,因而不需要为它创建线程池和较深的请求队列。
  private var controlPlaneProcessorOpt : Option[Processor] = None
  private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
  // 处理控制类请求专属的RequestChannel对象
  val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ =>
    new RequestChannel(20, ControlPlaneMetricPrefix, time))
  // Data plane 和 Control plane 注释下面分别定义了一组变量,即 Processor 线程池、Acceptor 线程池和 RequestChannel 实例。
  // Processor 线程池:即网络线程池,负责将请求高速地放入到请求队列中。
  // Acceptor 线程池:保存了 SocketServer 为每个监听器定义的 Acceptor 线程,此线程负责分发该监听器上的入站连接建立请求。
  // RequestChannel:承载请求队列的请求处理通道。
}

Data plane 和 Control plane

社区将 Kafka 请求类型划分为两大类:数据类请求和控制类请求。Data plane 和 Control plane 的字面意思是数据面和控制面,各自对应数据类请求和控制类请求,也就是说 Data plane 负责处理数据类请求,Control plane 负责处理控制类请求。
       目前,Controller 与 Broker 交互的请求类型有 3 种:LeaderAndIsrRequest、StopReplicaRequest 和 UpdateMetadataRequest。这 3 类请求属于控制类请求,通常应该被赋予高优先级。像我们熟知的 PRODUCE 和 FETCH 请求,就是典型的数据类请求。
        介绍SocketServer 类的定义之后,现在看SocketServer 是如何为 Data plane 和 Control plane 创建所需资源的操作了。

创建 Control plane 所需资源

  private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int,
                                                    endpoints: Seq[EndPoint]): Unit = {
    // 遍历监听器集合
    endpoints.foreach { endpoint =>
      // 将监听器纳入到连接配额管理之下
      connectionQuotas.addListener(config, endpoint.listenerName)
      // 为监听器创建对应的Acceptor线程
      val dataPlaneAcceptor = createAcceptor(endpoint, DataPlaneMetricPrefix)
      // 为监听器创建多个Processor线程。具体数目由num.network.threads决定
      addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)
      // 将<监听器,Acceptor线程>对保存起来统一管理
      dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
      info(s"Created data-plane acceptor and processors for endpoint : ${endpoint.listenerName}")
    }
  }

创建 Control plane 所需资源

        前面说过了,基于控制类请求的负载远远小于数据类请求负载的假设,Control plane 的配套资源只有 1 个 Acceptor 线程 + 1 个 Processor 线程 + 1 个深度是 20 的请求队列而已。
        SocketServer 提供了 createControlPlaneAcceptorAndProcessor 方法,用于为 Control plane 创建所需资源。

  private def createControlPlaneAcceptorAndProcessor(endpointOpt: Option[EndPoint]): Unit = {
    // 如果为Control plane配置了监听器
    endpointOpt.foreach { endpoint =>
      // 将监听器纳入到连接配额管理之下
      connectionQuotas.addListener(config, endpoint.listenerName)
      // 为监听器创建对应的Acceptor线程
      val controlPlaneAcceptor = createAcceptor(endpoint, ControlPlaneMetricPrefix)
      // 为监听器创建对应的Processor线程
      val controlPlaneProcessor = newProcessor(nextProcessorId, controlPlaneRequestChannelOpt.get, connectionQuotas, endpoint.listenerName, endpoint.securityProtocol, memoryPool)
      controlPlaneAcceptorOpt = Some(controlPlaneAcceptor)
      controlPlaneProcessorOpt = Some(controlPlaneProcessor)
      val listenerProcessors = new ArrayBuffer[Processor]()
      listenerProcessors += controlPlaneProcessor
      // 将Processor线程添加到控制类请求专属RequestChannel中
      // 即添加到RequestChannel实例保存的Processor线程池中
      controlPlaneRequestChannelOpt.foreach(_.addProcessor(controlPlaneProcessor))
      nextProcessorId += 1
      // 把Processor对象也添加到Acceptor线程管理的Processor线程池中
      controlPlaneAcceptor.addProcessors(listenerProcessors, ControlPlaneThreadPrefix)
      info(s"Created control-plane acceptor and processor for endpoint : ${endpoint.listenerName}")
    }
  }

Processor 和 Acceptor 线程是在启动 SocketServer 组件之后启动的,具体代码在 KafkaServer.scala 文件的 startup 方法中

// KafkaServer.scala
def startup(): Unit = {
    try {
      info("starting")
      ......
      // 创建SocketServer组件
      socketServer = new SocketServer(config, metrics, time, credentialProvider)
      // 启动SocketServer,但不启动Processor线程
      socketServer.startup(startProcessingRequests = false)
      ......
      // 启动Data plane和Control plane的所有线程
      socketServer.startProcessingRequests(authorizerFutures)
      ......
    } catch {
      ......
    }
}

SocketServer 的 startProcessingRequests 方法就是启动这些线程的方法

  def startProcessingRequests(authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = Map.empty): Unit = {
    info("Starting socket server acceptors and processors")
    this.synchronized {
      if (!startedProcessingRequests) {
        // 启动处理控制类请求的Processor和Acceptor线程
        startControlPlaneProcessorAndAcceptor(authorizerFutures)
        // 启动处理数据类请求的Processor和Acceptor线程
        startDataPlaneProcessorsAndAcceptors(authorizerFutures)
        startedProcessingRequests = true
      } else {
        info("Socket server acceptors and processors already started")
      }
    }
    info("Started socket server acceptors and processors")
  }

这个方法又进一步调用了 startDataPlaneProcessorsAndAcceptors 和 startControlPlaneProcessorAndAcceptor 方法分别启动 Data plane 的 Control plane 的线程。

  private def startDataPlaneProcessorsAndAcceptors(authorizerFutures: Map[Endpoint, CompletableFuture[Void]]): Unit = {
    // 获取Broker间通讯所用的监听器,默认是PLAINTEXT
    val interBrokerListener = dataPlaneAcceptors.asScala.keySet
      .find(_.listenerName == config.interBrokerListenerName)
      .getOrElse(throw new IllegalStateException(s"Inter-broker listener ${config.interBrokerListenerName} not found, endpoints=${dataPlaneAcceptors.keySet}"))
    val orderedAcceptors = List(dataPlaneAcceptors.get(interBrokerListener)) ++
      dataPlaneAcceptors.asScala.filter { case (k, _) => k != interBrokerListener }.values
    orderedAcceptors.foreach { acceptor =>
      val endpoint = acceptor.endPoint
      // 启动Processor和Acceptor线程
      startAcceptorAndProcessors(DataPlaneThreadPrefix, endpoint, acceptor, authorizerFutures)
    }
  }
  private def startControlPlaneProcessorAndAcceptor(authorizerFutures: Map[Endpoint, CompletableFuture[Void]]): Unit = {
    controlPlaneAcceptorOpt.foreach { controlPlaneAcceptor =>
      val endpoint = config.controlPlaneListener.get
      // 启动Processor和Acceptor线程
      startAcceptorAndProcessors(ControlPlaneThreadPrefix, endpoint, controlPlaneAcceptor, authorizerFutures)
    }
  }

 

 类似资料: