当前位置: 首页 > 工具软件 > OR Broker > 使用案例 >

Broker 和 Controller

仇迪
2023-12-01

1. Broker 和 Controller

对用户而言,broker的主要功能就是持久化消息以及将消息队列中的消息从发送端传输到消费端。Kafka的broker负责持久化producer端发送的消息,同时还为consumer端提供消息。

kafka 从 0.10.x 版本开始支持指定 broker 的机架信息(机架名称)。

1.1. 集群管理

Kafka 支持自动化的服务发现与成员管理。

Kafka 是依赖 zookeeper 实现的。每当一个 broker 启动时,它会将自己注册到 zookeeper 的临时节点:
(1)首先,每个 broker 在 zookeeper 下注册节点的路径是 chroot/brokers/ids/<broker.id>。如果没有配置 chroot,则路径是 /brokers/ids/<broker.id>。是否配置了 chroot 取决于 server.properties 中的 zookeeper.connect 参数是否设置了 chroot。
(2)其次,broker 向 zookeeper 中注册的信息以 json 格式保存。其中包括的信息如下:

{
    "listener_security_protocol_map": {
        "PLAINTEXT": "PLAINTEXT"
    },
    "endpoints": [
        "PLAINTEXT://192.168.110.110:9092"
    ],
    "jmx_port": 9999,
    "host": "192.168.110.110",
    "timestamp": "1575450860777",
    "port": 9092,
    "version": 4
}
  • listener_security_protocol_map:此值指定了该 broker 与外界通信所用的安全协议类型。
    kafka 当前支持 broker 使用不同的安全协议与 clients 和其他 broker 通信。
  • endpoints:指定 broker 的服务 endpoint 列表,每个 endpoints 指明了传输安全协议类型、broker 主机名和端口信息。
  • jmx_port:broker 的 JMX 监控端口。需要在启动 broker 前设置 jmx_port 环境变量。
  • host:broker 主机名或 IP 地址。
  • timestamp:broker 启动时间。
  • port:broker 服务端口号。
  • version:broker 当前版本号。
    注意:这不是 kafka 的版本,而是 broker 注册信息的版本。

Kafka 利用 zookeeper 临时节点来管理 broker 生命周期的。broker 启动时在 zookeeper 中创建对应的临时节点,同时还会创建一个监听器,监听该临时节点的状态,一旦 broker 启动后,监听器会自动同步整个集群信息到 broker 上,而一旦 broker 崩溃,它与 zookeeper 的会话就会失效,导致临时节点被删除,监听器被触发,然后处理 broker 崩溃的后续事宜。

1.2. Controller

在一个 kafka 集群中,某个 broker 会被选举出来承担特殊的角色,即控制器(Controller)。顾明思议,引入 controller 就是用来管理和协调 kafka 集群的。具体来说,就是管理集群中所有分区和副本的状态并执行相应的管理操作

  • 当某个分区的 leader 副本出现故障时,由Controller负责为该分区选举新的 leader 副本;
  • 当检测到某个分区的 ISR 集合发生变化时,由Controller负责通知所有 broker 更新元数据信息;
  • 当使用 kafka-topic.sh 脚本为某个 topic 增加分区数量时,同样还是由Controller负责分区的重新分配。

1.2.1. Controller 选举原理

kafka 中的 controller 选举工作依赖与 zookeeper,成功竞选为 controller 的 broker 会在 zookeeper 中创建 /controller 临时节点,此临时节点的内容参考如下:

{
    "version": 1,
    "brokerid": 2,
    "timestamp": "1569267532839"
}
  • version 在目前版本中固定为1(代码中设置固定值1)
  • brokerid 表示成为控制器的 broker 的 id 编号
  • timestamp 表示竞选成为控制器的时间戳。

当 controller 和 zookeeper 失去连接时,临时节点会删除,而其他 broker 会监听该节点的变化,当节点删除时,其他broker会收到事件通知,重新发起 controller 选举。

每个 kafka 集群任意时刻都只能有一个 controller。每个broker启动的时候会去尝试读取/controller节点的brokerid的值,如果读取到brokerid的值不为-1,则表示已经有其他broker节点成功竞选为controller,所以当前broker就会放弃竞选;如果zookeeper中不存在临时节点,或者这个节点中的数据异常,那么就会尝试去创建/controller节点。当前broker去创建节点的时候,其他broker也有可能在创建这个节点,只有创建成功的那个broker才会成为controller。

每个broker都会在内存中保存当前控制器的brokerid值,这个值可以标识为activeControllerId。当/controller节点的数据发生变化时,每个broker都会更新内存中保存的activeControllerId,如果broker在数据变更前是控制器,在数据变更后自身的brokerid值与新的activeControllerId值不一致,那么在选举前需要“退位”,关闭相应的资源,比如关闭状态机、注销相应的监听器等。

zookeeper中还有一个与控制器有关的/controller_epoch持久节点,节点中存放的是一个整型的controller_epoch值(初始值为1)。controller_epoch用于记录控制器发生变更的次数,即记录当前的控制器是第几代控制器,也可以称为“控制器的纪元”。

每个和控制器交互的请求都会携带controller_epoch这个字段,如果请求的controller_epoch值小于内存中的controller_epoch值,则认为这个请求是向已经过期的控制器所发送请求,那么这个请求会被认为是无效的请求。如果这个请求的controller_epoch值大于内存中的controller_epoch值,那么说明已经有新的控制器当选了。
kafka通过controller_epoch来保证控制器的唯一性,进而保证相关操作的唯一性。

1.2.2. 手动触发 Controller 开启新一轮选举

  • 控制器异常下线,会造成 /controller 这个临时节点被自动删除;
  • 如果有特殊需要,可以手动删除 /controller 节点来触发新一轮的选举;
  • 关闭控制器所对应的 broker;
  • 手动向 /controller 节点写入新的 brokerid 的所对应的数据同样可以触新一轮的选举。

1.2.3. Partition 的 leader 选举

详见优先副本选举

2. broker 端配置

2.1. 最基本的配置

broker.id
broker的编号,如果集群中有个broker,则每个broker的编号需要设置不同
log.dirs
用来配置kafak日志文件存放的根目录。一般使用log.dir用来配置单个根目录,log.dirs用来配置多个根目录(以逗号分隔),但是kafka并没有做强制要求。也就是说log.dir和log.dirs都可以用来配置单个目录或多个目录。log.dirs的优先级比log.dir高
zookeeper.connect
kafka所需要的zookeeper集群地址(包含端口号)

2.2. 主要的参数配置

advertised.host.name
弃用,仅在未设置advertide.listeners或listeners时使用

 类似资料: