当我运行这个命令时,我得到了两个主题。我知道我创建了测试主题,但我看到了另一个名为“消费者偏移”的主题。从名称来看,这意味着它与消费者补偿有关,但它是如何使用的?
$bin/kafka主题。sh—列表—zookeeper本地主机:2181 \uu consumer\u Offset test
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181
Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:1 Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
Topic: __consumer_offsets Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: __consumer_offsets Partition: 1 Leader: 0 Replicas: 0 Isr: 0
*
*
*
Topic: __consumer_offsets Partition: 48 Leader: 0 Replicas: 0 Isr: 0
Topic: __consumer_offsets Partition: 49 Leader: 0 Replicas: 0 Isr: 0
这在Kafka 1.1.0中发生,为什么有50个分区。也在寻找一种禁用此功能的方法,因为每次我尝试运行“描述”主题时,它都会首先打印\uu consumer\u偏移量的50个分区,然后打印我的主题。
消费者使用主题__consumer_offsets
来存储他们读取的消息偏移量。当消费者重新启动时,它将读取它在停机和处理下一个偏移量之前消耗的最后一个位置。
@cricket_007是对的,你可以在Kafka中默认复制,这是使用的至少一次语义学。
消费者根据消费者组id将上次消费的消息偏移id存储在Kafka主题中。这使不同的消费者(显然具有不同的消费者id)能够在上次消费的消息之后处理下一条消息,并避免重复的消息处理。
在Kafka的最初版本中,偏移是由zookeeper管理的,但Kafka随着时间的推移不断发展,引入了许多新功能。现在,Kafka在内部/系统级主题中管理偏移,即_uconsumer\u偏移。
每当您创建一个主题而没有显式指定分区数时,Kafka最终会默认为该主题创建50个分区。这同样暗示了主题__consumer_offsets。
我正在使用Kafka 0.8 最近,我们开始喂食和消耗一个行为怪异的新主题,消耗的偏移量突然被重置,它尊重我们设置的auto.offset.reset策略(实际上是最小的)但我无法理解为什么该主题会突然重置其偏移量。 我正在使用高级消费者。 这是我发现的一些错误日志: 我们有一堆这样的错误日志: 每次出现此问题时,我都会看到警告日志: 然后真正的问题发生了: 现在的问题是:有人已经经历过这种行为吗
我设置了MirrorMaker2,用于在两个DC之间复制数据。 我的 mm2 属性, 看到下面的MM2创业。 我的数据正在按预期进行复制。源主题作为源在目标集群中创建..但是,消费者群体补偿并没有被复制。 已在源群集中启动使用者组。 消耗了少量消息并将其停止。在此主题中发布了新消息,镜像制造商也将数据镜像到目标集群。 我尝试使用来自目标集群的消息,如下所示。 由于我使用相同的使用者组,因此我希望我
问题内容: 我对Kafka比较陌生。我已经做了一些实验,但是对于消费者补偿我有些不清楚。根据到目前为止的了解,使用方启动时,将从其读取的偏移量由配置设置确定(如果我输入错了,请更正我)。 现在说,例如,该主题中有10条消息(偏移量0到9),一个消费者在崩溃之前(或我杀死该消费者之前)碰巧消耗了其中的5条消息。然后说我重新启动该使用者进程。我的问题是: 如果将设置为,它是否总是从偏移量0开始消耗?
我在一个线程中创建了一个Kafka consumer实例,作为构造函数的一部分,在thread inside run方法中,我确实调用了不同的web服务,为了保持调用的非阻塞性,我正在使用completable future。我的问题是,我无法通过调用thenApply方法并传递Kafka consumer实例来发出commit,因为这会给我一个错误,即Kafka consumer不是线程安全的。
我有一个主题列表(目前是10个),其大小可以在未来增加。我知道我们可以产生多个线程(每个主题)来消耗每个主题,但在我的例子中,如果主题的数量增加,那么消耗主题的线程数量也会增加,这是我不希望的,因为主题不会太频繁地获取数据,所以线程将是理想的。 有没有办法让单个消费者从所有话题中消费?如果是的话,我们怎样才能做到呢?另外,Kafka将如何维护抵消?请建议答案。
我有两个组id相同的消费者服务器订阅了相同的主题。kafka服务器仅使用一个分区运行。据我所知,消息应该在这两个消费者服务器中随机使用。但现在似乎总是同一个消费者服务器A消费消息,另一个不消费消息。如果我停止消费者服务器A,另一个将正常工作。我所期望的是,他们可以随机消费信息。