当前位置: 首页 > 知识库问答 >
问题:

为什么Kafka镜像制作者目标主题包含一半的原始消息?

诸葛雅达
2023-03-14

我想复制Kafka集群中某个主题的所有消息。所以我运行Kafka Mirrormaker,但是它似乎只复制了大约一半来自源集群的消息(我检查了源主题中没有消费者延迟)。我在源集群中有2个代理,这与此有什么关系吗?

这是源群集配置:

log.retention.ms=1814400000
transaction.state.log.replication.factor=2
offsets.topic.replication.factor=2
auto.create.topics.enable=true
default.replication.factor=2
min.insync.replicas=1
num.io.threads=8
num.network.threads=5
num.partitions=1
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
unclean.leader.election.enable=true
zookeeper.session.timeout.ms=18000

源主题有 4 个分区,并且未压缩。镜像制造商配置是:

  • mirrormaker-consumer.properties
bootstrap.servers=broker1:9092,broker2:9092
group.id=picturesGroup3
auto.offset.reset=earliest
  • mirrormaker-producer.properties
bootstrap.servers=localhost:9092
max.in.flight.requests.per.connection=1
retries=2000000000
acks=all
max.block.ms=2000000000

以下是 Kafdrop 在源集群主题上的统计数据:

这些是镜像制造商运行后目标主题的统计信息:

正如您所看到的,基于大小列,大约只有一半的源消息位于目标主题中。我做错了什么?

共有1个答案

邵博远
2023-03-14

我意识到这个问题的发生是因为我将数据从一个有2个代理的集群复制到一个有1个代理的集群。所以我假设Mirrormaker1只是从原始集群中的一个代理复制了数据。当我将目标集群配置为有2个代理时,所有的消息都被复制到其中。

关于@OneCricketeer使用Mirrormaker2的建议,这也有效,但是我花了一段时间才得到正确的配置文件:

clusters = source, dest

source.bootstrap.servers = sourcebroker1:9092,sourcebroker2:9092
dest.bootstrap.servers = destbroker1:9091,destbroker2:9092
topics = .*
groups = mm2topic
source->dest.enabled = true
offsets.topic.replication.factor=1
offset.storage.replication.factor=1
auto.offset.reset=latest

此外,镜像制造商2可以在此Kafka连接项目的连接容器中找到(进入容器,在/kafka/bin目录中将有 connect-mirror-maker.sh 可执行文件)。

Mirrormaker2解决方案的一个主要缺点是,它将为目标集群中的主题添加前缀(在我的情况下,新名称需要更改应用程序代码)。在Mirrormaker2配置中无法更改前缀,因此唯一的方法是实现自定义Java类,如本文所述。

 类似资料:
  • 按照此处提到的解决方案,kafka-mire-maker-fasting-to-复制-消费者-偏移-主题。我能够跨DC1(Live Kafka集群)和DC2(Backup Kafka集群)集群启动镜像制作器而没有任何错误。 看起来它还能够从DC1集群跨DC2集群同步主题。 问题 如果我关闭 DC1 的使用者并将同一使用者(同一group_id)指向 DC2,即使镜像制造商能够同步本主题和分区的偏移

  • 我使用confluent .net客户端。订阅者在重启(订阅者服务重启)后始终读取 Kafka 主题的所有消息。如何提交消费者已经实现的偏移并从中读取?也许一些消费者配置可以提供帮助...

  • 我有一个主题列表(目前是10个),其大小可以在未来增加。我知道我们可以产生多个线程(每个主题)来消耗每个主题,但在我的例子中,如果主题的数量增加,那么消耗主题的线程数量也会增加,这是我不希望的,因为主题不会太频繁地获取数据,所以线程将是理想的。 有没有办法让单个消费者从所有话题中消费?如果是的话,我们怎样才能做到呢?另外,Kafka将如何维护抵消?请建议答案。

  • 我使用以下代码来读取主题的数据,即“sha-test2”,但它正在读取完全替代的代码行,即 10 行中的 20 行。但是当我运行控制台时,它显示所有 20 行。即.bin/kafka-console-consumer.sh --zookeeper 本地主机:2181 --主题 sha-test2 --从头 我做错了什么?非常感谢您的帮助。

  • 当我运行这个命令时,我得到了两个主题。我知道我创建了测试主题,但我看到了另一个名为“消费者偏移”的主题。从名称来看,这意味着它与消费者补偿有关,但它是如何使用的?

  • 我们有一个服务器,负责处理消息的生成和消费。我们有4台笔记本电脑,所有带有confluent的Mac都运行相同的命令行。。。 /kafka avro控制台使用者--从一开始--引导服务器0.0.0.0:9092,0.0.0.0:9092--主题主题名称--属性schema.registry.url=http://0.0.0.0:8081 4台笔记本电脑中有3台没有问题使用这些消息,但是第四台不会。