4.7 Replication
Kafka 允许 topic 的 partition 拥有若干副本,你可以在server端配置partition 的副本数量。当集群中的节点出现故障时,能自动进行故障转移,保证数据的可用性。
其他的消息系统也提供了副本相关的特性,但是在我们(带有偏见)看来,他们的副本功能不常用,而且有很大缺点:slaves 处于非活动状态,导致吞吐量受到严重影响,并且还要手动配置副本机制。Kafka 默认使用备份机制,事实上,我们将没有设置副本数
的 topic 实现为副本数为1的 topic 。
创建副本的单位是 topic 的 partition ,正常情况下, 每个分区都有一个 leader 和零或多个 followers 。 总的副本数是包含 leader 的总和。 所有的读写操作都由 leader 处理,一般 partition 的数量都比 broker 的数量多的多,各分区的 leader 均
匀的分布在brokers 中。所有的 followers 节点都同步 leader 节点的日志,日志中的消息和偏移量都和 leader 中的一致。(当然, 在任何给定时间, leader 节点的日志末尾时可能有几个消息尚未被备份完成)。
Followers 节点就像普通的 consumer 那样从 leader 节点那里拉取消息并保存在自己的日志文件中。Followers 节点可以从 leader 节点那里批量拉取消息日志到自己的日志文件中。
与大多数分布式系统一样,自动处理故障需要精确定义节点 “alive” 的概念。Kafka 判断节点是否存活有两种方式。
- 节点必须可以维护和 ZooKeeper 的连接,Zookeeper 通过心跳机制检查每个节点的连接。
- 如果节点是个 follower ,它必须能及时的同步 leader 的写操作,并且延时不能太久。
我们认为满足这两个条件的节点处于 “in sync” 状态,区别于 “alive” 和 “failed” 。 Leader会追踪所有 “in sync” 的节点。如果有节点挂掉了, 或是写超时, 或是心跳超时, leader 就会把它从同步副本列表中移除。
同步超时和写超时的时间由 replica.lag.time.max.ms 配置确定。
分布式系统中,我们只尝试处理 “fail/recover” 模式的故障,即节点突然停止工作,然后又恢复(节点可能不知道自己曾经挂掉)的状况。Kafka 没有处理所谓的 “Byzantine” 故障,即一个节点出现了随意响应和恶意响应(可能由于 bug 或 非法操作导致)。
现在, 我们可以更精确地定义, 只有当消息被所有的副本节点加入到日志中时, 才算是提交, 只有提交的消息才会被 consumer 消费, 这样就不用担心一旦 leader 挂掉了消息会丢失。另一方面, producer 也
可以选择是否等待消息被提交,这取决他们的设置在延迟时间和持久性之间的权衡,这个选项是由 producer 使用的 acks 设置控制。
请注意,Topic 可以设置同步备份的最小数量, producer 请求确认消息是否被写入到所有的备份时, 可以用最小同步数量判断。如果 producer 对同步的备份数没有严格的要求,即使同步的备份数量低于
最小同步数量(例如,仅仅只有 leader 同步了数据),消息也会被提交,然后被消费。
在所有时间里,Kafka 保证只要有至少一个同步中的节点存活,提交的消息就不会丢失。
节点挂掉后,经过短暂的故障转移后,Kafka将仍然保持可用性,但在网络分区( network partitions )的情况下可能不能保持可用性。
备份日志:Quorums, ISRs, 和状态机
Kafka的核心是备份日志文件。备份日志文件是分布式数据系统最基础的要素之一,实现方法也有很多种。其他系统也可以用 kafka 的备份日志模块来实现状态机风格的分布式系统
备份日志按照一系列有序的值(通常是编号为0、1、2、…)进行建模。有很多方法可以实现这一点,但最简单和最快的方法是由 leader 节点选择需要提供的有序的值,只要 leader 节点还存活,所有的 follower 只需要拷贝数据并按照 leader 节点的顺序排序。
当然,如果 leader 永远都不会挂掉,那我们就不需要 follower 了。 但是如果 leader crash,我们就需要从 follower 中选举出一个新的 leader。 但是 followers 自身也有可能落后或者 crash,所以 我们必须确保我们leader的候选者们 是一个数据同步
最新的 follower 节点。
如果选择写入时候需要保证一定数量的副本写入成功,读取时需要保证读取一定数量的副本,读取和写入之间有重叠。这样的读写机制称为 Quorum。
这种权衡的一种常见方法是对提交决策和 leader 选举使用多数投票机制。Kafka 没有采取这种方式,但是我们还是要研究一下这种投票机制,来理解其中蕴含的权衡。假设我们有2f + 1个副本,如果在 leader 宣布消息提交之前必须有f+1个副本收到
该消息,并且如果我们从这至少f+1个副本之中,有着最完整的日志记录的 follower 里来选择一个新的 leader,那么在故障次数少于f的情况下,选举出的 leader 保证具有所有提交的消息。这是因为在任意f+1个副本中,至少有一个副本一定包含
了所有提交的消息。该副本的日志将是最完整的,因此将被选为新的 leader。这个算法都必须处理许多其他细节(例如精确定义怎样使日志更加完整,确保在 leader down 掉期间, 保证日志一致性或者副本服务器的副本集的改变),但是现在我们将忽略这些细节。
这种大多数投票方法有一个非常好的优点:延迟是取决于最快的服务器。也就是说,如果副本数是3,则备份完成的等待时间取决于最快的 Follwer 。
这里有很多分布式算法,包含 ZooKeeper 的
Zab,
Raft,
和 Viewstamped Replication.
我们所知道的与 Kafka 实际执行情况最相似的学术刊物是来自微软的
PacificA
大多数投票的缺点是,多数的节点挂掉让你不能选择 leader。要冗余单点故障需要三份数据,并且要冗余两个故障需要五份的数据。根据我们的经验,在一个系统中,仅仅靠冗余来避免单点故障是不够的,但是每写5次,对磁盘空间需求是5倍,
吞吐量下降到 1/5,这对于处理海量数据问题是不切实际的。这可能是为什么 quorum 算法更常用于共享集群配置(如 ZooKeeper ), 而不适用于原始数据存储的原因,例如 HDFS 中 namenode 的高可用是建立在
基于投票的元数据 ,这种代价高昂的存储方式不适用数据本身。
Kafka 采取了一种稍微不同的方法来选择它的投票集。 Kafka 不是用大多数投票选择 leader 。Kafka 动态维护了一个同步状态的备份的集合 (a set of in-sync replicas), 简称 ISR ,在这个集合中的节点都是和 leader 保持高度一致的,只有这个集合的成员才
有资格被选举为 leader,一条消息必须被这个集合 所有 节点读取并追加到日志中了,这条消息才能视为提交。这个 ISR 集合发生变化会在 ZooKeeper 持久化,正因为如此,这个集合中的任何一个节点都有资格被选为 leader 。这对于 Kafka 使用模型中,
有很多分区和并确保主从关系是很重要的。因为 ISR 模型和 f+1 副本,一个 Kafka topic 冗余 f 个节点故障而不会丢失任何已经提交的消息。
我们认为对于希望处理的大多数场景这种策略是合理的。在实际中,为了冗余 f 节点故障,大多数投票和 ISR 都会在提交消息前确认相同数量的备份被收到(例如在一次故障生存之后,大多数的 quorum
需要三个备份节点和一次确认,ISR 只需要两个备份节点和一次确认),多数投票方法的一个优点是提交时能避免最慢的服务器。但是,我们认为通过允许客户端选择是否阻塞消息提交来改善,和所需的备份数较低而产生的额外的吞吐量和磁盘空间是值得的。
另一个重要的设计区别是,Kafka 不要求崩溃的节点恢复所有的数据,在这种空间中的复制算法经常依赖于存在 “稳定存储”,在没有违反潜在的一致性的情况下,出现任何故障再恢复情况下都不会丢失。
这个假设有两个主要的问题。首先,我们在持久性数据系统的实际操作中观察到的最常见的问题是磁盘错误,并且它们通常不能保证数据的完整性。其次,即使磁盘错误不是问题,我们也不希望在每次写入时都要求使用 fsync 来保证一致性,
因为这会使性能降低两到三个数量级。我们的协议能确保备份节点重新加入ISR 之前,即使它挂时没有新的数据, 它也必须完整再一次同步数据。
Unclean leader 选举: 如果节点全挂了?
请注意,Kafka 对于数据不会丢失的保证,是基于至少一个节点在保持同步状态,一旦分区上的所有备份节点都挂了,就无法保证了。
但是,实际在运行的系统需要去考虑假设一旦所有的备份都挂了,怎么去保证数据不会丢失,这里有两种实现的方法
- 等待一个 ISR 的副本重新恢复正常服务,并选择这个副本作为领 leader (它有极大可能拥有全部数据)。
- 选择第一个重新恢复正常服务的副本(不一定是 ISR 中的)作为leader。
这是可用性和一致性之间的简单妥协,如果我只等待 ISR 的备份节点,那么只要 ISR 备份节点都挂了,我们的服务将一直会不可用,如果它们的数据损坏了或者丢失了,那就会是长久的宕机。另一方面,如果不是 ISR 中的节点恢复服务并且我们允许它成为 leader ,
那么它的数据就是可信的来源,即使它不能保证记录了每一个已经提交的消息。 kafka 默认选择第二种策略,当所有的 ISR 副本都挂掉时,会选择一个可能不同步的备份作为 leader ,可以配置属性 unclean.leader.election.enable 禁用此策略,那么就会使用第
一种策略即停机时间优于不同步。
这种困境不只有 Kafka 遇到,它存在于任何 quorum-based 规则中。例如,在大多数投票算法当中,如果大多数服务器永久性的挂了,那么您要么选择丢失100%的数据,要么违背数据的一致性选择一个存活的服务器作为数据可信的来源。
可用性和持久性保证
向 Kafka 写数据时,producers 设置 ack 是否提交完成, 0:不等待broker返回确认消息,1: leader保存成功返回或, -1(all): 所有备份都保存成功返回.请注意. 设置 “ack = all” 并不能保证所有的副本都写入了消息。默认情况下,当 acks = all 时,只要
ISR 副本同步完成,就会返回消息已经写入。例如,一个 topic 仅仅设置了两个副本,那么只有一个 ISR 副本,那么当设置acks = all时返回写入成功时,剩下了的那个副本数据也可能数据没有写入。
尽管这确保了分区的最大可用性,但是对于偏好数据持久性而不是可用性的一些用户,可能不想用这种策略,因此,我们提供了两个topic 配置,可用于优先配置消息数据持久性:
- 禁用 unclean leader 选举机制 - 如果所有的备份节点都挂了,分区数据就会不可用,直到最近的 leader 恢复正常。这种策略优先于数据丢失的风险, 参看上一节的 unclean leader 选举机制。
- 指定最小的 ISR 集合大小,只有当 ISR 的大小大于最小值,分区才能接受写入操作,以防止仅写入单个备份的消息丢失造成消息不可用的情况,这个设置只有在生产者使用 acks = all 的情况下才会生效,这至少保证消息被 ISR 副本写入。此设置是一致性和可用性之间的折衷,对于设置更大的最小ISR大小保证了更好的一致性,因为它保证将消息被写入了更多的备份,减少了消息丢失的可能性。但是,这会降低可用性,因为如果 ISR 副本的数量低于最小阈值,那么分区将无法写入。
备份管理
以上关于备份日志的讨论只涉及单个日志文件,即一个 topic 分区,事实上,一个Kafka集群管理着成百上千个这样的 partitions。我们尝试以轮询调度的方式将集群内的 partition 负载均衡,避免大量topic拥有的分区集中在
少数几个节点上。同样,我们也试图平衡leadership,以至于每个节点都是部分 partition 的leader节点。
优化主从关系的选举过程也是重要的,这是数据不可用的关键窗口。原始的实现是当有节点挂了后,进行主从关系选举时,会对挂掉节点的所有partition 的领导权重新选举。相反,我们会选择一个 broker 作为 “controller”节点。controller 节点负责
检测 brokers 级别故障,并负责在 broker 故障的情况下更改这个故障 Broker 中的 partition 的 leadership 。这种方式可以批量的通知主从关系的变化,使得对于拥有大量partition 的broker ,选举过程的代价更低并且速度更快。如果 controller 节点挂了,其他
存活的 broker 都可能成为新的 controller 节点。