通过bin下面的kafka-server-start.sh 可以启动一个broker,命令如下:nohup ./kafka-server-start.sh config/server.properties &
这个脚本主要是调用kafka.kafka 这个类
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
与之对应的是停止一个broker对应的脚本是kafka-server-stop.sh
SIGNAL=${SIGNAL:-TERM}
PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}')
if [ -z "$PIDS" ]; then
echo "No kafka server to stop"
exit 1
else
kill -s $SIGNAL $PIDS
fi
这个脚本就是通过ps ax 查找到kafka的进程后,然后通过kill 命令将这个进程杀死
我们首先看看kafka这个类
core/src/main/scala/kafka/Kafka.scala
def main(args: Array[String]): Unit = {
try {
val serverProps = getPropsFromArgs(args)
val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
try {
if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
new LoggingSignalHandler().register()
} catch {
case e: ReflectiveOperationException =>
warn("Failed to register optional signal handler that logs a message when the process is terminated " +
s"by a signal. Reason for registration failure is: $e", e)
}
// attach shutdown handler to catch terminating signals as well as normal termination
Runtime.getRuntime().addShutdownHook(new Thread("kafka-shutdown-hook") {
override def run(): Unit = kafkaServerStartable.shutdown()
})
#这里调用kafkaServerStartable 的startup函数来启动broker
kafkaServerStartable.startup()
kafkaServerStartable.awaitShutdown()
}
KafkaServerStartable 这个类的starup和shutdonw函数如下:
\core\src\main\scala\kafka\server\KafkaServerStartable.scala
class KafkaServerStartable(val staticServerConfig: KafkaConfig, reporters: Seq[KafkaMetricsReporter]) extends Logging {
private val server = new KafkaServer(staticServerConfig, kafkaMetricsReporters = reporters)
def this(serverConfig: KafkaConfig) = this(serverConfig, Seq.empty)
def startup() {
#这里有调用KafkaServer的startup函数来启动broker
try server.startup()
catch {
case _: Throwable =>
// KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
fatal("Exiting Kafka.")
Exit.exit(1)
}
}
def shutdown() {
try server.shutdown()
catch {
case _: Throwable =>
fatal("Halting Kafka.")
// Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.
Exit.halt(1)
}
}
KafkaServer.scala 的startup函数如下:
core\src\main\scala\kafka\server\KafkaServer.scala
def startup() {
try {
info("starting")
quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
notifyClusterListeners(kafkaMetricsReporters ++ metrics.reporters.asScala)
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
这个类会启动很多manager,下面这行就可以看到logManager 是从kafkaserver中启动的
/* start log manager */
logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
logManager.startup()
}