当前位置: 首页 > 工具软件 > dbswitch > 使用案例 >

dbswitch数据迁移数据增量时如何不覆盖目标源数据

卫和洽
2023-12-01

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档


前言

最近项目需要一个数据引接功能,要能实现各数据库之间的数据迁移,数据的全量迁移和增量迁移,并找到开源项目DBSWITCH


一、DBSWITCH是什么?

https://gitee.com/dk_88/dbswitch

官方:

一句话,dbswitch工具提供源端数据库向目的端数据的迁移同步功能,包括全量和增量方式。迁移包括:

结构迁移
字段类型、主键信息、建表语句等的转换,并生成建表SQL语句。

数据迁移。
基于JDBC的分批次读取源端数据库数据,并基于insert/copy方式将数据分批次写入目的数据库。

支持有主键表的 增量变更同步 (变化数据计算Change Data Calculate)功能(千万级以上数据量慎用)

二、使用步骤

1.拉取代码

2.读取代码

3.目标数据库数据覆盖问题

DBSWITCH数据迁移是两种方式,一种全量,一种增量。并且默认执行树迁移时先删除表,在创建表。

同时,如果你修改了目标数据库的数据。会在数据增量时,把目标数据库修改和新增的数据覆盖。也就是说目标数据库不允许有和源数据库不一样的数据。

4.如何让源数据库只新增,不覆盖?

修改底层代码实现

三 源码解析

找到手动执行任务接口

  @TokenCheck
  @LogOperate(name = "手动执行任务", description = "'手动执行任务的ID为:'+#ids")
  @ApiOperation(value = "手动执行")
  @PostMapping(value = "/run", produces = MediaType.APPLICATION_JSON_VALUE)
  public Result runAssignments(@RequestBody List<Long> ids) {
    assignmentService.runAssignments(ids);
    return Result.success();
  }

在这个接口的service中,数据库连接等信息被封装到jobDetail被异步扔进scheduler.scheduleJob(jobDetail, simpleTrigger)任务中。这个任务一种有一个任务执行器来处理这个job,找到JobExecutorService

@Override
  public void executeInternal(JobExecutionContext context) throws JobExecutionException {
    currentThread = Thread.currentThread();
    JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
    if (interrupted) {
      log.info("Quartz task id:{} interrupted", jobDataMap.getLong(TASK_ID));
      return;
    }

    JobKey key = context.getJobDetail().getKey();
    Long taskId = jobDataMap.getLongValue(TASK_ID);
    Integer schedule = jobDataMap.getIntValue(SCHEDULE);
    AssignmentJobEntity assignmentJobEntity = assignmentJobDAO
        .newAssignmentJob(taskId, schedule, key.getName());
    try {
      ReentrantLock lock = mutexes.get(taskId.toString(), ReentrantLock::new);
      while (!lock.tryLock(1, TimeUnit.SECONDS)) {
        TimeUnit.SECONDS.sleep(1);
      }

      try {
        log.info("Execute Quartz Job, and task id is : {} , job id is: {}", taskId,
            assignmentJobEntity.getId());

        AssignmentTaskEntity task = assignmentTaskDAO.getById(taskId);
        AssignmentConfigEntity assignmentConfigEntity = assignmentConfigDAO
            .getByAssignmentTaskId(task.getId());

        log.info("Execute Assignment [taskId={}],Task Name: {} ,configuration properties:{}",
            task.getId(),
            task.getName(),
            task.getContent());

        try {
          DbswichProperties properties = JsonUtils.toBeanObject(
              task.getContent(), DbswichProperties.class);
          if (!assignmentConfigEntity.getFirstFlag()) {
            properties.getTarget().setTargetDrop(false);
            properties.getTarget().setChangeDataSync(true);
          }

          MigrationService mainService = new MigrationService(properties);
          if (interrupted) {
            log.info("Quartz task id:{} interrupted", jobDataMap.getLong(TASK_ID));
            return;
          }

          // 实际执行JOB
          mainService.run();

          if (assignmentConfigEntity.getFirstFlag()) {
            AssignmentConfigEntity config = new AssignmentConfigEntity();
            config.setId(assignmentConfigEntity.getId());
            config.setTargetDropTable(Boolean.FALSE);
            config.setFirstFlag(Boolean.FALSE);
            assignmentConfigDAO.updateSelective(config);
          }

          assignmentJobEntity.setStatus(JobStatusEnum.PASS.getValue());
          log.info("Execute Assignment Success [taskId={},jobId={}],Task Name: {}",
              task.getId(), assignmentJobEntity.getId(), task.getName());
        } catch (Throwable e) {
          assignmentJobEntity.setStatus(JobStatusEnum.FAIL.getValue());
          assignmentJobEntity.setErrorLog(ExceptionUtil.stacktraceToString(e));
          log.info("Execute Assignment Failed [taskId={},jobId={}],Task Name: {}",
              task.getId(), assignmentJobEntity.getId(), task.getName(), e);
        } finally {
          assignmentJobEntity.setFinishTime(new Timestamp(System.currentTimeMillis()));
          assignmentJobDAO.updateSelective(assignmentJobEntity);
        }
      } finally {
        lock.unlock();
      }
    } catch (ExecutionException | InterruptedException e) {
      throw new RuntimeException(e);
    }

这个方法前半部分是取值,后半部分是日志记录,只有 mainService.run(); 是执行job,我们继续往下看。

在MigrationService 这个数据迁移这个主逻辑类中,主要是判断任务参数中的排除表和包含表的情况,来确定数据迁移是哪几张表。判断最后,将需要迁移的数据库表添加到一个异步执行任务的队列中

             for (TableDescription td : tableList) {
                // 当没有配置迁移的表是,默认为所有物理表(不含有视图表)
                if (includes.isEmpty() && DBTableType.VIEW.name().equals(td.getTableType())) {
                  continue;
                }

                String tableName = td.getTableName();

                if (useExcludeTables) { //使用排除表
                  if (!filters.contains(tableName)) {
                    futures.add(
                        makeFutureTask(td, indexInternal, sourceDataSource, targetDataSource,
                            numberOfFailures, totalBytesSize));
                  }
                } else { //使用包含表
                  if (includes.size() == 1 && (includes.get(0).contains("*") || includes.get(0)
                      .contains("?"))) {
                    if (Pattern.matches(includes.get(0), tableName)) {
                      futures.add(
                          makeFutureTask(td, indexInternal, sourceDataSource, targetDataSource,
                              numberOfFailures, totalBytesSize));
                    }
                  } else if (includes.contains(tableName)) {
                    futures.add(makeFutureTask(td, indexInternal, sourceDataSource, targetDataSource,
                            numberOfFailures, totalBytesSize));
                  }
                }
              }
              

makeFutureTask方法是构建一个异步任务,这个方法中还有一个最终要的方法getMigrateHandler 单表迁移方法

CompletableFuture.supplyAsync 异步执行 getMigrateHandler 单表迁移这个方法

  /**
   * 构造一个异步执行任务
   *
   * @param td               表描述上下文
   * @param indexInternal    源端索引号
   * @param sds              源端的DataSource数据源
   * @param tds              目的端的DataSource数据源
   * @param numberOfFailures 失败的数量
   * @param totalBytesSize   同步的字节大小
   * @return CompletableFuture<Void>
   */
  private CompletableFuture<Void> makeFutureTask(TableDescription td,Integer indexInternal,
                                                 HikariDataSource sds, HikariDataSource tds,
                                                 AtomicInteger numberOfFailures,
                                                 AtomicLong totalBytesSize) {
    return CompletableFuture.supplyAsync(getMigrateHandler(td, indexInternal, sds, tds))
        .exceptionally(getExceptHandler(td, numberOfFailures))
        .thenAccept(totalBytesSize::addAndGet);
  }
  
 /**
   * 单表迁移处理方法
   *
   * @param td            表描述上下文
   * @param indexInternal 源端索引号
   * @param sds           源端的DataSource数据源
   * @param tds           目的端的DataSource数据源
   * @return Supplier<Long>
   */
  private Supplier<Long> getMigrateHandler(TableDescription td,
                                           Integer indexInternal,
                                           HikariDataSource sds,
                                           HikariDataSource tds) {
    return () -> MigrationHandler.createInstance(td, properties, indexInternal, sds, tds).get();
  }


MigrationHandler中的get方法就是最终数据迁移的实现

    if (properties.getTarget().getTargetDrop()) {
      log.error("targetDrop " + properties.getTarget().getChangeDataSync());
      /*
        如果配置了dbswitch.target.datasource-target-drop=true时,
        <p>
        先执行drop table语句,然后执行create table语句
       */

我们看properties.getTarget().getTargetDrop())为true时,会删除目表原始数据库,在执行createTable语句,我们需要目标数据源有自己独有的数据而不被覆盖,那么这个结果一定要为false。查找到这个类对象将targetDrop字段设置为false,DbswichProperties来自config.yml,建议也将字段设置为false

继续往下看,会看到两个方法:

doFullCoverSynchronize(writer) — 全量覆盖

doIncreaseSynchronize(writer) ---- 增量

我以为到这儿,我只需要将targetDrop设置为false就完成了,但是,实验之后,依然没有解决。

继续查看增量的方法,查看日志后,发现代码执行了 doDelete() 和 doUpdate doInsert 三个方法,并且输出了三个方法执行的次数,查看代码:

@Override
      public void destroy(List<String> fields) {

        //TODO 取消删除
        if (cacheDelete.size() > 0) {
          doDelete(fields);
        }

        if (cacheInsert.size() > 0) {
          doInsert(fields);
        }

        //TODO 取消更新
        if (cacheUpdate.size() > 0) {
          doUpdate(fields);
        }
        log.info("[IncreaseSync] Handle table [{}] total count: {}, Insert:{},Update:{},Delete:{} ",
            tableNameMapString, countTotal, countInsert, countUpdate, countDelete);
      }

原理:在底层设计中,源数据库和目标数据库数据会进行对比,并且标名这个数据状态(修改,新增,未变,删除)。并且将这新增放入cacheInsert,更新放入cacheUpdate,删除放入cacheDelete中。然后首先执行删除,将目标源数据进行删除,在执行新增,最后更新。

所以要想目标源保留独有数据,需要将doDelete和cacheUpdate方法取消,将对应的cache缓存清除以免内存溢出.

@Override
      public void destroy(List<String> fields) {

        //TODO 取消删除
/*        if (cacheDelete.size() > 0) {
          doDelete(fields);
        }*/

        if (cacheInsert.size() > 0) {
          doInsert(fields);
        }

        //TODO 取消更新
/*        if (cacheUpdate.size() > 0) {
          doUpdate(fields);
        }*/

        //TODO 缓存删除
        cacheUpdate.clear();
        cacheDelete.clear();

        log.info("[IncreaseSync] Handle table [{}] total count: {}, Insert:{},Update:{},Delete:{} ",
            tableNameMapString, countTotal, countInsert, countUpdate, countDelete);
      }
 类似资料: