当前位置: 首页 > 知识库问答 >
问题:

Kafka中的“消费者补偿”这个主题是什么

宣星光
2023-03-14

当我运行这个命令时,我得到了两个主题。我知道我创建了测试主题,但我看到了另一个名为“消费者偏移”的主题。从名称来看,这意味着它与消费者补偿有关,但它是如何使用的?

$bin/kafka主题。sh—列表—zookeeper本地主机:2181 \uu consumer\u Offset test

$ bin/kafka-topics.sh --describe --zookeeper localhost:2181
Topic:__consumer_offsets        PartitionCount:50       ReplicationFactor:1     Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
        Topic: __consumer_offsets       Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: __consumer_offsets       Partition: 1    Leader: 0       Replicas: 0     Isr: 0
                      *
                      *
                      *
        Topic: __consumer_offsets       Partition: 48    Leader: 0       Replicas: 0     Isr: 0
        Topic: __consumer_offsets       Partition: 49    Leader: 0       Replicas: 0     Isr: 0

这在Kafka 1.1.0中发生,为什么有50个分区。也在寻找一种禁用此功能的方法,因为每次我尝试运行“描述”主题时,它都会首先打印\uu consumer\u偏移量的50个分区,然后打印我的主题。


共有3个答案

柯伟志
2023-03-14

消费者使用主题__consumer_offsets存储他们读取的消息偏移量。当消费者重新启动时,它将读取它在停机和处理下一个偏移量之前消耗的最后一个位置。

@cricket_007是对的,你可以在Kafka中默认复制,这是使用的至少一次语义学。

焦同
2023-03-14

消费者根据消费者组id将上次消费的消息偏移id存储在Kafka主题中。这使不同的消费者(显然具有不同的消费者id)能够在上次消费的消息之后处理下一条消息,并避免重复的消息处理。

吕华彩
2023-03-14

在Kafka的最初版本中,偏移是由zookeeper管理的,但Kafka随着时间的推移不断发展,引入了许多新功能。现在,Kafka在内部/系统级主题中管理偏移,即_uconsumer\u偏移。

每当您创建一个主题而没有显式指定分区数时,Kafka最终会默认为该主题创建50个分区。这同样暗示了主题__consumer_offsets。

 类似资料:
  • 我正在使用Kafka 0.8 最近,我们开始喂食和消耗一个行为怪异的新主题,消耗的偏移量突然被重置,它尊重我们设置的auto.offset.reset策略(实际上是最小的)但我无法理解为什么该主题会突然重置其偏移量。 我正在使用高级消费者。 这是我发现的一些错误日志: 我们有一堆这样的错误日志: 每次出现此问题时,我都会看到警告日志: 然后真正的问题发生了: 现在的问题是:有人已经经历过这种行为吗

  • 我设置了MirrorMaker2,用于在两个DC之间复制数据。 我的 mm2 属性, 看到下面的MM2创业。 我的数据正在按预期进行复制。源主题作为源在目标集群中创建..但是,消费者群体补偿并没有被复制。 已在源群集中启动使用者组。 消耗了少量消息并将其停止。在此主题中发布了新消息,镜像制造商也将数据镜像到目标集群。 我尝试使用来自目标集群的消息,如下所示。 由于我使用相同的使用者组,因此我希望我

  • 问题内容: 我对Kafka比较陌生。我已经做了一些实验,但是对于消费者补偿我有些不清楚。根据到目前为止的了解,使用方启动时,将从其读取的偏移量由配置设置确定(如果我输入错了,请更正我)。 现在说,例如,该主题中有10条消息(偏移量0到9),一个消费者在崩溃之前(或我杀死该消费者之前)碰巧消耗了其中的5条消息。然后说我重新启动该使用者进程。我的问题是: 如果将设置为,它是否总是从偏移量0开始消耗?

  • 我在一个线程中创建了一个Kafka consumer实例,作为构造函数的一部分,在thread inside run方法中,我确实调用了不同的web服务,为了保持调用的非阻塞性,我正在使用completable future。我的问题是,我无法通过调用thenApply方法并传递Kafka consumer实例来发出commit,因为这会给我一个错误,即Kafka consumer不是线程安全的。

  • 我有一个主题列表(目前是10个),其大小可以在未来增加。我知道我们可以产生多个线程(每个主题)来消耗每个主题,但在我的例子中,如果主题的数量增加,那么消耗主题的线程数量也会增加,这是我不希望的,因为主题不会太频繁地获取数据,所以线程将是理想的。 有没有办法让单个消费者从所有话题中消费?如果是的话,我们怎样才能做到呢?另外,Kafka将如何维护抵消?请建议答案。

  • 我有两个组id相同的消费者服务器订阅了相同的主题。kafka服务器仅使用一个分区运行。据我所知,消息应该在这两个消费者服务器中随机使用。但现在似乎总是同一个消费者服务器A消费消息,另一个不消费消息。如果我停止消费者服务器A,另一个将正常工作。我所期望的是,他们可以随机消费信息。