当前位置: 首页 > 工具软件 > Auto Maker > 使用案例 >

【Kafka】记录一次基于connect-mirror-maker做的Kafka集群迁移完整过程

乜明朗
2023-12-01

背景

一个测试环境的kafka集群,Topic有360+,Partition有2000+,部署在虚拟机上,由于多方面原因,要求迁移至k8s容器内(全量迁移),正好可以拿来练一下手。本文主要记录对MM1和MM2的实际操作过程,以及使用过程中遇到的问题及解决方案。

环境

source集群:kafka-2.6.0、2个broker、虚拟机

target集群:kafka-2.6.0、3个broker、k8s

工具:MM1(kafka-mirror-maker.sh)、MM2(connect-mirror-maker.sh)

需求:Topic名称不能改变、数据完整

条件:target集群需要开启自动创建Topic:auto.create.topics.enable=true

工具选型

本质上MM1是Kafka的消费者和生产者结合体,可以有效地将数据从源群集移动到目标群集,但没有提供太多其他功能。

并且在MM1多年的使用过程中发现了以下局限性:

  1. 静态的黑名单和白名单
  2. Topic信息不能同步,所有Topic同步到目标端都只有一个Partition
  3. 必须通过手动配置来解决active-active场景下的循环同步问题(MM2为解决这个问题,也做了体验很不好的改动)
  4. rebalance导致的性能问题
  5. 缺乏监控手段
  6. 无法保证Exactly Once
  7. 无法提供容灾恢复
  8. 无法同步Topic列表,只能同步有数据的Topic

MM2是基于kafka connect框架开发的。与其它的kafka connecet一样MM2有source connector和sink connetor组成,可以支持同步以下数据:

  1. 完整的Topic列表
  2. Topic配置
  3. ACL信息(如果有)
  4. consumer group和offset(kafka2.7.0之后版本才行)
  5. 其他功能:
    • 支持循环同步检测
    • 多集群自定义同步(同一个任务中,可以多集群同步:A->B、B->C、B->D)
    • 提供可监控Metrics
    • 可通过配置保证Exactly Once

实操

秉着实操前先演练的原则,我自己搭建了一个和目标集群相同配置的集群,用于验证不同工具的操作结果。有足够把握之后,再对目标集群实际操作。

MM1

执行 --help 查看参数选项:

[root@XXGL-T-TJSYZ-REDIS-03 bin]# ./kafka-mirror-maker.sh --help
This tool helps to continuously copy data between two Kafka clusters.
Option                                   Description
------                                   -----------
--abort.on.send.failure <String: Stop    Configure the mirror maker to exit on
  the entire mirror maker when a send      a failed send. (default: true)
  failure occurs>
--consumer.config <String: config file>  Embedded consumer config for consuming
                                           from the source cluster.
--consumer.rebalance.listener <String:   The consumer rebalance listener to use
  A custom rebalance listener of type      for mirror maker consumer.
  ConsumerRebalanceListener>
--help                                   Print usage information.
--message.handler <String: A custom      Message handler which will process
  message handler of type                  every record in-between consumer and
  MirrorMakerMessageHandler>               producer.
--message.handler.args <String:          Arguments used by custom message
  Arguments passed to message handler      handler for mirror maker.
  constructor.>
--new.consumer                           DEPRECATED Use new consumer in mirror
                                           maker (this is the default so this
                                           option will be removed in a future
                                           version).
--num.streams <Integer: Number of        Number of consumption streams.
  threads>                                 (default: 1)
--offset.commit.interval.ms <Integer:    Offset commit interval in ms.
  offset commit interval in                (default: 60000)
  millisecond>
--producer.config <String: config file>  Embedded producer config.
--rebalance.listener.args <String:       Arguments used by custom rebalance
  Arguments passed to custom rebalance     listener for mirror maker consumer.
  listener constructor as a string.>
--version                                Display Kafka version.
--whitelist <String: Java regex          Whitelist of topics to mirror.
  (String)>
[root@XXGL-T-TJSYZ-REDIS-03 bin]#         

核心参数就两个:消费者和生产者的配置文件:

consumer.properties:(消费source集群)

bootstrap.servers=source:9092
auto.offset.reset=earliest
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
group.id=mm1-consumer
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";

producer.properties:(发送消息至目标集群)

bootstrap.servers= target:29092
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="Admin" password="hMOPbmZE";
acks=-1
linger.ms=10
batch.size=10000
retries=3

执行脚本:

./kafka-mirror-maker.sh --consumer.config ./consumer.properties --producer.config ./producer.properties --offset.commit.interval.ms 5000 --num.streams 2 --whitelist "projects.*"

MM1比较简单,只要两个配置文件没问题,sasl配置正确,基本就OK了,适合简单的数据同步,比如指定topic进行同步。

MM2

有四种运行MM2的方法:

  • As a dedicated MirrorMaker cluster.(作为专用的MirrorMaker群集)
  • As a Connector in a distributed Connect cluster.(作为分布式Connect群集中的连接器)
  • As a standalone Connect worker.(作为独立的Connect工作者)
  • In legacy mode using existing MirrorMaker scripts.(在旧模式下,使用现有的MirrorMaker脚本。)

本文介绍第一种和第三种:作为专用的MirrorMaker群集、作为独立的Connect工作者,第二种需要搭建connect集群,操作比较复杂。

以MM2集群运行

这种模式是最简单的,只需要提供一个配置文件即可,配置文件定制化程度比较高,根据业务需求配置即可

老样子,执行 --help 看看使用说明:

[root@XXGL-T-TJSYZ-REDIS-03 bin]# ./connect-mirror-maker.sh --help
usage: connect-mirror-maker [-h] [--clusters CLUSTER [CLUSTER ...]] mm2.properties

MirrorMaker 2.0 driver

positional arguments:
  mm2.properties         MM2 configuration file.

optional arguments:
  -h, --help             show this help message and exit
  --clusters CLUSTER [CLUSTER ...]
                         Target cluster to use for this node.
[root@XXGL-T-TJSYZ-REDIS-03 bin]#  

可以看到,参数简单了许多,核心参数就一个配置文件。

mm2.properties:

name = event-center-connector
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 2

# 定义集群别名
clusters = event-center, event-center-new

# 设置event-center集群的kafka地址列表
event-center.bootstrap.servers = source:9193
event-center.security.protocol=SASL_PLAINTEXT
event-center.sasl.mechanism=PLAIN
event-center.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# 设置event-center-new集群的kafka地址列表
event-center-new.bootstrap.servers = target:29092
event-center-new.security.protocol=SASL_PLAINTEXT
event-center-new.sasl.mechanism=PLAIN
event-center-new.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# 开启event-center集群向event-center-new集群同步
event-center->event-center-new.enabled = true
# 允许同步topic的正则
event-center->event-center-new.topics = projects.*
event-center->event-center-new.groups = .*

# MM2内部同步机制使用的topic,replication数量设置
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1

# 自定义参数
# 是否同步源topic配置
sync.topic.configs.enabled=true
# 是否同步源event-centerCL信息
sync.topic.acls.enabled=true
sync.group.offsets.enabled=true
# 连接器是否发送心跳
emit.heartbeats.enabled=true
# 心跳间隔
emit.heartbeats.interval.seconds=5
# 是否发送检查点
emit.checkpoints.enabled=true
# 是否刷新topic列表
refresh.topics.enabled=true
# 刷新间隔
refresh.topics.interval.seconds=60
# 是否刷新消费者组id
refresh.groups.enabled=true
# 刷新间隔
refresh.groups.interval.seconds=60
# DefaultReplicationPolicy / CustomReplicationPolicy
replication.policy.class=org.apache.kafka.connect.mirror.CustomReplicationPolicy
# 远端创建新topic的replication数量设置
replication.factor=3

需要注意的是:replication.policy.class 默认为:DefaultReplicationPolicy,这个策略会把同步至目标集群的topic都加上一个源集群别名的前缀,比如源集群别名为A,topic为:bi-log,该topic同步到目标集群后会变成:A.bi-log,为啥这么做呢,就是为了避免双向同步的场景出现死循环。

官方也给出了解释:

这是 MirrorMaker 2.0 中的默认行为,以避免在复杂的镜像拓扑中重写数据。 需要在复制流设计和主题管理方面小心自定义此项,以避免数据丢失。 可以通过对“replication.policy.class”使用自定义复制策略类来完成此操作。

针对如何自定义策略及使用方法,见我的另一篇文章:

为了保证脚本后台运行,写一个脚本包装一下:

run-mm2.sh:

#!/bin/bash

exec ./connect-mirror-maker.sh MM2.properties >log/mm2.log 2>&1 &

之后执行脚本即可。

以Standalone模式运行

这种模式会麻烦点,需要提供一个kafka,作为worker节点来同步数据,使用的脚本为:connect-standalone.sh

–help看看如何使用:

./connect-standalone.sh --help
[2023-03-09 20:36:33,479] INFO Usage: ConnectStandalone worker.properties connector1.properties [connector2.properties ...] (org.apache.kafka.connect.cli.ConnectStandalone:63)
[root@XXGL-T-TJSYZ-REDIS-03 bin]# 

需要两个配置文件,一个是作为worker的kafka集群信息(worker.properties),另一个是同步数据的配置(connector.properties)

worker.properties:

bootstrap.servers=worker:29092
security.protocol=PLAINTEXT
sasl.mechanism=PLAIN

key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverter

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

connector.properties:

name = MirrorSourceConnector
topics = projects.*
groups = *
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 1

# source
# 这个配置会使同步之后的Topic都加上一个前缀,慎重
source.cluster.alias = old
source.cluster.bootstrap.servers = source:9193
source.cluster.security.protocol=SASL_PLAINTEXT
source.cluster.sasl.mechanism=PLAIN
source.cluster.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# target
target.cluster.alias = new
target.cluster.bootstrap.servers = target:29092
target.cluster.security.protocol=SASL_PLAINTEXT
target.cluster.sasl.mechanism=PLAIN
target.cluster.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="Admin" password="hMOPbmZE";

# 是否同步源topic配置信息
sync.topic.configs.enabled=true
# 是否同步源ACL信息
sync.topic.acls.enabled=true
sync.group.offsets.enabled=true
# 连接器是否发送心跳
emit.heartbeats.enabled=true
# 心跳间隔
emit.heartbeats.interval.seconds=5
# 是否发送检查点
emit.checkpoints.enabled=true
# 是否刷新topic列表
refresh.topics.enabled=true
# 刷新间隔
refresh.topics.interval.seconds=30
# 是否刷新消费者组id
refresh.groups.enabled=true
# 刷新间隔
refresh.groups.interval.seconds=30
# 连接器消费者预读队列大小
# readahead.queue.capacity=500
# 使用自定义策略
replication.policy.class=org.apache.kafka.connect.mirror.CustomReplicationPolicy
replication.factor = 3

执行:

./connect-standalone.sh worker.properties connector.properties

这种方式做一个简单的介绍,我最后采用的是上一种方式,比较简单直接

验证

验证:

  • 消息数量 OK

    使用kafka-tool工具连接上两个集群进行比对

  • Topic数量 OK

    • source:
    ./kafka-topics.sh --bootstrap-server source:9193 --command-config command.properties --list > topics-source.txt 
    
    • sink
    ./kafka-topics.sh --bootstrap-server sink:29092 --command-config command.properties --list > topics-sink.txt 
    
    • command.properties示例:
    security.protocol = SASL_PLAINTEXT
    sasl.mechanism = PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
    
  • 新消息是否同步 OK

  • 新Topic是否同步 OK

  • Consumer是否同步 NO

./kafka-consumer-groups.sh --bootstrap-server source:9193 --command-config command.properties --list > consumer-source.txt 

​ 如果需要同步consumer,需要使用官方提供的工具:RemoteClusterUtils

  • consumer offset是否同步 NO

  • ACL是否同步 OK
    通过kafka-acls.sh或者客户端工具kafka-tool可以查看

附录

MM2配置表

propertydefault valuedescription
namerequiredname of the connector, e.g. “us-west->us-east”
topicsempty stringregex of topics to replicate, e.g. “topic1|topic2|topic3”. Comma-separated lists are also supported.
topics.blacklist“..internal, ..replica, __consumer_offsets” or similartopics to exclude from replication
groupsempty stringregex of groups to replicate, e.g. “.*”
groups.blacklistempty stringgroups to exclude from replication
source.cluster.aliasrequiredname of the cluster being replicated
target.cluster.aliasrequiredname of the downstream Kafka cluster
source.cluster.bootstrap.serversrequiredupstream cluster to replicate
target.cluster.bootstrap.serversrequireddownstream cluster
sync.topic.configs.enabledtruewhether or not to monitor source cluster for configuration changes
sync.topic.acls.enabledtruewhether to monitor source cluster ACLs for changes
emit.heartbeats.enabledtrueconnector should periodically emit heartbeats
emit.heartbeats.interval.seconds5 (seconds)frequency of heartbeats
emit.checkpoints.enabledtrueconnector should periodically emit consumer offset information
emit.checkpoints.interval.seconds5 (seconds)frequency of checkpoints
refresh.topics.enabledtrueconnector should periodically check for new topics
refresh.topics.interval.seconds5 (seconds)frequency to check source cluster for new topics
refresh.groups.enabledtrueconnector should periodically check for new consumer groups
refresh.groups.interval.seconds5 (seconds)frequency to check source cluster for new consumer groups
readahead.queue.capacity500 (records)number of records to let consumer get ahead of producer
replication.policy.classorg.apache.kafka.connect.mirror.DefaultReplicationPolicyuse LegacyReplicationPolicy to mimic legacy MirrorMaker
heartbeats.topic.retention.ms1 dayused when creating heartbeat topics for the first time
checkpoints.topic.retention.ms1 dayused when creating checkpoint topics for the first time
offset.syncs.topic.retention.msmax longused when creating offset sync topic for the first time
replication.factor2used when creating remote topics

其他

参考:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%253A+MirrorMaker+2.0

https://www.reddit.com/r/apachekafka/comments/q5s3al/mirrormaker2_is_not_able_to_replicate_groups_in/?sort=new

https://dev.to/keigodasu/transferring-commit-offset-with-mirrormaker-2-3kbf

https://learn.microsoft.com/zh-cn/azure/hdinsight/kafka/kafka-mirrormaker-2-0-guide

TODO

后续验证发现一个问题:
从旧集群生产消息,会复制3份到新集群

 类似资料: