一个测试环境的kafka集群,Topic有360+,Partition有2000+,部署在虚拟机上,由于多方面原因,要求迁移至k8s容器内(全量迁移),正好可以拿来练一下手。本文主要记录对MM1和MM2的实际操作过程,以及使用过程中遇到的问题及解决方案。
source集群:kafka-2.6.0、2个broker、虚拟机
target集群:kafka-2.6.0、3个broker、k8s
工具:MM1(kafka-mirror-maker.sh)、MM2(connect-mirror-maker.sh)
需求:Topic名称不能改变、数据完整
条件:target集群需要开启自动创建Topic:auto.create.topics.enable=true
本质上MM1是Kafka的消费者和生产者结合体,可以有效地将数据从源群集移动到目标群集,但没有提供太多其他功能。
并且在MM1多年的使用过程中发现了以下局限性:
MM2是基于kafka connect框架开发的。与其它的kafka connecet一样MM2有source connector和sink connetor组成,可以支持同步以下数据:
秉着实操前先演练的原则,我自己搭建了一个和目标集群相同配置的集群,用于验证不同工具的操作结果。有足够把握之后,再对目标集群实际操作。
执行 --help 查看参数选项:
[root@XXGL-T-TJSYZ-REDIS-03 bin]# ./kafka-mirror-maker.sh --help
This tool helps to continuously copy data between two Kafka clusters.
Option Description
------ -----------
--abort.on.send.failure <String: Stop Configure the mirror maker to exit on
the entire mirror maker when a send a failed send. (default: true)
failure occurs>
--consumer.config <String: config file> Embedded consumer config for consuming
from the source cluster.
--consumer.rebalance.listener <String: The consumer rebalance listener to use
A custom rebalance listener of type for mirror maker consumer.
ConsumerRebalanceListener>
--help Print usage information.
--message.handler <String: A custom Message handler which will process
message handler of type every record in-between consumer and
MirrorMakerMessageHandler> producer.
--message.handler.args <String: Arguments used by custom message
Arguments passed to message handler handler for mirror maker.
constructor.>
--new.consumer DEPRECATED Use new consumer in mirror
maker (this is the default so this
option will be removed in a future
version).
--num.streams <Integer: Number of Number of consumption streams.
threads> (default: 1)
--offset.commit.interval.ms <Integer: Offset commit interval in ms.
offset commit interval in (default: 60000)
millisecond>
--producer.config <String: config file> Embedded producer config.
--rebalance.listener.args <String: Arguments used by custom rebalance
Arguments passed to custom rebalance listener for mirror maker consumer.
listener constructor as a string.>
--version Display Kafka version.
--whitelist <String: Java regex Whitelist of topics to mirror.
(String)>
[root@XXGL-T-TJSYZ-REDIS-03 bin]#
核心参数就两个:消费者和生产者的配置文件:
consumer.properties:(消费source集群)
bootstrap.servers=source:9092
auto.offset.reset=earliest
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
group.id=mm1-consumer
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
producer.properties:(发送消息至目标集群)
bootstrap.servers= target:29092
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="Admin" password="hMOPbmZE";
acks=-1
linger.ms=10
batch.size=10000
retries=3
执行脚本:
./kafka-mirror-maker.sh --consumer.config ./consumer.properties --producer.config ./producer.properties --offset.commit.interval.ms 5000 --num.streams 2 --whitelist "projects.*"
MM1比较简单,只要两个配置文件没问题,sasl配置正确,基本就OK了,适合简单的数据同步,比如指定topic进行同步。
有四种运行MM2的方法:
本文介绍第一种和第三种:作为专用的MirrorMaker群集、作为独立的Connect工作者,第二种需要搭建connect集群,操作比较复杂。
这种模式是最简单的,只需要提供一个配置文件即可,配置文件定制化程度比较高,根据业务需求配置即可
老样子,执行 --help 看看使用说明:
[root@XXGL-T-TJSYZ-REDIS-03 bin]# ./connect-mirror-maker.sh --help
usage: connect-mirror-maker [-h] [--clusters CLUSTER [CLUSTER ...]] mm2.properties
MirrorMaker 2.0 driver
positional arguments:
mm2.properties MM2 configuration file.
optional arguments:
-h, --help show this help message and exit
--clusters CLUSTER [CLUSTER ...]
Target cluster to use for this node.
[root@XXGL-T-TJSYZ-REDIS-03 bin]#
可以看到,参数简单了许多,核心参数就一个配置文件。
mm2.properties:
name = event-center-connector
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 2
# 定义集群别名
clusters = event-center, event-center-new
# 设置event-center集群的kafka地址列表
event-center.bootstrap.servers = source:9193
event-center.security.protocol=SASL_PLAINTEXT
event-center.sasl.mechanism=PLAIN
event-center.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# 设置event-center-new集群的kafka地址列表
event-center-new.bootstrap.servers = target:29092
event-center-new.security.protocol=SASL_PLAINTEXT
event-center-new.sasl.mechanism=PLAIN
event-center-new.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# 开启event-center集群向event-center-new集群同步
event-center->event-center-new.enabled = true
# 允许同步topic的正则
event-center->event-center-new.topics = projects.*
event-center->event-center-new.groups = .*
# MM2内部同步机制使用的topic,replication数量设置
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1
# 自定义参数
# 是否同步源topic配置
sync.topic.configs.enabled=true
# 是否同步源event-centerCL信息
sync.topic.acls.enabled=true
sync.group.offsets.enabled=true
# 连接器是否发送心跳
emit.heartbeats.enabled=true
# 心跳间隔
emit.heartbeats.interval.seconds=5
# 是否发送检查点
emit.checkpoints.enabled=true
# 是否刷新topic列表
refresh.topics.enabled=true
# 刷新间隔
refresh.topics.interval.seconds=60
# 是否刷新消费者组id
refresh.groups.enabled=true
# 刷新间隔
refresh.groups.interval.seconds=60
# DefaultReplicationPolicy / CustomReplicationPolicy
replication.policy.class=org.apache.kafka.connect.mirror.CustomReplicationPolicy
# 远端创建新topic的replication数量设置
replication.factor=3
需要注意的是:replication.policy.class 默认为:DefaultReplicationPolicy,这个策略会把同步至目标集群的topic都加上一个源集群别名的前缀,比如源集群别名为A,topic为:bi-log,该topic同步到目标集群后会变成:A.bi-log,为啥这么做呢,就是为了避免双向同步的场景出现死循环。
官方也给出了解释:
这是 MirrorMaker 2.0 中的默认行为,以避免在复杂的镜像拓扑中重写数据。 需要在复制流设计和主题管理方面小心自定义此项,以避免数据丢失。 可以通过对“replication.policy.class”使用自定义复制策略类来完成此操作。
针对如何自定义策略及使用方法,见我的另一篇文章:
为了保证脚本后台运行,写一个脚本包装一下:
run-mm2.sh:
#!/bin/bash
exec ./connect-mirror-maker.sh MM2.properties >log/mm2.log 2>&1 &
之后执行脚本即可。
这种模式会麻烦点,需要提供一个kafka,作为worker节点来同步数据,使用的脚本为:connect-standalone.sh
–help看看如何使用:
./connect-standalone.sh --help
[2023-03-09 20:36:33,479] INFO Usage: ConnectStandalone worker.properties connector1.properties [connector2.properties ...] (org.apache.kafka.connect.cli.ConnectStandalone:63)
[root@XXGL-T-TJSYZ-REDIS-03 bin]#
需要两个配置文件,一个是作为worker的kafka集群信息(worker.properties),另一个是同步数据的配置(connector.properties)
worker.properties:
bootstrap.servers=worker:29092
security.protocol=PLAINTEXT
sasl.mechanism=PLAIN
key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverter
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
connector.properties:
name = MirrorSourceConnector
topics = projects.*
groups = *
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 1
# source
# 这个配置会使同步之后的Topic都加上一个前缀,慎重
source.cluster.alias = old
source.cluster.bootstrap.servers = source:9193
source.cluster.security.protocol=SASL_PLAINTEXT
source.cluster.sasl.mechanism=PLAIN
source.cluster.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# target
target.cluster.alias = new
target.cluster.bootstrap.servers = target:29092
target.cluster.security.protocol=SASL_PLAINTEXT
target.cluster.sasl.mechanism=PLAIN
target.cluster.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="Admin" password="hMOPbmZE";
# 是否同步源topic配置信息
sync.topic.configs.enabled=true
# 是否同步源ACL信息
sync.topic.acls.enabled=true
sync.group.offsets.enabled=true
# 连接器是否发送心跳
emit.heartbeats.enabled=true
# 心跳间隔
emit.heartbeats.interval.seconds=5
# 是否发送检查点
emit.checkpoints.enabled=true
# 是否刷新topic列表
refresh.topics.enabled=true
# 刷新间隔
refresh.topics.interval.seconds=30
# 是否刷新消费者组id
refresh.groups.enabled=true
# 刷新间隔
refresh.groups.interval.seconds=30
# 连接器消费者预读队列大小
# readahead.queue.capacity=500
# 使用自定义策略
replication.policy.class=org.apache.kafka.connect.mirror.CustomReplicationPolicy
replication.factor = 3
执行:
./connect-standalone.sh worker.properties connector.properties
这种方式做一个简单的介绍,我最后采用的是上一种方式,比较简单直接
验证:
消息数量 OK
使用kafka-tool工具连接上两个集群进行比对
Topic数量 OK
./kafka-topics.sh --bootstrap-server source:9193 --command-config command.properties --list > topics-source.txt
./kafka-topics.sh --bootstrap-server sink:29092 --command-config command.properties --list > topics-sink.txt
security.protocol = SASL_PLAINTEXT
sasl.mechanism = PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
新消息是否同步 OK
新Topic是否同步 OK
Consumer是否同步 NO
./kafka-consumer-groups.sh --bootstrap-server source:9193 --command-config command.properties --list > consumer-source.txt
如果需要同步consumer,需要使用官方提供的工具:RemoteClusterUtils
consumer offset是否同步 NO
ACL是否同步 OK
通过kafka-acls.sh或者客户端工具kafka-tool可以查看
property | default value | description |
---|---|---|
name | required | name of the connector, e.g. “us-west->us-east” |
topics | empty string | regex of topics to replicate, e.g. “topic1|topic2|topic3”. Comma-separated lists are also supported. |
topics.blacklist | “..internal, ..replica, __consumer_offsets” or similar | topics to exclude from replication |
groups | empty string | regex of groups to replicate, e.g. “.*” |
groups.blacklist | empty string | groups to exclude from replication |
source.cluster.alias | required | name of the cluster being replicated |
target.cluster.alias | required | name of the downstream Kafka cluster |
source.cluster.bootstrap.servers | required | upstream cluster to replicate |
target.cluster.bootstrap.servers | required | downstream cluster |
sync.topic.configs.enabled | true | whether or not to monitor source cluster for configuration changes |
sync.topic.acls.enabled | true | whether to monitor source cluster ACLs for changes |
emit.heartbeats.enabled | true | connector should periodically emit heartbeats |
emit.heartbeats.interval.seconds | 5 (seconds) | frequency of heartbeats |
emit.checkpoints.enabled | true | connector should periodically emit consumer offset information |
emit.checkpoints.interval.seconds | 5 (seconds) | frequency of checkpoints |
refresh.topics.enabled | true | connector should periodically check for new topics |
refresh.topics.interval.seconds | 5 (seconds) | frequency to check source cluster for new topics |
refresh.groups.enabled | true | connector should periodically check for new consumer groups |
refresh.groups.interval.seconds | 5 (seconds) | frequency to check source cluster for new consumer groups |
readahead.queue.capacity | 500 (records) | number of records to let consumer get ahead of producer |
replication.policy.class | org.apache.kafka.connect.mirror.DefaultReplicationPolicy | use LegacyReplicationPolicy to mimic legacy MirrorMaker |
heartbeats.topic.retention.ms | 1 day | used when creating heartbeat topics for the first time |
checkpoints.topic.retention.ms | 1 day | used when creating checkpoint topics for the first time |
offset.syncs.topic.retention.ms | max long | used when creating offset sync topic for the first time |
replication.factor | 2 | used when creating remote topics |
参考:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%253A+MirrorMaker+2.0
https://www.reddit.com/r/apachekafka/comments/q5s3al/mirrormaker2_is_not_able_to_replicate_groups_in/?sort=new
https://dev.to/keigodasu/transferring-commit-offset-with-mirrormaker-2-3kbf
https://learn.microsoft.com/zh-cn/azure/hdinsight/kafka/kafka-mirrormaker-2-0-guide
后续验证发现一个问题:
从旧集群生产消息,会复制3份到新集群