Full & Incremental Data Migration and Scale-Out with sharding-scaling and sharding-proxy

袁安志
2023-12-01

Environment Plan

Service             IP:Port
MySQL               10.0.0.99:3306
sharding-scaling    10.0.0.88:8888
sharding-proxy      10.0.0.88:3307

Data Migration Rules

1. Import the source data script

create database `order`;
 
CREATE TABLE `order`.`t_order` (
  `order_id` bigint(20) NOT NULL,
  `user_id` int(11) NOT NULL,
  `status` varchar(50) DEFAULT NULL,
  `order_no` varchar(64) DEFAULT NULL,
  PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

INSERT INTO `order`.`t_order` (`order_id`, `user_id`, `status`, `order_no`) VALUES ('1', '2', '1', '1111111');
INSERT INTO `order`.`t_order` (`order_id`, `user_id`, `status`, `order_no`) VALUES ('2', '2', '1', '2222222');
INSERT INTO `order`.`t_order` (`order_id`, `user_id`, `status`, `order_no`) VALUES ('3', '2', '1', '3333333');
INSERT INTO `order`.`t_order` (`order_id`, `user_id`, `status`, `order_no`) VALUES ('4', '3', '1', '4444444');
INSERT INTO `order`.`t_order` (`order_id`, `user_id`, `status`, `order_no`) VALUES ('5', '3', '1', '5555555');
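
A quick sanity check: the seed script leaves exactly five rows, and this count should match the estimatedRows reported later by the migration job:

SELECT COUNT(*) FROM `order`.`t_order`;  -- expect: 5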

2. Scale-out database scripts
Scale out to 2 databases, each with 2 split tables, using the following script:

create database `order_0`;
create database `order_1`;


CREATE TABLE `order_0`.`t_order_0` (
  `order_id` bigint(20) NOT NULL,
  `user_id` int(11) NOT NULL,
  `status` varchar(50) DEFAULT NULL,
  `order_no` varchar(64) DEFAULT NULL,
  PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `order_0`.`t_order_1` (
  `order_id` bigint(20) NOT NULL,
  `user_id` int(11) NOT NULL,
  `status` varchar(50) DEFAULT NULL,
  `order_no` varchar(64) DEFAULT NULL,
  PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;



 
CREATE TABLE `order_1`.`t_order_0` (
  `order_id` bigint(20) NOT NULL,
  `user_id` int(11) NOT NULL,
  `status` varchar(50) DEFAULT NULL,
  `order_no` varchar(64) DEFAULT NULL,
  PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `order_1`.`t_order_1` (
  `order_id` bigint(20) NOT NULL,
  `user_id` int(11) NOT NULL,
  `status` varchar(50) DEFAULT NULL,
  `order_no` varchar(64) DEFAULT NULL,
  PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

3. Create the account and grant privileges
sharding-scaling's incremental sync is built on the binlog, so the account needs replication (slave) privileges. On RDS this step can be skipped, since accounts created by RDS have slave privileges by default.

CREATE USER sharding_slave IDENTIFIED BY '123456';
GRANT SELECT, INSERT, UPDATE, DELETE, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'sharding_slave'@'%';

FLUSH PRIVILEGES;
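
Before continuing, it is worth verifying the grants and confirming that the source MySQL writes a ROW-format binlog, which binlog-based incremental sync depends on; the expected values in the comments assume a standard self-hosted MySQL:

-- Verify the replication account's privileges
SHOW GRANTS FOR 'sharding_slave'@'%';

-- Binlog must be enabled and row-based for row-level incremental capture
SHOW VARIABLES LIKE 'log_bin';        -- expect: ON
SHOW VARIABLES LIKE 'binlog_format';  -- expect: ROW

-- Current binlog file/position, comparable to logPosition in the job progress output later
SHOW MASTER STATUS;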

4. Scale-out sharding rule
Route to a database instance by user_id % 2, and to a concrete table within it by order_id % 2.
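
Applied by hand to the five seed rows, the rule routes them as follows:

order_id=1, user_id=2  ->  user_id%2=0 -> order_0,  order_id%2=1 -> t_order_1
order_id=2, user_id=2  ->  user_id%2=0 -> order_0,  order_id%2=0 -> t_order_0
order_id=3, user_id=2  ->  user_id%2=0 -> order_0,  order_id%2=1 -> t_order_1
order_id=4, user_id=3  ->  user_id%2=1 -> order_1,  order_id%2=0 -> t_order_0
order_id=5, user_id=3  ->  user_id%2=1 -> order_1,  order_id%2=1 -> t_order_1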



Deploy sharding-scaling

Requires a JDK 8 environment.
This service performs the full sync from the specified source to the target data source, and pulls the source's binlog for incremental updates.

# Download the distribution
wget https://linux-soft-ware.oss-cn-shenzhen.aliyuncs.com/sharding-scaling-4.1.0.tar.gz
tar xf sharding-scaling-4.1.0.tar.gz
cd sharding-scaling-4.1.0
# Start the service
./bin/start.sh
# Check that it started successfully
tail -50 logs/stdout.log
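
A minimal way to confirm the HTTP API is actually up, assuming the default port 8888 from the environment plan (the job/list endpoint is part of the 4.1.0 scaling HTTP API; an empty job list is expected on a fresh install):

# The port should be listening
ss -lntp | grep 8888
# The scaling API should answer
curl http://localhost:8888/shardingscaling/job/list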

Deploy sharding-proxy

This service is a database middleware: once you configure the sharding rules on it, sharding-scaling writes the source data into sharding-proxy, which routes each row to the corresponding database and table.
1. Download and unpack

# Download the distribution
wget https://linux-soft-ware.oss-cn-shenzhen.aliyuncs.com/sharding-proxy-4.1.0.tar.gz
tar xf sharding-proxy-4.1.0.tar.gz
cd sharding-proxy-4.1.0

2. Configure the account

# Append the following at the end of server.yaml
vim conf/server.yaml 
authentication:
  users:
    root:
      password: 123456

3. Configure sharding

# Append the following at the end of config-sharding.yaml
vim conf/config-sharding.yaml

# The database name exposed to clients
schemaName: order


dataSources:
  ds_0:
    url: jdbc:mysql://10.0.0.99:3306/order_0?serverTimezone=UTC&useSSL=false
    username: root
    password: 123456
    connectionTimeoutMilliseconds: 30000
    idleTimeoutMilliseconds: 60000
    maxLifetimeMilliseconds: 1800000
    maxPoolSize: 300
  ds_1:
    url: jdbc:mysql://10.0.0.99:3306/order_1?serverTimezone=UTC&useSSL=false
    username: root
    password: 123456
    connectionTimeoutMilliseconds: 30000
    idleTimeoutMilliseconds: 60000
    maxLifetimeMilliseconds: 1800000
    maxPoolSize: 300
    

shardingRule:
  tables:
    t_order:
      actualDataNodes: ds_${0..1}.t_order_$->{0..1}
      databaseStrategy:
        inline:
          shardingColumn: user_id
          algorithmExpression: ds_${user_id % 2}
      tableStrategy:
        inline:
          shardingColumn: order_id
          algorithmExpression: t_order_${order_id % 2}
      keyGenerator:
        type: SNOWFLAKE
        column: order_id
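
For reference, the actualDataNodes inline expression above expands to the four physical tables created earlier ($->{..} is an alternative spelling of ${..} in ShardingSphere 4.x inline expressions, used to avoid clashes with Spring placeholders):

ds_${0..1}.t_order_$->{0..1}  ->  ds_0.t_order_0, ds_0.t_order_1, ds_1.t_order_0, ds_1.t_order_1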

4. Start

./bin/start.sh
# Check that it started successfully
tail -10 logs/stdout.log

5. Verify the configuration

# Use the mysql client to check that you can connect (note the uppercase -P for the port)
mysql -h 10.0.0.88 -P 3307 -uroot -p123456

# Check that the exposed database name is correct
mysql> show databases;
+----------+
| Database |
+----------+
| order    |
+----------+


# Run the following inserts; once they succeed, check that the split tables in order_0 and order_1 received the rows according to the rules (direct queries follow the block below)
mysql> use order;
mysql> INSERT INTO `t_order` (`order_id`, `user_id`, `status`, `order_no`) VALUES (1, 2, '1', '1111111');
mysql> INSERT INTO `t_order` (`order_id`, `user_id`, `status`, `order_no`) VALUES (2, 2, '1', '2222222');
mysql> INSERT INTO `t_order` (`order_id`, `user_id`, `status`, `order_no`) VALUES (3, 2, '1', '3333333');
mysql> INSERT INTO `t_order` (`order_id`, `user_id`, `status`, `order_no`) VALUES (4, 3, '1', '4444444');
mysql> INSERT INTO `t_order` (`order_id`, `user_id`, `status`, `order_no`) VALUES (5, 3, '1', '5555555');
mysql> INSERT INTO `t_order` (`order_id`, `user_id`, `status`, `order_no`) VALUES (6, 3, '1', '6666666');
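
To see the physical layout, connect directly to the backing MySQL (10.0.0.99:3306, not the proxy); per the modulo rule, the six test rows should be distributed like this:

SELECT * FROM order_0.t_order_0;  -- expect order_id 2     (user_id 2)
SELECT * FROM order_0.t_order_1;  -- expect order_id 1, 3  (user_id 2)
SELECT * FROM order_1.t_order_0;  -- expect order_id 4, 6  (user_id 3)
SELECT * FROM order_1.t_order_1;  -- expect order_id 5     (user_id 3)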


# After verifying, delete the test data
mysql> delete from t_order;

Migrate the data

1. Create a migration job
Key request parameters:

ruleConfiguration.sourceDatasource
    Source data source configuration (the old data source).
ruleConfiguration.sourceRule
    Sharding-rule configuration for the tables to be migrated.
ruleConfiguration.sourceRule.tables.t_order
    The logical table name written to sharding-proxy.
ruleConfiguration.sourceRule.tables.t_order.actualDataNodes
    The physical tables to migrate. The example below migrates a single table; if the source table is already split, an expression such as ds_0.t_order_$->{1..64} migrates t_order_1 through t_order_64 to the new data source.
ruleConfiguration.destinationDataSources
    Connection settings for sharding-proxy; nothing special to note.
jobConfiguration.concurrency
    Migration concurrency. For example, with a value of 3, three threads migrate a given table in parallel, provided the table has an integer primary key.

curl -X POST --url http://localhost:8888/shardingscaling/job/start \
--header 'content-type: application/json' \
--data '{
   "ruleConfiguration": {
      "sourceDatasource": "ds_0: !!org.apache.shardingsphere.orchestration.core.configuration.YamlDataSourceConfiguration\n  dataSourceClassName: com.zaxxer.hikari.HikariDataSource\n  properties:\n    jdbcUrl: jdbc:mysql://10.0.0.99:3306/order?serverTimezone=UTC&useSSL=false&zeroDateTimeBehavior=convertToNull\n    driverClassName: com.mysql.jdbc.Driver\n    username: sharding_slave\n    password: 123456\n    connectionTimeout: 30000\n    idleTimeout: 60000\n    maxLifetime: 1800000\n    maxPoolSize: 100\n    minPoolSize: 10\n    maintenanceIntervalMilliseconds: 30000\n    readOnly: false\n",
      "sourceRule": "tables:\n  t_order:\n    actualDataNodes: ds_0.t_order\n    keyGenerator:\n      column: order_id\n      type: SNOWFLAKE",
      "destinationDataSources": {
         "name": "dt_1",
         "password": "123456",
         "url": "jdbc:mysql://10.0.0.88:3307/order?serverTimezone=UTC&useSSL=false",
         "username": "root"
      }
   },
   "jobConfiguration": {
      "concurrency": 1
   }
}'

# A response like the following indicates success
{"success":true,"errorCode":0,"errorMsg":null,"model":null}

2. Check job progress

# The trailing 1 is the job id, assigned in submission order starting from 1; if you submit another job later, use 2
curl http://localhost:8888/shardingscaling/job/progress/1

# Response body
# Watch two fields: estimatedRows (rows planned to sync) and syncedRows (rows already synced)
# They match, so the full sync has clearly completed; you could also verify in the actual instances, which we skip here
{
	"success": true,
	"errorCode": 0,
	"errorMsg": null,
	"model": {
		"id": 1,
		"jobName": "Local Sharding Scaling Job",
		"status": "RUNNING",
		"syncTaskProgress": [{
			"id": "10.0.0.99-3306-order",
			"status": "SYNCHRONIZE_REALTIME_DATA",
			"historySyncTaskProgress": [{
				"id": "history-order-t_order#0",
				"estimatedRows": 5,
				"syncedRows": 5
			}],
			"realTimeSyncTaskProgress": {
				"id": "realtime-order",
				"delayMillisecond": 1549,
				"logPosition": {
					"filename": "mysql-bin.000040",
					"position": 12903,
					"serverId": 0
				}
			}
		}]
	}
}

3. Verify that incremental data syncs to the scaled-out databases

3.1 Simulate an incremental row in the old database

INSERT INTO `order`.`t_order` (`order_id`, `user_id`, `status`, `order_no`) VALUES ('6', '3', '1', '6666666');

3.2 Check in sharding-proxy whether the incremental row arrived

[root@mysql ~]# mysql -h 10.0.0.88 -P 3307 -uroot -p123456
mysql> use order;
mysql> select * from t_order;
+----------+---------+--------+----------+
| order_id | user_id | status | order_no |
+----------+---------+--------+----------+
|        2 |       2 | 1      | 2222222  |
|        1 |       2 | 1      | 1111111  |
|        3 |       2 | 1      | 3333333  |
|        4 |       3 | 1      | 4444444  |
|        6 |       3 | 1      | 6666666  |
|        5 |       3 | 1      | 5555555  |
+----------+---------+--------+----------+
6 rows in set (0.02 sec)
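
As a final spot check, the new row (order_id=6, user_id=3) should have landed in order_1.t_order_0 (3 % 2 = 1 selects order_1, 6 % 2 = 0 selects t_order_0); query the backing MySQL directly to confirm:

mysql> SELECT * FROM order_1.t_order_0 WHERE order_id = 6;  -- run against 10.0.0.99:3306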

The incremental row did arrive. With that, we have completed a smooth full & incremental data migration to the scaled-out databases with no service downtime.


Pitfalls to watch out for

1. The modulo (sharding) column must not be declared unsigned, otherwise the full sync fails.

2. If sharding-proxy goes down during the full sync, the sync fails and sharding-scaling does not retry, so check that submitted jobs have finished syncing before restarting sharding-proxy.

3. If there are many tables to migrate, deploy multiple sharding-scaling instances to raise sync throughput.

4. If sharding-scaling crashes or restarts, incremental data stops flowing; you must resubmit the sync job (a sketch follows this list).
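
For pitfall 4, a sketch of tearing down and resubmitting a job over the scaling HTTP API (the stop endpoint and its jobId body follow the 4.1.0 scaling manual; double-check against your version):

# Stop the stale job (job id 1 here)
curl -X POST --url http://localhost:8888/shardingscaling/job/stop \
--header 'content-type: application/json' \
--data '{"jobId": 1}'

# Then resubmit the original /shardingscaling/job/start request shown earlier;
# the new job re-runs the full sync and resumes tailing the binlog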
