4.5 消费者

优质
小牛编辑
137浏览
2023-12-01

Kafka consumer通过向 broker 发出一个“fetch”请求来获取它想要消费的 partition。consumer 的每个请求都在 log 中指定了对应的 offset,并接收从该位置开始的一大块数据。因此,consumer 对于该位置的控制就显得极为重要,并且可以在需要的时候通过回退到该位置再次消费对应的数据。

Push vs. pull

最初我们考虑的问题是:究竟是由 consumer 从 broker 那里 pull 数据,还是由 broker 将数据 push 到 consumer。Kafka 在这方面采取了一种较为传统的设计方式,也是大多数的消息系统所共享的方式:即 producer 把数据 push 到 broker,然后 consumer 从 broker 中 pull 数据。
也有一些 logging-centric 的系统,比如 ScribeApache Flume,沿着一条完全不同的 push-based 的路径,将数据 push 到下游节点。这两种方法都有优缺点。然而,由于 broker 控制着数据传输速率,
所以 push-based 系统很难处理不同的 consumer。让 broker 控制数据传输速率主要是为了让 consumer 能够以可能的最大速率消费;不幸的是,这导致着在 push-based 的系统中,当消费速率低于生产速率时,consumer 往往会不堪重负(本质上类似于拒绝服务攻击)。pull-based 系统有一个很好的特性,
那就是当 consumer 速率落后于 producer 时,可以在适当的时间赶上来。还可以通过使用某种 backoff 协议来减少这种现象:即 consumer 可以通过 backoff 表示它已经不堪重负了,然而通过获得负载情况来充分使用 consumer(但永远不超载)这一方式实现起来比它看起来更棘手。前面以这种方式构建系统的尝试,引导着 Kafka 走向了更传统的 pull 模型。

另一个 pull-based 系统的优点在于:它可以大批量生产要发送给 consumer 的数据。而 push-based 系统必须选择立即发送请求或者积累更多的数据,然后在不知道下游的 consumer 能否立即处理它的情况下发送这些数据。如果系统调整为低延迟状态,这就会导致一次只发送一条消息,以至于传输的数据不再被缓冲,这种方式是极度浪费的。
而 pull-based 的设计修复了该问题,因为 consumer 总是将所有可用的(或者达到配置的最大长度)消息 pull 到 log 当前位置的后面,从而使得数据能够得到最佳的处理而不会引入不必要的延迟。

简单的 pull-based 系统的不足之处在于:如果 broker 中没有数据,consumer 可能会在一个紧密的循环中结束轮询,实际上 busy-waiting 直到数据到来。为了避免 busy-waiting,我们在 pull 请求中加入参数,使得 consumer 在一个“long pull”中阻塞等待,直到数据到来(还可以选择等待给定字节长度的数据来确保传输长度)。

你可以想象其它可能的只基于 pull 的, end-to-end 的设计。例如producer 直接将数据写入一个本地的 log,然后 broker 从 producer 那里 pull 数据,最后 consumer 从 broker 中 pull 数据。通常提到的还有“store-and-forward”式 producer,
这是一种很有趣的设计,但我们觉得它跟我们设定的有数以千计的生产者的应用场景不太相符。我们在运行大规模持久化数据系统方面的经验使我们感觉到,横跨多个应用、涉及数千磁盘的系统事实上并不会让事情更可靠,反而会成为操作时的噩梦。在实践中,
我们发现可以通过大规模运行的带有强大的 SLAs 的 pipeline,而省略 producer 的持久化过程。

消费者的位置

令人惊讶的是,持续追踪已经被消费的内容是消息系统的关键性能点之一。

大多数消息系统都在 broker 上保存被消费消息的元数据。也就是说,当消息被传递给 consumer,broker 要么立即在本地记录该事件,要么等待 consumer 的确认后再记录。这是一种相当直接的选择,而且事实上对于单机服务器来说,也没与其它地方能够存储这些状态信息。
由于大多数消息系统用于存储的数据结构规模都很小,所以这也是一个很实用的选择——因为只要 broker 知道哪些消息被消费了,就可以在本地立即进行删除,一直保持较小的数据量。

也许不太明显,但要让 broker 和 consumer 就被消费的数据保持一致性也不是一个小问题。如果 broker 在每条消息被发送到网络的时候,立即将其标记为 consumed,那么一旦 consumer 无法处理该消息(可能由 consumer 崩溃或者请求超时或者其他原因导致),该消息就会丢失。
为了解决消息丢失的问题,许多消息系统增加了确认机制:即当消息被发送出去的时候,消息仅被标记为sent 而不是 consumed;然后 broker 会等待一个来自 consumer 的特定确认,再将消息标记为consumed。这个策略修复了消息丢失的问题,但也产生了新问题。
首先,如果 consumer 处理了消息但在发送确认之前出错了,那么该消息就会被消费两次。第二个是关于性能的,现在 broker 必须为每条消息保存多个状态(首先对其加锁,确保该消息只被发送一次,然后将其永久的标记为 consumed,以便将其移除)。
还有更棘手的问题要处理,比如如何处理已经发送但一直得不到确认的消息。

Kafka 使用完全不同的方式解决消息丢失问题。Kafka的 topic 被分割成了一组完全有序的 partition,其中每一个 partition 在任意给定的时间内只能被每个订阅了这个 topic 的 consumer 组中的一个 consumer 消费。这意味着 partition 中
每一个 consumer 的位置仅仅是一个数字,即下一条要消费的消息的offset。这使得被消费的消息的状态信息相当少,每个 partition 只需要一个数字。这个状态信息还可以作为周期性的 checkpoint。这以非常低的代价实现了和消息确认机制等同的效果。

这种方式还有一个附加的好处。consumer 可以回退到之前的 offset 来再次消费之前的数据,这个操作违反了队列的基本原则,但事实证明对大多数 consumer 来说这是一个必不可少的特性。
例如,如果 consumer 的代码有 bug,并且在 bug 被发现前已经有一部分数据被消费了, 那么 consumer 可以在 bug 修复后通过回退到之前的 offset 来再次消费这些数据。

离线数据加载

可伸缩的持久化特性允许 consumer 只进行周期性的消费,例如批量数据加载,周期性将数据加载到诸如 Hadoop 和关系型数据库之类的离线系统中。

在 Hadoop 的应用场景中,我们通过将数据加载分配到多个独立的 map 任务来实现并行化,每一个 map 任务负责一个 node/topic/partition,从而达到充分并行化。Hadoop 提供了任务管理机制,失败的任务可以重新启动而不会有重复数据的风险,只需要简单的从原来的位置重启即可。