当前位置: 首页 > 知识库问答 >
问题:

合流Kafka Replicator|消费者组偏移未复制到目标集群

闻人举
2023-03-14

更新:

通过从开始:为目标群集上的使用者设置 false 找到了解决方法。不是最好的解决方案,但它的工作原理...

由于某些原因,所有使用者组偏移量都没有复制到目标群集。例如,我在源集群上用消费者组TEST启动消费者(所有消息都被拉入),然后在复制到目标集群的同一主题上启动消费者(使用消费者组TET),结果是两个消费者都获得了所有消息。

我读过关于消费者补偿翻译功能的文章,但是没有成功。

消费者配置:

bootstrap.servers=sourceKafkaEndpoint:9092
interceptor.classes=io.confluent.connect.replicator.offsets.ConsumerTimestampsInterceptor

生产者配置:

bootstrap.servers=destKafkaEndpoint:9092

复制器配置:

connector.class=io.confluent.connect.replicator.ReplicatorSourceConnector
auto.offset.reset=latest
topic.config.sync=false
config.action.reload=none

可执行命令:

replicator --consumer.config consumer.properties --producer.config producer.properties --replication.config replication.properties --topic.regex ".*" --cluster.id replicator

Kafka信息:

Source cluster Kafka version: 2.0.1-cp4
Destination cluster Kafka version: 2.4.1

有什么想法吗?

共有1个答案

鱼渝
2023-03-14

这是一个老问题,我想很多事情已经改变了,但这是为了更好,因为现在复制者被记录得更好了。

正如这里解释的:

即使在topic.regex中匹配,Replicator也不会从源群集复制内部__consumer_offsets或__transaction_state主题。要复制这些主题,请将它们列在topic.whitelist中

这是因为消费者组偏移在集群之间是没有意义的。你不能指望它们是相同的。有关更多详细信息,请参阅我的另一篇文章。
实现此类任务的唯一可靠方法是通过时间戳保存使用偏移转换。它允许根据源集群的消息创建时间将__consumer_offsets转换到目标集群

 类似资料:
  • 试图理解消费者补偿和消费者群体补偿之间的关系。 下面的堆栈溢出链接提供了对消费群体补偿管理的极好理解<什么决定Kafka消费补偿?现在问题来了, 情节: 我们在一个消费者组组1中有消费者(c1)。 偏移值是否将存储在消费者(c1)和组(group1)两个级别?或者如果消费者属于任何消费者组,偏移量将存储在仅消费者组级别? 如果偏移值将存储在两个级别中,它是否是消费者级别偏移值将覆盖消费者组级别偏移

  • 我已经将enable.auto.commit设置为true,并将auto.commit.interval.ms设置为10,000(即10秒)。现在我的问题是--消费者是每个记录的提交偏移量,还是根据10秒内消耗的记录数提交并提前偏移量?

  • 我们已经设置了MirrorMaker来跨两个Kafka集群复制消息。我们还在镜像制造商消费者属性中设置了来复制内部主题。我假设这也将复制,这将反过来同步辅助集群中的消费者组偏移量。 但是,当我们在二级集群中启动消费者组时,它从一开始就开始使用消息,因此看起来消费者组偏移量在二级群集中没有得到复制。 有人能提供一些建议吗?我们如何使用MirrorMaker或任何其他解决方案在辅助集群中同步消费者组偏

  • 我正在使用Kafka2.0版和java消费者API来消费来自一个主题的消息。我们使用的是一个单节点Kafka服务器,每个分区有一个使用者。我注意到消费者正在丢失一些消息。场景是:消费者投票主题。我为每个线程创建了一个消费者。获取消息并将其交给处理程序来处理消息。然后使用“至少一次”的Kafka消费者语义来提交Kafka偏移量来提交偏移量。同时,我有另一个消费者使用不同的group-id运行。在这个

  • 我有一个话题是两个消费群体消费的。题目中有10条留言。 现在我开始应用程序2(消费者组2),它正在消费相同的主题。它不在处理消息。当我描述kafka-consumer-groups(带有--group consumerGroup2)时,它令人惊讶地显示CURRENT-OFFSET=10和LOG-END-OFFSET=10。 理想情况下,这种情况不应该发生,并且kafka应该能够识别对于消费者组2没