5.5 分布式
Consumer Offset Tracking(消费者offset跟踪)
高级别的consumer跟踪每个分区已消费的offset,并定期提交,以便在重启的情况下可以从这些offset中恢复。Kafka提供了一个选项在指定的broker中来存储所有给定的consumer组的offset,称为offset manager。例如,该consumer组的所有consumer实例向offset manager(broker)发送提交和获取offset请求。高级别的consumer将会自动处理这些过程。如果你使用低级别的consumer,你将需要手动管理offset。目前在低级别的java consumer中不支持,只能在Zookeeper中提交或获取offset。如果你使用简单的Scala consumer,将可拿到offset manager,并显式的提交或获取offset。对于包含offset manager的consumer可以通过发送GroupCoordinatorRequest到任意kafka broker,并接受GroupCoordinatorResponse响应,consumer可以继续向`offset manager broker`提交或获取offset。如果offset manager位置变动,consumer需要重新发现offset manager。如果你想手动管理你的offset,你可以看看OffsetCommitRequest 和 OffsetFetchRequest的源码是如何实现的。
当offset manager接收到一个OffsetCommitRequest,它将追加请求到一个特定的压缩名为__consumer_offsets的kafka topic中,当offset topic的所有副本接收offset之后,offset manager将发送一个提交offset成功的响应给consumer。万一offset无法在规定的时间内复制,offset将提交失败,consumer在回退之后可重试该提交(高级别consumer自动进行)。broker会定期压缩offset topic,因为只需要保存每个分区最近的offset。offset manager会缓存offset在内存表中,以便offset快速获取。
当offset manager接收一个offset的获取请求,将从offset缓存中返回最新的的offset。如果offset manager刚启动或新的consumer组刚成为offset manager(成为offset topic分区的leader),则需要加载offset topic的分区到缓存中,在这种情况下,offset将获取失败,并报出OffsetsLoadInProgress异常,consumer回滚后,重试OffsetFetchRequest(高级别consumer自动进行这些操作)。
从ZooKeeper迁移offset到kafka
Kafka consumers在早先的版本中offset默认存储在ZooKeeper中。可以通过下面的步骤迁移这些consumer到Kafka:
- 在consumer配置中设置
offsets.storage=kafka
和dual.commit.enabled=true
。 - consumer做滚动消费,验证你的consumer是健康正常的。
- 在你的consumer配置中设置
dual.commit.enabled=false
。 - consumer做滚动消费,验证你的consumer是健康正常的。
offsets.storage=zookeeper
。ZooKeeper 目录
下面给出了Zookeeper的结构和算法,用于协调consumer和broker。
Notation
当一个path中的元素表示为[XYZ],这意味着xyz的值不是固定的,实际上每个xyz的值可能是Zookeeper的znode,例如`/topic/[topic]`是一个目录,/topic包含一个子目录(每个topic名称)。数字的范围如[0...5]来表示子目录0,1,2,3,4。箭头`->`用于表示znode的内容,例如:/hello->world表示znode /hello包含值”world”。
Broker节点注册
/brokers/ids/[0...N] --> {"jmx_port":...,"timestamp":...,"endpoints":[...],"host":...,"version":...,"port":...} (ephemeral node)
这是当前所有broker的节点列表,其中每个提供了一个唯一的逻辑broker的id标识它的consumer(必须作为配置的一部分)。在启动时,broker节点通过在/brokers/ids/下用逻辑broker id创建一个znode来注册它自己。逻辑broker id的目的是当broker移动到不同的物理机器时,而不会影响消费者。尝试注册一个已存在的broker id时将返回错误(因为2个server配置了相同的broker id)。
由于broker在Zookeeper中用的是临时znode来注册,因此这个注册是动态的,如果broker关闭或宕机,节点将消失(通知consumer不再可用)。
Broker Topic 注册
/brokers/topics/[topic]/partitions/[0...N]/state --> {"controller_epoch":...,"leader":...,"version":...,"leader_epoch":...,"isr":[...]} (ephemeral node)
每个broker在它自己的topic下注册,维护和存储该topic分区的数据。
Consumers and Consumer Groups
topic的consumer也在zookeeper中注册自己,以便相互协调和平衡数据的消耗。consumer也可以通过设置offsets.storage = zookeeper将他们的偏移量存储在zookeeper中。但是,这个偏移存储机制将在未来的版本中被弃用。因此,建议将数据迁移到kafka中。
多个consumer可组成一组,共同消费一个topic,在同一组中的每个consumer共享一个group_id。例如,如果一个consumer是foobar,在三台机器上运行,你可能分配这个这个consumer的ID是“foobar”。这个组id是在consumer的配置文件中配置的。
每个分区正好被一个consumer组的consumer所消费,一组中的consumer尽可能公平地分配分区。
Consumer Id 注册
除了由所有consumer共享的group_id,每个consumer都有一个临时且唯一的consumer_id(主机名的形式:uuid)用于识别。consumer的id在以下目录中注册。
/consumers/[group_id]/ids/[consumer_id] --> {"version":...,"subscription":{...:...},"pattern":...,"timestamp":...} (ephemeral node)组中的每个consumer用consumer_id注册znode。znode的值包含一个map。这个id只是用来识别在组里目前活跃的consumer,这是个临时节点,如果consumer在处理中挂掉,它就会消失。
Consumer Offsets
Consumers track the maximum offset they have consumed in each partition. This value is stored in a ZooKeeper directory if offsets.storage=zookeeper
.
/consumers/[group_id]/offsets/[topic]/[partition_id] --> offset_counter_value (persistent node)
Partition Owner registry
Each broker partition is consumed by a single consumer within a given consumer group. The consumer must establish its ownership of a given partition before any consumption can begin. To establish its ownership, a consumer writes its own id in an ephemeral node under the particular broker partition it is claiming.
/consumers/[group_id]/owners/[topic]/[partition_id] --> consumer_node_id (ephemeral node)
Cluster Id
The cluster id is a unique and immutable identifier assigned to a Kafka cluster. The cluster id can have a maximum of 22 characters and the allowed characters are defined by the regular expression [a-zA-Z0-9_\-]+, which corresponds to the characters used by the URL-safe Base64 variant with no padding. Conceptually, it is auto-generated when a cluster is started for the first time.
Implementation-wise, it is generated when a broker with version 0.10.1 or later is successfully started for the first time. The broker tries to get the cluster id from the /cluster/id
znode during startup. If the znode does not exist, the broker generates a new cluster id and creates the znode with this cluster id.
Broker node registration
The broker nodes are basically independent, so they only publish information about what they have. When a broker joins, it registers itself under the broker node registry directory and writes information about its host name and port. The broker also register the list of existing topics and their logical partitions in the broker topic registry. New topics are registered dynamically when they are created on the broker.
Consumer registration algorithm
When a consumer starts, it does the following:
- Register itself in the consumer id registry under its group.
- Register a watch on changes (new consumers joining or any existing consumers leaving) under the consumer id registry. (Each change triggers rebalancing among all consumers within the group to which the changed consumer belongs.)
- Register a watch on changes (new brokers joining or any existing brokers leaving) under the broker id registry. (Each change triggers rebalancing among all consumers in all consumer groups.)
- If the consumer creates a message stream using a topic filter, it also registers a watch on changes (new topics being added) under the broker topic registry. (Each change will trigger re-evaluation of the available topics to determine which topics are allowed by the topic filter. A new allowed topic will trigger rebalancing among all consumers within the consumer group.)
- Force itself to rebalance within in its consumer group.
Consumer rebalancing algorithm
The consumer rebalancing algorithms allows all the consumers in a group to come into consensus on which consumer is consuming which partitions. Consumer rebalancing is triggered on each addition or removal of both broker nodes and other consumers within the same group. For a given topic and a given consumer group, broker partitions are divided evenly among consumers within the group. A partition is always consumed by a single consumer. This design simplifies the implementation. Had we allowed a partition to be concurrently consumed by multiple consumers, there would be contention on the partition and some kind of locking would be required. If there are more consumers than partitions, some consumers won't get any data at all. During rebalancing, we try to assign partitions to consumers in such a way that reduces the number of broker nodes each consumer has to connect to.
Each consumer does the following during rebalancing:
1. For each topic T that Ci subscribes to2. let PT be all partitions producing topic T3. let CG be all consumers in the same group as Ci that consume topic T4. sort PT (so partitions on the same broker are clustered together)5. sort CG6. let i be the index position of Ci in CG and let N = size(PT)/size(CG)7. assign partitions from i*N to (i+1)*N - 1 to consumer Ci8. remove current entries owned by Ci from the partition owner registry9. add newly assigned partitions to the partition owner registry (we may need to re-try this until the original partition owner releases its ownership)
When rebalancing is triggered at one consumer, rebalancing should be triggered in other consumers within the same group about the same time.