当前位置: 首页 > 知识库问答 >
问题:

Kafka MirrorMaker2 -不反映消费者群体补偿

云凌
2023-03-14

我设置了MirrorMaker2,用于在两个DC之间复制数据。

我的 mm2 属性,

# mm2.properties
name=source->dest
clusters=source, dest

source.bootstrap.servers=localhost:9091
dest.bootstrap.servers=localhost:9092

source->dest.enabled=true

offset.storage.partitions=2
config.storage.replication.factor=1
status.storage.replication.factor=1

看到下面的MM2创业。

[2020-02-16 07:31:07,547] INFO MirrorConnectorConfig values: 
    admin.timeout.ms = 60000
    checkpoints.topic.replication.factor = 3
    config.action.reload = restart
    config.properties.blacklist = [follower\.replication\.throttled\.replicas, leader\.replication\.throttled\.replicas, message\.timestamp\.difference\.max\.ms, message\.timestamp\.type, unclean\.leader\.election\.enable, min\.insync\.replicas]
    config.property.filter.class = class org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter
    connector.class = org.apache.kafka.connect.mirror.MirrorCheckpointConnector
    consumer.poll.timeout.ms = 1000
    emit.checkpoints.enabled = true
    emit.checkpoints.interval.seconds = 60
    emit.heartbeats.enabled = true
    emit.heartbeats.interval.seconds = 1
    enabled = true
    errors.log.enable = false
    errors.log.include.messages = false
    errors.retry.delay.max.ms = 60000
    errors.retry.timeout = 0
    errors.tolerance = none
    group.filter.class = class org.apache.kafka.connect.mirror.DefaultGroupFilter
    groups = [.*]
    groups.blacklist = [console-consumer-.*, connect-.*, __.*]
    header.converter = null
    heartbeats.topic.replication.factor = 3
    key.converter = null
    metric.reporters = null
    name = source->dest
    offset-syncs.topic.replication.factor = 3
    offset.lag.max = 100
    refresh.groups.enabled = true
    refresh.groups.interval.seconds = 600
    refresh.topics.enabled = true
    refresh.topics.interval.seconds = 600
    replication.factor = 2
    replication.policy.class = class org.apache.kafka.connect.mirror.DefaultReplicationPolicy
    replication.policy.separator = .
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    source.cluster.alias = source
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    sync.topic.acls.enabled = true
    sync.topic.acls.interval.seconds = 600
    sync.topic.configs.enabled = true
    sync.topic.configs.interval.seconds = 600
    target.cluster.alias = dest
    task.assigned.groups = null
    task.assigned.partitions = null
    tasks.max = 1
    topic.filter.class = class org.apache.kafka.connect.mirror.DefaultTopicFilter
    topics = [.*]
    topics.blacklist = [.*[\-\.]internal, .*\.replica, __.*]
    transforms = []
    value.converter = null
 (org.apache.kafka.connect.mirror.MirrorConnectorConfig:347)

我的数据正在按预期进行复制。源主题作为源在目标集群中创建..但是,消费者群体补偿并没有被复制。

已在源群集中启动使用者组。

./kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic test-1 --group test-1-group

消耗了少量消息并将其停止。在此主题中发布了新消息,镜像制造商也将数据镜像到目标集群。

我尝试使用来自目标集群的消息,如下所示。

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic source.test-1 --group test-1-group

由于我使用相同的使用者组,因此我希望我的偏移量也会同步,并且不会使用我在 cluster1 中使用的相同消息。但是,仍然会消耗所有消息。我在这里错过了什么吗?

共有3个答案

卫昊东
2023-03-14

我的数据正在按预期进行复制。源主题作为源在目标集群中创建..但是,消费者群体补偿并没有被复制。

默认情况下,MM2不会从kafka控制台消费者复制消费者组。在MM2启动日志中,我们可以看到组。黑名单=[控制台消费者-.*,连接-.*、__.*]。我相信您可以在mm2.properties配置文件中覆盖此项。

因为我使用相同的消费者组,所以我希望我的偏移量也会同步,并且不会使用我在集群1中使用的相同消息。

一旦正确镜像了使用者组并启用了检查点,应该会在目标集群中自动创建一个内部主题(类似于< code > dest . check points . internal )。此检查点主题包含每个使用者组中镜像主题分区的源集群和目标集群中最后提交的偏移量。

然后,您可以使用Kafka的RemoteClusterUtils实用程序类转换这些偏移量,并获取source的同步偏移量。test-1映射到消费者上次提交的test-1的偏移量。如果最终使用Java创建消费者,可以将RemoteClusterUtils作为依赖项添加到项目中:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>connect-mirror-client</artifactId>
    <version>2.4.0</version>
</dependency>

否则,您可能必须编写一个包装远程群集Utils的工具.java以获取已翻译的偏移量。此功能或类似功能似乎计划作为MM2未来版本的一部分。

商焕
2023-03-14

Kafka 2.7引入了“自动化消费抵消同步”。默认情况下,消费者补偿不会在集群之间同步。您应该显式启用该功能。

在MM 2.0中支持跨集群的自动消费偏移同步

苏乐童
2023-03-14

复制偏移量非常重要的原因有几个:

    < li >Kafka是一个至少一次的系统(忽略炒作)。这意味着mirror maker,因为它建立在kafka消费者和生产者之上,每个消费者和生产者都可以超时/断开连接,将导致某种程度的重复记录被传送到目的地。这意味着偏移量在源和目标之间不是1:1的映射。即使您尝试使用“恰好一次”支持(MM2 KIP明确表示没有使用),它所做的只是跳过部分交付的批次,但是这些批次仍然会占用目的地的偏移量 < li >如果在源主题开始过期记录后很久才设置镜像,则目标主题将从偏移量0开始,而源主题将具有更高的“最旧”偏移量。已经尝试解决这个问题(参见KIP-391 ),但从未被接受 < li >一般来说,不能保证您的镜像拓扑从单个源镜像到单个目的地。例如,linkedin拓扑从多个源集群镜像到“聚合”层集群。对于这种拓扑来说,映射偏移是没有意义的

在MM2 KIP中,提到了一个“偏移同步主题”。在代码中,可以使用类RemoteClusterUtils在群集之间转换检查点:

Map<TopicPartition, Long> newOffsets = RemoteClusterUtils.translateOffsets(
   newClusterProperties, oldClusterName, consumerGroupId
);
consumer.seek(newOffsets);

这摘自以下演示文稿-https://www . slide share . net/confluent Inc/disaster-recovery-with-mirror maker-20-ryanne-Dolan-cloud era-Kafka-summit-London-2019

或者,您可以使用 seek by timespamp API 在目标上启动您的使用者组,直到数据传递到目标的粗略时间(或传递到源,如果日志的代理设置在目标上附加时间戳不会覆盖这些时间)。为了安全起见,您需要倒带一点。

 类似资料:
  • 我正在阅读Kafka常见问题解答,他们如下所示。 •每个分区不会被每个使用者组中的多个使用者线程/进程使用。这允许每个进程以单线程方式使用,以保证分区内的使用者的顺序(如果我们将有序消息分割成一个分区并将它们传递给多个使用者,即使这些消息是按顺序存储的,它们有时也会被无序地处理)。 有没有可能,

  • 我在使用Kafka时遇到了一些问题。非常感谢任何帮助!我在docker swell中分别有zookeeper和kafka集群3个节点。您可以在下面看到Kafka代理配置。 我的情况: < li > 20x位制片人不断向Kafka主题传达信息 < li>1x消费者读取和记录消息 < li >终止kafka节点(docker容器停止),因此现在群集有2个Kafka代理节点(第3个节点将自动启动并加入群

  • 我是Kafka的新手,正在学习Kafka内部知识。请根据需要随时更正我的理解。。 这是我的实时场景..感谢所有的回复: 我有一个接收数据文件的实时FTP服务器…比如索赔文件。 我将把这些数据发布到一个主题中.让我们把这个主题称为claims_topic(2个分区). 我需要订阅这个claims_topic,阅读消息并将它们写入Oracle和Postgres表。让我们将oracle表称为Otable

  • 我是Kafka的新手。我看了一眼Kafka文档。似乎分派给订阅消费者组的消息是通过将分区与消费者实例绑定来实现的。 在使用Apache Kafka时,我们应该记住一件重要的事情,即同一消费者组中的消费者数量应该小于或等于所使用主题中的分区数量。否则,将不会收到来自主题的任何消息。 在非prod环境中,我没有配置主题分区。在这种情况下,Kafka是否只有一个分区。如果我启动共享同一组的多个消费者并向

  • null 编辑:好的,所以我取得了一些进步(如果我错了请纠正我): 每个消费者都将获得所有消息。 租约被分配了一个EventProcessorHost,所以它需要一个唯一的名称,所以这里的使用者组名称实际上并不相关。 仍然不能百分之百确定context.checkpointasync,但我相信它只适用于ConsumerGroup?

  • 我正在使用镜像制作器 2 进行灾难恢复。 Kafka 2.7 应支持自动消费者偏移同步 下面是我正在使用的yaml文件(我使用strimzi来创建它) 所有源群集主题都在目标群集中复制。还有…检查点。内部主题是在包含所有同步的源集群偏移量的目标集群中创建的,但我没有看到这些偏移量被转换为目标集群_consumer_offsets主题,这意味着当我在目标集群中启动消费者(同一消费者组)时,它将从一开