介绍完SocketServer的核心组件,下面介绍SocketServer是怎么对请求进行区分处理的。先看看SocketServer的定义。
class SocketServer(val config: KafkaConfig,
val metrics: Metrics,
val time: Time,
val credentialProvider: CredentialProvider)
extends Logging with KafkaMetricsGroup with BrokerReconfigurable {
// SocketServer实现BrokerReconfigurable trait表明SocketServer的一些参数配置是允许动态修改的
// 即在Broker不停机的情况下修改它们
// SocketServer的请求队列长度,由Broker端参数queued.max.requests值而定,默认值是500
private val maxQueuedRequests = config.queuedMaxRequests
private val logContext = new LogContext(s"[SocketServer brokerId=${config.brokerId}] ")
this.logIdent = logContext.logPrefix
private val memoryPoolSensor = metrics.sensor("MemoryPoolUtilization")
private val memoryPoolDepletedPercentMetricName = metrics.metricName("MemoryPoolAvgDepletedPercent", MetricsGroup)
private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal", MetricsGroup)
memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
// data-plane
private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
// 处理数据类请求的Acceptor线程池,每套监听器对应一个Acceptor线程
private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time)
// control-plane
// 用于处理控制类请求的Processor线程
// 注意:目前定义了专属的Processor线程而非线程池处理控制类请求
// Control plane 的配套资源只有 1 个 Acceptor 线程 + 1 个 Processor 线程 + 1 个深度是 20 的请求队列而已。
// 一旦你开启了 Control plane 设置,其 Processor 线程就只有 1 个,Acceptor 线程也是 1 个。另外,你要注意,它对应的 RequestChannel 里面的请求队列长度被硬编码成了 20,而不是一个可配置的值。
// 这揭示了社区在这里所做的一个假设:即控制类请求的数量应该远远小于数据类请求,因而不需要为它创建线程池和较深的请求队列。
private var controlPlaneProcessorOpt : Option[Processor] = None
private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
// 处理控制类请求专属的RequestChannel对象
val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ =>
new RequestChannel(20, ControlPlaneMetricPrefix, time))
// Data plane 和 Control plane 注释下面分别定义了一组变量,即 Processor 线程池、Acceptor 线程池和 RequestChannel 实例。
// Processor 线程池:即网络线程池,负责将请求高速地放入到请求队列中。
// Acceptor 线程池:保存了 SocketServer 为每个监听器定义的 Acceptor 线程,此线程负责分发该监听器上的入站连接建立请求。
// RequestChannel:承载请求队列的请求处理通道。
}
Data plane 和 Control plane
社区将 Kafka 请求类型划分为两大类:数据类请求和控制类请求。Data plane 和 Control plane 的字面意思是数据面和控制面,各自对应数据类请求和控制类请求,也就是说 Data plane 负责处理数据类请求,Control plane 负责处理控制类请求。
目前,Controller 与 Broker 交互的请求类型有 3 种:LeaderAndIsrRequest、StopReplicaRequest 和 UpdateMetadataRequest。这 3 类请求属于控制类请求,通常应该被赋予高优先级。像我们熟知的 PRODUCE 和 FETCH 请求,就是典型的数据类请求。
介绍SocketServer 类的定义之后,现在看SocketServer 是如何为 Data plane 和 Control plane 创建所需资源的操作了。
创建 Control plane 所需资源
private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int,
endpoints: Seq[EndPoint]): Unit = {
// 遍历监听器集合
endpoints.foreach { endpoint =>
// 将监听器纳入到连接配额管理之下
connectionQuotas.addListener(config, endpoint.listenerName)
// 为监听器创建对应的Acceptor线程
val dataPlaneAcceptor = createAcceptor(endpoint, DataPlaneMetricPrefix)
// 为监听器创建多个Processor线程。具体数目由num.network.threads决定
addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)
// 将<监听器,Acceptor线程>对保存起来统一管理
dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
info(s"Created data-plane acceptor and processors for endpoint : ${endpoint.listenerName}")
}
}
创建 Control plane 所需资源
前面说过了,基于控制类请求的负载远远小于数据类请求负载的假设,Control plane 的配套资源只有 1 个 Acceptor 线程 + 1 个 Processor 线程 + 1 个深度是 20 的请求队列而已。
SocketServer 提供了 createControlPlaneAcceptorAndProcessor 方法,用于为 Control plane 创建所需资源。
private def createControlPlaneAcceptorAndProcessor(endpointOpt: Option[EndPoint]): Unit = {
// 如果为Control plane配置了监听器
endpointOpt.foreach { endpoint =>
// 将监听器纳入到连接配额管理之下
connectionQuotas.addListener(config, endpoint.listenerName)
// 为监听器创建对应的Acceptor线程
val controlPlaneAcceptor = createAcceptor(endpoint, ControlPlaneMetricPrefix)
// 为监听器创建对应的Processor线程
val controlPlaneProcessor = newProcessor(nextProcessorId, controlPlaneRequestChannelOpt.get, connectionQuotas, endpoint.listenerName, endpoint.securityProtocol, memoryPool)
controlPlaneAcceptorOpt = Some(controlPlaneAcceptor)
controlPlaneProcessorOpt = Some(controlPlaneProcessor)
val listenerProcessors = new ArrayBuffer[Processor]()
listenerProcessors += controlPlaneProcessor
// 将Processor线程添加到控制类请求专属RequestChannel中
// 即添加到RequestChannel实例保存的Processor线程池中
controlPlaneRequestChannelOpt.foreach(_.addProcessor(controlPlaneProcessor))
nextProcessorId += 1
// 把Processor对象也添加到Acceptor线程管理的Processor线程池中
controlPlaneAcceptor.addProcessors(listenerProcessors, ControlPlaneThreadPrefix)
info(s"Created control-plane acceptor and processor for endpoint : ${endpoint.listenerName}")
}
}
Processor 和 Acceptor 线程是在启动 SocketServer 组件之后启动的,具体代码在 KafkaServer.scala 文件的 startup 方法中
// KafkaServer.scala
def startup(): Unit = {
try {
info("starting")
......
// 创建SocketServer组件
socketServer = new SocketServer(config, metrics, time, credentialProvider)
// 启动SocketServer,但不启动Processor线程
socketServer.startup(startProcessingRequests = false)
......
// 启动Data plane和Control plane的所有线程
socketServer.startProcessingRequests(authorizerFutures)
......
} catch {
......
}
}
SocketServer 的 startProcessingRequests 方法就是启动这些线程的方法
def startProcessingRequests(authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = Map.empty): Unit = {
info("Starting socket server acceptors and processors")
this.synchronized {
if (!startedProcessingRequests) {
// 启动处理控制类请求的Processor和Acceptor线程
startControlPlaneProcessorAndAcceptor(authorizerFutures)
// 启动处理数据类请求的Processor和Acceptor线程
startDataPlaneProcessorsAndAcceptors(authorizerFutures)
startedProcessingRequests = true
} else {
info("Socket server acceptors and processors already started")
}
}
info("Started socket server acceptors and processors")
}
这个方法又进一步调用了 startDataPlaneProcessorsAndAcceptors 和 startControlPlaneProcessorAndAcceptor 方法分别启动 Data plane 的 Control plane 的线程。
private def startDataPlaneProcessorsAndAcceptors(authorizerFutures: Map[Endpoint, CompletableFuture[Void]]): Unit = {
// 获取Broker间通讯所用的监听器,默认是PLAINTEXT
val interBrokerListener = dataPlaneAcceptors.asScala.keySet
.find(_.listenerName == config.interBrokerListenerName)
.getOrElse(throw new IllegalStateException(s"Inter-broker listener ${config.interBrokerListenerName} not found, endpoints=${dataPlaneAcceptors.keySet}"))
val orderedAcceptors = List(dataPlaneAcceptors.get(interBrokerListener)) ++
dataPlaneAcceptors.asScala.filter { case (k, _) => k != interBrokerListener }.values
orderedAcceptors.foreach { acceptor =>
val endpoint = acceptor.endPoint
// 启动Processor和Acceptor线程
startAcceptorAndProcessors(DataPlaneThreadPrefix, endpoint, acceptor, authorizerFutures)
}
}
private def startControlPlaneProcessorAndAcceptor(authorizerFutures: Map[Endpoint, CompletableFuture[Void]]): Unit = {
controlPlaneAcceptorOpt.foreach { controlPlaneAcceptor =>
val endpoint = config.controlPlaneListener.get
// 启动Processor和Acceptor线程
startAcceptorAndProcessors(ControlPlaneThreadPrefix, endpoint, controlPlaneAcceptor, authorizerFutures)
}
}