提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
最近项目需要一个数据引接功能,要能实现各数据库之间的数据迁移,数据的全量迁移和增量迁移,并找到开源项目DBSWITCH
https://gitee.com/dk_88/dbswitch
官方:
一句话,dbswitch工具提供源端数据库向目的端数据的迁移同步功能,包括全量和增量方式。迁移包括:
结构迁移
字段类型、主键信息、建表语句等的转换,并生成建表SQL语句。
数据迁移。
基于JDBC的分批次读取源端数据库数据,并基于insert/copy方式将数据分批次写入目的数据库。
支持有主键表的 增量变更同步 (变化数据计算Change Data Calculate)功能(千万级以上数据量慎用)
DBSWITCH数据迁移是两种方式,一种全量,一种增量。并且默认执行树迁移时先删除表,在创建表。
同时,如果你修改了目标数据库的数据。会在数据增量时,把目标数据库修改和新增的数据覆盖。也就是说目标数据库不允许有和源数据库不一样的数据。
修改底层代码实现
找到手动执行任务接口
@TokenCheck
@LogOperate(name = "手动执行任务", description = "'手动执行任务的ID为:'+#ids")
@ApiOperation(value = "手动执行")
@PostMapping(value = "/run", produces = MediaType.APPLICATION_JSON_VALUE)
public Result runAssignments(@RequestBody List<Long> ids) {
assignmentService.runAssignments(ids);
return Result.success();
}
在这个接口的service中,数据库连接等信息被封装到jobDetail被异步扔进scheduler.scheduleJob(jobDetail, simpleTrigger)任务中。这个任务一种有一个任务执行器来处理这个job,找到JobExecutorService
@Override
public void executeInternal(JobExecutionContext context) throws JobExecutionException {
currentThread = Thread.currentThread();
JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
if (interrupted) {
log.info("Quartz task id:{} interrupted", jobDataMap.getLong(TASK_ID));
return;
}
JobKey key = context.getJobDetail().getKey();
Long taskId = jobDataMap.getLongValue(TASK_ID);
Integer schedule = jobDataMap.getIntValue(SCHEDULE);
AssignmentJobEntity assignmentJobEntity = assignmentJobDAO
.newAssignmentJob(taskId, schedule, key.getName());
try {
ReentrantLock lock = mutexes.get(taskId.toString(), ReentrantLock::new);
while (!lock.tryLock(1, TimeUnit.SECONDS)) {
TimeUnit.SECONDS.sleep(1);
}
try {
log.info("Execute Quartz Job, and task id is : {} , job id is: {}", taskId,
assignmentJobEntity.getId());
AssignmentTaskEntity task = assignmentTaskDAO.getById(taskId);
AssignmentConfigEntity assignmentConfigEntity = assignmentConfigDAO
.getByAssignmentTaskId(task.getId());
log.info("Execute Assignment [taskId={}],Task Name: {} ,configuration properties:{}",
task.getId(),
task.getName(),
task.getContent());
try {
DbswichProperties properties = JsonUtils.toBeanObject(
task.getContent(), DbswichProperties.class);
if (!assignmentConfigEntity.getFirstFlag()) {
properties.getTarget().setTargetDrop(false);
properties.getTarget().setChangeDataSync(true);
}
MigrationService mainService = new MigrationService(properties);
if (interrupted) {
log.info("Quartz task id:{} interrupted", jobDataMap.getLong(TASK_ID));
return;
}
// 实际执行JOB
mainService.run();
if (assignmentConfigEntity.getFirstFlag()) {
AssignmentConfigEntity config = new AssignmentConfigEntity();
config.setId(assignmentConfigEntity.getId());
config.setTargetDropTable(Boolean.FALSE);
config.setFirstFlag(Boolean.FALSE);
assignmentConfigDAO.updateSelective(config);
}
assignmentJobEntity.setStatus(JobStatusEnum.PASS.getValue());
log.info("Execute Assignment Success [taskId={},jobId={}],Task Name: {}",
task.getId(), assignmentJobEntity.getId(), task.getName());
} catch (Throwable e) {
assignmentJobEntity.setStatus(JobStatusEnum.FAIL.getValue());
assignmentJobEntity.setErrorLog(ExceptionUtil.stacktraceToString(e));
log.info("Execute Assignment Failed [taskId={},jobId={}],Task Name: {}",
task.getId(), assignmentJobEntity.getId(), task.getName(), e);
} finally {
assignmentJobEntity.setFinishTime(new Timestamp(System.currentTimeMillis()));
assignmentJobDAO.updateSelective(assignmentJobEntity);
}
} finally {
lock.unlock();
}
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
这个方法前半部分是取值,后半部分是日志记录,只有 mainService.run(); 是执行job,我们继续往下看。
在MigrationService 这个数据迁移这个主逻辑类中,主要是判断任务参数中的排除表和包含表的情况,来确定数据迁移是哪几张表。判断最后,将需要迁移的数据库表添加到一个异步执行任务的队列中
for (TableDescription td : tableList) {
// 当没有配置迁移的表是,默认为所有物理表(不含有视图表)
if (includes.isEmpty() && DBTableType.VIEW.name().equals(td.getTableType())) {
continue;
}
String tableName = td.getTableName();
if (useExcludeTables) { //使用排除表
if (!filters.contains(tableName)) {
futures.add(
makeFutureTask(td, indexInternal, sourceDataSource, targetDataSource,
numberOfFailures, totalBytesSize));
}
} else { //使用包含表
if (includes.size() == 1 && (includes.get(0).contains("*") || includes.get(0)
.contains("?"))) {
if (Pattern.matches(includes.get(0), tableName)) {
futures.add(
makeFutureTask(td, indexInternal, sourceDataSource, targetDataSource,
numberOfFailures, totalBytesSize));
}
} else if (includes.contains(tableName)) {
futures.add(makeFutureTask(td, indexInternal, sourceDataSource, targetDataSource,
numberOfFailures, totalBytesSize));
}
}
}
makeFutureTask方法是构建一个异步任务,这个方法中还有一个最终要的方法getMigrateHandler 单表迁移方法
CompletableFuture.supplyAsync 异步执行 getMigrateHandler 单表迁移这个方法
/**
* 构造一个异步执行任务
*
* @param td 表描述上下文
* @param indexInternal 源端索引号
* @param sds 源端的DataSource数据源
* @param tds 目的端的DataSource数据源
* @param numberOfFailures 失败的数量
* @param totalBytesSize 同步的字节大小
* @return CompletableFuture<Void>
*/
private CompletableFuture<Void> makeFutureTask(TableDescription td,Integer indexInternal,
HikariDataSource sds, HikariDataSource tds,
AtomicInteger numberOfFailures,
AtomicLong totalBytesSize) {
return CompletableFuture.supplyAsync(getMigrateHandler(td, indexInternal, sds, tds))
.exceptionally(getExceptHandler(td, numberOfFailures))
.thenAccept(totalBytesSize::addAndGet);
}
/**
* 单表迁移处理方法
*
* @param td 表描述上下文
* @param indexInternal 源端索引号
* @param sds 源端的DataSource数据源
* @param tds 目的端的DataSource数据源
* @return Supplier<Long>
*/
private Supplier<Long> getMigrateHandler(TableDescription td,
Integer indexInternal,
HikariDataSource sds,
HikariDataSource tds) {
return () -> MigrationHandler.createInstance(td, properties, indexInternal, sds, tds).get();
}
MigrationHandler中的get方法就是最终数据迁移的实现
if (properties.getTarget().getTargetDrop()) {
log.error("targetDrop " + properties.getTarget().getChangeDataSync());
/*
如果配置了dbswitch.target.datasource-target-drop=true时,
<p>
先执行drop table语句,然后执行create table语句
*/
我们看properties.getTarget().getTargetDrop())为true时,会删除目表原始数据库,在执行createTable语句,我们需要目标数据源有自己独有的数据而不被覆盖,那么这个结果一定要为false。查找到这个类对象将targetDrop字段设置为false,DbswichProperties来自config.yml,建议也将字段设置为false
继续往下看,会看到两个方法:
doFullCoverSynchronize(writer) — 全量覆盖
doIncreaseSynchronize(writer) ---- 增量
我以为到这儿,我只需要将targetDrop设置为false就完成了,但是,实验之后,依然没有解决。
继续查看增量的方法,查看日志后,发现代码执行了 doDelete() 和 doUpdate doInsert 三个方法,并且输出了三个方法执行的次数,查看代码:
@Override
public void destroy(List<String> fields) {
//TODO 取消删除
if (cacheDelete.size() > 0) {
doDelete(fields);
}
if (cacheInsert.size() > 0) {
doInsert(fields);
}
//TODO 取消更新
if (cacheUpdate.size() > 0) {
doUpdate(fields);
}
log.info("[IncreaseSync] Handle table [{}] total count: {}, Insert:{},Update:{},Delete:{} ",
tableNameMapString, countTotal, countInsert, countUpdate, countDelete);
}
原理:在底层设计中,源数据库和目标数据库数据会进行对比,并且标名这个数据状态(修改,新增,未变,删除)。并且将这新增放入cacheInsert,更新放入cacheUpdate,删除放入cacheDelete中。然后首先执行删除,将目标源数据进行删除,在执行新增,最后更新。
所以要想目标源保留独有数据,需要将doDelete和cacheUpdate方法取消,将对应的cache缓存清除以免内存溢出.
@Override
public void destroy(List<String> fields) {
//TODO 取消删除
/* if (cacheDelete.size() > 0) {
doDelete(fields);
}*/
if (cacheInsert.size() > 0) {
doInsert(fields);
}
//TODO 取消更新
/* if (cacheUpdate.size() > 0) {
doUpdate(fields);
}*/
//TODO 缓存删除
cacheUpdate.clear();
cacheDelete.clear();
log.info("[IncreaseSync] Handle table [{}] total count: {}, Insert:{},Update:{},Delete:{} ",
tableNameMapString, countTotal, countInsert, countUpdate, countDelete);
}