Service | IP:Port |
---|---|
MySQL | 10.0.0.99:3306 |
sharding-scaling | 10.0.0.88:8888 |
sharding-proxy | 10.0.0.88:3307 |
1. Import the source data

```sql
create database `order`;
CREATE TABLE `order`.`t_order` (
  `order_id` bigint(20) NOT NULL,
  `user_id` int(11) NOT NULL,
  `status` varchar(50) DEFAULT NULL,
  `order_no` varchar(64) DEFAULT NULL,
  PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

INSERT INTO `order`.`t_order` (`order_id`, `user_id`, `status`, `order_no`) VALUES ('1', '2', '1', '1111111');
INSERT INTO `order`.`t_order` (`order_id`, `user_id`, `status`, `order_no`) VALUES ('2', '2', '1', '2222222');
INSERT INTO `order`.`t_order` (`order_id`, `user_id`, `status`, `order_no`) VALUES ('3', '2', '1', '3333333');
INSERT INTO `order`.`t_order` (`order_id`, `user_id`, `status`, `order_no`) VALUES ('4', '3', '1', '4444444');
INSERT INTO `order`.`t_order` (`order_id`, `user_id`, `status`, `order_no`) VALUES ('5', '3', '1', '5555555');
```
2. Create the scale-out schema

Scale out to 2 databases with 2 shard tables each, using the following script:

```sql
create database `order_0`;
create database `order_1`;

CREATE TABLE `order_0`.`t_order_0` (
  `order_id` bigint(20) NOT NULL,
  `user_id` int(11) NOT NULL,
  `status` varchar(50) DEFAULT NULL,
  `order_no` varchar(64) DEFAULT NULL,
  PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `order_0`.`t_order_1` (
  `order_id` bigint(20) NOT NULL,
  `user_id` int(11) NOT NULL,
  `status` varchar(50) DEFAULT NULL,
  `order_no` varchar(64) DEFAULT NULL,
  PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `order_1`.`t_order_0` (
  `order_id` bigint(20) NOT NULL,
  `user_id` int(11) NOT NULL,
  `status` varchar(50) DEFAULT NULL,
  `order_no` varchar(64) DEFAULT NULL,
  PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `order_1`.`t_order_1` (
  `order_id` bigint(20) NOT NULL,
  `user_id` int(11) NOT NULL,
  `status` varchar(50) DEFAULT NULL,
  `order_no` varchar(64) DEFAULT NULL,
  PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
```
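The four CREATE TABLE statements above differ only in the database and table suffix, so the DDL for any fan-out can be generated with a few lines of Python (a sketch; the 2-database, 2-table layout matches this walkthrough):

```python
# Generate scale-out DDL: db_count databases, each holding table_count shard tables.
TABLE_BODY = """(
  `order_id` bigint(20) NOT NULL,
  `user_id` int(11) NOT NULL,
  `status` varchar(50) DEFAULT NULL,
  `order_no` varchar(64) DEFAULT NULL,
  PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;"""

def scale_out_ddl(db_count: int = 2, table_count: int = 2) -> str:
    stmts = []
    for d in range(db_count):
        stmts.append(f"create database `order_{d}`;")
        for t in range(table_count):
            stmts.append(f"CREATE TABLE `order_{d}`.`t_order_{t}` {TABLE_BODY}")
    return "\n".join(stmts)

print(scale_out_ddl())
```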
3. Create the account and grant privileges

sharding-scaling's incremental sync is implemented on top of the MySQL binlog, so the account needs replication (slave) privileges. On RDS you can skip this step, since accounts created by RDS have the slave privileges by default.

```sql
CREATE USER sharding_slave IDENTIFIED BY '123456';
GRANT SELECT, INSERT, UPDATE, DELETE, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'sharding_slave'@'%';
FLUSH PRIVILEGES;
```
4. Sharding rules

Rows are routed to a database instance by user_id % 2, and to a concrete table by order_id % 2.
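The rule can be checked by hand with a few lines of Python; this sketch mirrors the two inline expressions (`ds_${user_id % 2}` and `t_order_${order_id % 2}`) configured later in sharding-proxy:

```python
def route(order_id: int, user_id: int) -> str:
    # Database is chosen by user_id % 2, table by order_id % 2,
    # matching the inline expressions in config-sharding.yaml.
    return f"ds_{user_id % 2}.t_order_{order_id % 2}"

# The five seed rows (order_id, user_id) from the source table:
for order_id, user_id in [(1, 2), (2, 2), (3, 2), (4, 3), (5, 3)]:
    print(order_id, "->", route(order_id, user_id))
# For example, order_id=1, user_id=2 lands in ds_0.t_order_1,
# and order_id=4, user_id=3 lands in ds_1.t_order_0.
```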
Deploying sharding-scaling

Requires a JDK 8 environment. This service performs a full sync from the specified source to the target data source, then tails the source's binlog to apply incremental updates.

```shell
# Download the release package
wget https://linux-soft-ware.oss-cn-shenzhen.aliyuncs.com/sharding-scaling-4.1.0.tar.gz
tar xf sharding-scaling-4.1.0.tar.gz
cd sharding-scaling-4.1.0
# Start the service
./bin/start.sh
# Check that it started successfully
tail -50 logs/stdout.log
```
Deploying sharding-proxy

This service is a database middleware: once the sharding rules are configured on it, sharding-scaling writes the source data into sharding-proxy, which routes each row to the corresponding database and table.

1. Download and extract

```shell
# Download the release package
wget https://linux-soft-ware.oss-cn-shenzhen.aliyuncs.com/sharding-proxy-4.1.0.tar.gz
tar xf sharding-proxy-4.1.0.tar.gz
cd sharding-proxy-4.1.0
```
2. Edit the account configuration

Append the following to the end of conf/server.yaml (e.g. with vim):

```yaml
authentication:
  users:
    root:
      password: 123456
```
3. Edit the sharding configuration

Append the following to the end of conf/config-sharding.yaml:

```yaml
# The database name exposed to clients
schemaName: order

dataSources:
  ds_0:
    url: jdbc:mysql://10.0.0.99:3306/order_0?serverTimezone=UTC&useSSL=false
    username: root
    password: 123456
    connectionTimeoutMilliseconds: 30000
    idleTimeoutMilliseconds: 60000
    maxLifetimeMilliseconds: 1800000
    maxPoolSize: 300
  ds_1:
    url: jdbc:mysql://10.0.0.99:3306/order_1?serverTimezone=UTC&useSSL=false
    username: root
    password: 123456
    connectionTimeoutMilliseconds: 30000
    idleTimeoutMilliseconds: 60000
    maxLifetimeMilliseconds: 1800000
    maxPoolSize: 300

shardingRule:
  tables:
    t_order:
      actualDataNodes: ds_${0..1}.t_order_$->{0..1}
      databaseStrategy:
        inline:
          shardingColumn: user_id
          algorithmExpression: ds_${user_id % 2}
      tableStrategy:
        inline:
          shardingColumn: order_id
          algorithmExpression: t_order_${order_id % 2}
      keyGenerator:
        type: SNOWFLAKE
        column: order_id
```
4. Start the service

```shell
./bin/start.sh
# Check that it started successfully
tail -10 logs/stdout.log
```
5. Verify the configuration

```shell
# Check that the mysql client can connect (note: -P sets the port)
mysql -h 10.0.0.88 -P 3307 -uroot -p123456
```

```sql
-- Check that the exposed database name is correct
mysql> show databases;
+--------------+
| Database     |
+--------------+
| order        |
+--------------+

-- Run the inserts below; once they succeed, check that the rows in
-- order_0 and order_1 landed in the shard tables according to the rules
mysql> use order;
mysql> INSERT INTO `t_order` (`order_id`, `user_id`, `status`, `order_no`) VALUES (1, 2, '1', '1111111');
mysql> INSERT INTO `t_order` (`order_id`, `user_id`, `status`, `order_no`) VALUES (2, 2, '1', '2222222');
mysql> INSERT INTO `t_order` (`order_id`, `user_id`, `status`, `order_no`) VALUES (3, 2, '1', '3333333');
mysql> INSERT INTO `t_order` (`order_id`, `user_id`, `status`, `order_no`) VALUES (4, 3, '1', '4444444');
mysql> INSERT INTO `t_order` (`order_id`, `user_id`, `status`, `order_no`) VALUES (5, 3, '1', '5555555');
mysql> INSERT INTO `t_order` (`order_id`, `user_id`, `status`, `order_no`) VALUES (6, 3, '1', '6666666');

-- After verifying, delete the test data
mysql> delete from t_order;
```
Submitting the migration task

1. Create a data migration task

Key request parameters:

Parameter | Description |
---|---|
ruleConfiguration.sourceDatasource | Source data source configuration (the old data source) |
ruleConfiguration.sourceRule | Sharding rules for the tables to be migrated |
ruleConfiguration.sourceRule.tables.t_order | Logical table name as written to sharding-proxy |
ruleConfiguration.sourceRule.tables.t_order.actualDataNodes | The tables to migrate. The example below migrates a single table; if the source table is already sharded, an expression such as ds_0.t_order_$->{1..64} migrates t_order_1 through t_order_64 to the new data source |
ruleConfiguration.destinationDataSources | Connection settings for sharding-proxy; nothing special to watch for |
jobConfiguration.concurrency | Migration concurrency. For example, with a value of 3, three threads migrate the table in parallel, provided the table has an integer primary key |
```shell
curl -X POST --url http://localhost:8888/shardingscaling/job/start \
--header 'content-type: application/json' \
--data '{
    "ruleConfiguration": {
        "sourceDatasource": "ds_0: !!org.apache.shardingsphere.orchestration.core.configuration.YamlDataSourceConfiguration\n dataSourceClassName: com.zaxxer.hikari.HikariDataSource\n properties:\n jdbcUrl: jdbc:mysql://10.0.0.99:3306/order?serverTimezone=UTC&useSSL=false&zeroDateTimeBehavior=convertToNull\n driverClassName: com.mysql.jdbc.Driver\n username: sharding_slave\n password: 123456\n connectionTimeout: 30000\n idleTimeout: 60000\n maxLifetime: 1800000\n maxPoolSize: 100\n minPoolSize: 10\n maintenanceIntervalMilliseconds: 30000\n readOnly: false\n",
        "sourceRule": "tables:\n t_order:\n actualDataNodes: ds_0.t_order\n keyGenerator:\n column: order_id\n type: SNOWFLAKE",
        "destinationDataSources": {
            "name": "dt_1",
            "password": "123456",
            "url": "jdbc:mysql://10.0.0.88:3307/order?serverTimezone=UTC&useSSL=false",
            "username": "root"
        }
    },
    "jobConfiguration": {
        "concurrency": 1
    }
}'
```

A response like the following indicates success:

```json
{"success":true,"errorCode":0,"errorMsg":null,"model":null}
```
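For scripting, the same request can be issued from Python with only the standard library. This is a sketch against the endpoint shown above; `build_job` and `start_job` are illustrative helper names, and the payload fields mirror the curl example:

```python
import json
from urllib import request

SCALING_URL = "http://localhost:8888/shardingscaling/job/start"

def build_job(source_ds_yaml: str, source_rule_yaml: str,
              proxy_jdbc_url: str, user: str, password: str,
              concurrency: int = 1) -> dict:
    # Mirrors the JSON payload of the curl request above.
    return {
        "ruleConfiguration": {
            "sourceDatasource": source_ds_yaml,
            "sourceRule": source_rule_yaml,
            "destinationDataSources": {
                "name": "dt_1",
                "url": proxy_jdbc_url,
                "username": user,
                "password": password,
            },
        },
        "jobConfiguration": {"concurrency": concurrency},
    }

def start_job(job: dict) -> dict:
    # POST the job to sharding-scaling and return the parsed response,
    # e.g. {"success": true, "errorCode": 0, ...} on success.
    req = request.Request(SCALING_URL, data=json.dumps(job).encode("utf-8"),
                          headers={"content-type": "application/json"})
    with request.urlopen(req) as resp:
        return json.load(resp)
```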
2. Check task progress

The trailing 1 is the order in which you submitted the task, starting from 1; if you submit another task afterwards, use 2 instead.

```shell
curl http://localhost:8888/shardingscaling/job/progress/1
```

Two fields matter in the response: estimatedRows, the number of rows planned for sync, and syncedRows, the number already synced. Here they match, so the full sync has clearly completed; you can also verify this directly on the target instances.
```json
{
    "success": true,
    "errorCode": 0,
    "errorMsg": null,
    "model": {
        "id": 1,
        "jobName": "Local Sharding Scaling Job",
        "status": "RUNNING",
        "syncTaskProgress": [{
            "id": "10.0.0.99-3306-order",
            "status": "SYNCHRONIZE_REALTIME_DATA",
            "historySyncTaskProgress": [{
                "id": "history-order-t_order#0",
                "estimatedRows": 5,
                "syncedRows": 5
            }],
            "realTimeSyncTaskProgress": {
                "id": "realtime-order",
                "delayMillisecond": 1549,
                "logPosition": {
                    "filename": "mysql-bin.000040",
                    "position": 12903,
                    "serverId": 0
                }
            }
        }]
    }
}
```
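Rather than eyeballing the JSON, a script can decide when the full (history) phase has caught up by comparing syncedRows against estimatedRows across all history tasks. A sketch over the response shape shown above:

```python
def full_sync_done(progress: dict) -> bool:
    # True once every history task reports at least as many synced rows
    # as it estimated.
    tasks = progress["model"]["syncTaskProgress"]
    return all(
        h["syncedRows"] >= h["estimatedRows"]
        for task in tasks
        for h in task["historySyncTaskProgress"]
    )

# The response above reports 5 of 5 rows synced:
sample = {"model": {"syncTaskProgress": [
    {"historySyncTaskProgress": [{"estimatedRows": 5, "syncedRows": 5}]}
]}}
print(full_sync_done(sample))  # True
```

In practice you would poll the progress endpoint, feed each parsed response to this check, and only cut traffic over once it returns True.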
3. Verify that incremental data reaches the scaled-out databases

3.1 Simulate an incremental row in the old database

```sql
INSERT INTO `order`.`t_order` (`order_id`, `user_id`, `status`, `order_no`) VALUES ('6', '3', '1', '6666666');
```
3.2、在sharding-proxy查看增量数据是否进来
[root@mysql ~]# mysql -h 10.0.0.88 -P 3307 -uroot -p123456
mysql> use order;
mysql> select * from t_order;
+----------+---------+--------+----------+
| order_id | user_id | status | order_no |
+----------+---------+--------+----------+
| 2 | 2 | 1 | 2222222 |
| 1 | 2 | 1 | 1111111 |
| 3 | 2 | 1 | 3333333 |
| 4 | 3 | 1 | 4444444 |
| 6 | 3 | 1 | 6666666 |
| 5 | 3 | 1 | 5555555 |
+----------+---------+--------+----------+
6 rows in set (0.02 sec)
增量数据的确进来了, 到此我们完成了不停服,全量&增量数据平滑迁移到扩容库
Caveats

1. The sharding (modulo) column must not be declared unsigned; otherwise the full sync will fail.
2. If sharding-proxy goes down during the full sync, the sync fails and sharding-scaling will not retry, so before restarting sharding-proxy, check that the submitted tasks have finished syncing.
3. If there are many tables to migrate, deploy multiple sharding-scaling instances to increase sync throughput.
4. If sharding-scaling crashes or restarts, incremental data stops flowing in; you must resubmit the sync task.