对用户而言,broker的主要功能就是持久化消息以及将消息队列中的消息从发送端传输到消费端。Kafka的broker负责持久化producer端发送的消息,同时还为consumer端提供消息。
kafka 从 0.10.x
版本开始支持指定 broker 的机架信息(机架名称)。
Kafka 支持自动化的服务发现与成员管理。
Kafka 是依赖 zookeeper 实现的。每当一个 broker 启动时,它会将自己注册到 zookeeper 的临时节点:
(1)首先,每个 broker 在 zookeeper 下注册节点的路径是 chroot/brokers/ids/<broker.id>
。如果没有配置 chroot,则路径是 /brokers/ids/<broker.id>
。是否配置了 chroot 取决于 server.properties 中的 zookeeper.connect 参数是否设置了 chroot。
(2)其次,broker 向 zookeeper 中注册的信息以 json 格式保存。其中包括的信息如下:
{
"listener_security_protocol_map": {
"PLAINTEXT": "PLAINTEXT"
},
"endpoints": [
"PLAINTEXT://192.168.110.110:9092"
],
"jmx_port": 9999,
"host": "192.168.110.110",
"timestamp": "1575450860777",
"port": 9092,
"version": 4
}
listener_security_protocol_map
:此值指定了该 broker 与外界通信所用的安全协议类型。endpoints
:指定 broker 的服务 endpoint 列表,每个 endpoints 指明了传输安全协议类型、broker 主机名和端口信息。jmx_port
:broker 的 JMX 监控端口。需要在启动 broker 前设置 jmx_port 环境变量。host
:broker 主机名或 IP 地址。timestamp
:broker 启动时间。port
:broker 服务端口号。version
:broker 当前版本号。Kafka 利用 zookeeper 临时节点来管理 broker 生命周期的。broker 启动时在 zookeeper 中创建对应的临时节点,同时还会创建一个监听器,监听该临时节点的状态,一旦 broker 启动后,监听器会自动同步整个集群信息到 broker 上,而一旦 broker 崩溃,它与 zookeeper 的会话就会失效,导致临时节点被删除,监听器被触发,然后处理 broker 崩溃的后续事宜。
在一个 kafka 集群中,某个 broker 会被选举出来承担特殊的角色,即控制器(Controller)。顾明思议,引入 controller 就是用来管理和协调 kafka 集群的。具体来说,就是管理集群中所有分区和副本的状态并执行相应的管理操作。
Controller
负责为该分区选举新的 leader 副本;Controller
负责通知所有 broker 更新元数据信息;kafka-topic.sh
脚本为某个 topic 增加分区数量时,同样还是由Controller
负责分区的重新分配。kafka 中的 controller 选举工作依赖与 zookeeper,成功竞选为 controller 的 broker 会在 zookeeper 中创建 /controller
临时节点,此临时节点的内容参考如下:
{
"version": 1,
"brokerid": 2,
"timestamp": "1569267532839"
}
当 controller 和 zookeeper 失去连接时,临时节点会删除,而其他 broker 会监听该节点的变化,当节点删除时,其他broker会收到事件通知,重新发起 controller 选举。
每个 kafka 集群任意时刻都只能有一个 controller。每个broker启动的时候会去尝试读取/controller节点的brokerid的值,如果读取到brokerid的值不为-1,则表示已经有其他broker节点成功竞选为controller,所以当前broker就会放弃竞选;如果zookeeper中不存在临时节点,或者这个节点中的数据异常,那么就会尝试去创建/controller节点。当前broker去创建节点的时候,其他broker也有可能在创建这个节点,只有创建成功的那个broker才会成为controller。
每个broker都会在内存中保存当前控制器的brokerid值,这个值可以标识为activeControllerId
。当/controller
节点的数据发生变化时,每个broker都会更新内存中保存的activeControllerId,如果broker在数据变更前是控制器,在数据变更后自身的brokerid值与新的activeControllerId值不一致,那么在选举前需要“退位”,关闭相应的资源,比如关闭状态机、注销相应的监听器等。
zookeeper中还有一个与控制器有关的/controller_epoch
持久节点,节点中存放的是一个整型的controller_epoch值(初始值为1)。controller_epoch用于记录控制器发生变更的次数,即记录当前的控制器是第几代控制器,也可以称为“控制器的纪元”。
每个和控制器交互的请求都会携带controller_epoch这个字段,如果请求的controller_epoch值小于内存中的controller_epoch值,则认为这个请求是向已经过期的控制器所发送请求,那么这个请求会被认为是无效的请求。如果这个请求的controller_epoch值大于内存中的controller_epoch值,那么说明已经有新的控制器当选了。
kafka通过controller_epoch来保证控制器的唯一性,进而保证相关操作的唯一性。
详见优先副本选举
broker.id
broker的编号,如果集群中有个broker,则每个broker的编号需要设置不同
log.dirs
用来配置kafak日志文件存放的根目录。一般使用log.dir用来配置单个根目录,log.dirs用来配置多个根目录(以逗号分隔),但是kafka并没有做强制要求。也就是说log.dir和log.dirs都可以用来配置单个目录或多个目录。log.dirs的优先级比log.dir高
zookeeper.connect
kafka所需要的zookeeper集群地址(包含端口号)
advertised.host.name
弃用,仅在未设置advertide.listeners或listeners时使用