前段时间有一项目,需要同步第三方平台的Oracle数据库数据到我们自己SQL Server库,一开始我想着用datax结合datax-web直接创建同步任务,分分钟搞定,然鹅,查看DataX SqlServerWriter 文档注意事项:
注意:
- 目的表所在数据库必须是主库才能写入数据;整个任务至少需具备 insert into…的权限,是否需要其他权限,取决于你任务配置中在 preSql 和 postSql 中指定的语句。
2.SqlServerWriter和MysqlWriter不同,不支持配置writeMode参数。
SqlServerWriter和MysqlWriter不同,不支持配置writeMode参数。也就是说,该writer不支持增量更新
,每次只能全量更新(在preSql中写delete 或 TRUNCATE 语句,清空表,再插入),这数据量一大的话,就太费劲了。
因此,我通过SpringBoot项目,结合xxl-job,使用多数据源的方式,开发定时同步任务,从Oracle 增量拉取数据,再插入
或更新
到SQL Server库中。由于是老系统,部分照片数据都直接存储在数据库中,因此,增量更新非常有必要!
我们只需要开发xxl-job任务执行器模块即可,工程pom如下:
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.3.7.RELEASE</spring-boot.version>
<xxl-job-version>2.3.0</xxl-job-version>
<hutool-version>5.8.7</hutool-version>
<mybatis-plus.version>3.5.2</mybatis-plus.version>
<orai18n.version>12.1.0.2.0</orai18n.version>
<dynamic-datasource.version>3.4.0</dynamic-datasource.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- sqlserver 驱动 -->
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
<scope>runtime</scope>
</dependency>
<!-- oracle 驱动 -->
<dependency>
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>ojdbc8</artifactId>
<scope>runtime</scope>
</dependency>
<!-- 解决Oracle 字符集支持问题 -->
<dependency>
<groupId>cn.easyproject</groupId>
<artifactId>orai18n</artifactId>
<version>${orai18n.version}</version>
</dependency>
<!-- mysql 驱动 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<!--mybatis plus-->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus.version}</version>
</dependency>
<!-- 多数据源 starter -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>dynamic-datasource-spring-boot-starter</artifactId>
<version>${dynamic-datasource.version}</version>
</dependency>
<!-- xxl-job-core -->
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>${xxl-job-version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>${hutool-version}</version>
</dependency>
<!--测试包-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.3.7.RELEASE</version>
<configuration>
<mainClass>com.lifp.DynamicDatasourceApplication</mainClass>
</configuration>
<executions>
<execution>
<id>repackage</id>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
项目涉及三个数据库:Oracle
、SQL Server
、MySQL
,需要分别引入他们的驱动,另外需要引入多数据源模块dynamic-datasource-spring-boot-starter
。此外,需要集成分布式定时任务调度模块核心依赖:xxl-job-core
使用MyBatis-Plus,来开发,不详细介绍,只贴核心代码。
示例表ms_demo,只有如下三个字段。
@Data
@TableName(value = "ms_demo")
public class MsDemo {
@TableId(value = "id", type = IdType.INPUT)
private Long id;
/**
* 姓名
*/
@TableField(value = "name")
private String name;
/**
* 照片
*/
@TableField(value = "Photo")
private byte[] photo;
/**
* 时间戳,不存表,只用来封装Oracle中时间戳数据
*/
@TableField(exist = false)
private Long timeStamp;
}
SQL Server端,只是批量插入或更新,直接使用MyBatis-Plus基础接口即可, 无需再做额外开发。
public interface MsDemoService extends IService<MsDemo> {
}
使用多数据源后,可以在类和方法上标注@DS()注解,标明要使用的数据源
@DS("sqlserver")
@Service
public class MsDemoServiceImpl extends ServiceImpl<MsDemoServiceMapper, MsDemo> implements MsDemoService{
}
mapper接口和mapper.xml省略…
Oracle端的接口,需要从库中查出数据,提供给SQL Server端接口。但是,两端的表结构不一样,而且Oracle端需要从3张表组合查询,最终同步到SQL Server端一张表中。因此,Oracle端最终查询结果,可以直接用SQL Server端目标表实体对象来接收,具体如下。
public interface OracleMsDemoService extends IService<MsDemo> {
/**
*@Description 准备待同步的数据
*@param timeStamp 同步时间戳,为上次同步数据最大时间戳,记录与Job表中
*@Return
*/
List<MsDemo> prepareSyncData(String timeStamp);
}
此处是Oracle端接口,使用@DS(“oracle”)注解标明使用配置文件自定义oracle数据源
@DS("oracle")
@Service
public class OracleMsDemoServiceImpl extends ServiceImpl<OracleMsDemoPhotoMapper, MsDemo> implements OracleMsDemoService {
@Override
public List<MsDemo> prepareSyncData(String timeStamp) {
return baseMapper.prepareSyncData(timeStamp);
}
}
public interface OracleMsDemoPhotoMapper extends BaseMapper<MsDemo> {
List<MsDemo> prepareSyncData(@Param("timeStamp") String timeStamp);
}
<mapper namespace="com.lifp.mapper.OracleMsDemoPhotoMapper">
<resultMap id="BaseResultMap" type="com.lifp.entity.MsDemo">
<!--@mbg.generated-->
<!--@Table ms_demo-->
<id column="Id" jdbcType="VARCHAR" property="id" />
<result column="Name" jdbcType="VARCHAR" property="name" />
<result column="Photo" jdbcType="BLOB" property="photo" />
</resultMap>
<select id="prepareSyncData" resultMap="BaseResultMap">
select .....
from ....
<if test="timeStamp != null and timeStamp != ''">
<!--根据时间戳,获取增量更新的数据(目标时间戳以后的数据)-->
<![CDATA[ and GREATEST(t.sjc,t.sjtbsjc)> #{timeStamp} ]]>
</if>
</select>
</mapper>
Oracle库每张表都有时间戳字段,记录数据更新时间,后期增量更新也是基于该字段做增量更新,即每次拉取数据,都只获取目标时间戳以后的数据。
但是,每次拉取完数据后,这批数据中最大时间戳记录在哪里呢?
由于使用xxl-job
去调度定时任务,那么我们可以在xxl_job_info
表中添加一列,记录当前数据同步 job 的最大时间戳。每次调度时,首先从表中获取当前任务上一次执行后的最大时间戳,用它去Oracle中查询数据,查询到数据后,再将所查询数据中的最大时间戳更新到xxl_job_info表当前任务记录上去即可。因此首先需要改造该表,然后再创建其操作接口。
xxl_job_info 表中添加了incremental_update_date字段,用来记录增量时间戳。
@Data
@TableName(value = "xxl_job_info")
public class XxlJobInfo {
@TableId(value = "id", type = IdType.AUTO)
private Integer id;
/**
* 执行器主键ID
*/
@TableField(value = "job_group")
private Integer jobGroup;
@TableField(value = "job_desc")
private String jobDesc;
@TableField(value = "add_time")
private Date addTime;
@TableField(value = "update_time")
private Date updateTime;
/**
* 作者
*/
@TableField(value = "author")
private String author;
/**
* 报警邮件
*/
@TableField(value = "alarm_email")
private String alarmEmail;
/**
* 调度类型
*/
@TableField(value = "schedule_type")
private String scheduleType;
/**
* 调度配置,值含义取决于调度类型
*/
@TableField(value = "schedule_conf")
private String scheduleConf;
/**
* 调度过期策略
*/
@TableField(value = "misfire_strategy")
private String misfireStrategy;
/**
* 执行器路由策略
*/
@TableField(value = "executor_route_strategy")
private String executorRouteStrategy;
/**
* 执行器任务handler
*/
@TableField(value = "executor_handler")
private String executorHandler;
/**
* 执行器任务参数
*/
@TableField(value = "executor_param")
private String executorParam;
/**
* 阻塞处理策略
*/
@TableField(value = "executor_block_strategy")
private String executorBlockStrategy;
/**
* 任务执行超时时间,单位秒
*/
@TableField(value = "executor_timeout")
private Integer executorTimeout;
/**
* 失败重试次数
*/
@TableField(value = "executor_fail_retry_count")
private Integer executorFailRetryCount;
/**
* GLUE类型
*/
@TableField(value = "glue_type")
private String glueType;
/**
* GLUE源代码
*/
@TableField(value = "glue_source")
private String glueSource;
/**
* GLUE备注
*/
@TableField(value = "glue_remark")
private String glueRemark;
/**
* GLUE更新时间
*/
@TableField(value = "glue_updatetime")
private Date glueUpdatetime;
/**
* 子任务ID,多个逗号分隔
*/
@TableField(value = "child_jobid")
private String childJobid;
/**
* 调度状态:0-停止,1-运行
*/
@TableField(value = "trigger_status")
private Byte triggerStatus;
/**
* 上次调度时间
*/
@TableField(value = "trigger_last_time")
private Long triggerLastTime;
/**
* 下次调度时间
*/
@TableField(value = "trigger_next_time")
private Long triggerNextTime;
/**
* 增量更新时间(时间戳)
*/
@TableField(value = "incremental_update_date")
private Long incrementalUpdateDate;
}
public interface XxlJobInfoService extends IService<XxlJobInfo>{
}
xxl-job部署于本地,连接的是MySQL数据库,在此使用@DS(“xxljob”)指定使用自定义配置数据源xxljob
@DS("xxljob")
@Service
public class XxlJobInfoServiceImpl extends ServiceImpl<XxlJobInfoMapper, XxlJobInfo> implements XxlJobInfoService{
}
创建任务类,在该类中创建数据同步任务方法,然后用@XxlJob()注解,标明执行器名称(后面配置同步任务时使用)
@Component
@Slf4j
public class MsDemoSyncJob {
@Autowired
private XxlJobInfoService xxlJobInfoService;
@Autowired
private OracleMsDemoService oracleMsDemoService;
@Autowired
private MsDemoService msDemoService;
@XxlJob("msDemoSyncJob")
public void msDemoSyncJob() throws Exception {
//1、根据当前任务id,查询任务信息
XxlJobInfo jobInfo = xxlJobInfoService.getById(XxlJobHelper.getJobId());
//上次调度时间(时间戳),作为本次增量起始时间
//xxl-job job表需要添加incremental_update_date(增量更新时间(时间戳),varchar类型,初始可为空)
Long incrementalUpdateDate = jobInfo.getIncrementalUpdateDate();
//自定义参数
//String param = XxlJobHelper.getJobParam();
//2、根据起始时间戳,查询待增量同步数据
List<MsDemo> msDemoList = oracleMsDemoService.prepareSyncData(incrementalUpdateDate);
if(msDemoList.size()>0){
//3、执行插入或更新操作
msDemoService.saveOrUpdateBatch(msDemoList);
//数据同步后,使用stream API统计所获取数据的时间戳
LongSummaryStatistics statistics = msDemoList.stream().mapToLong(item -> {
return item.getTimeStamp();//获取每条数据的时间戳
}).summaryStatistics();
//从stream统计信息获取最大时间戳
long maxTimeStamp = statistics.getMax();
//4、更新时间戳(只有成功调度,才更新)
jobInfo.setIncrementalUpdateDate(maxTimeStamp);
xxlJobInfoService.updateById(jobInfo);
XxlJobHelper.log("当前任务同步数据,最大时间戳已更新至xxl_job_info表,时间戳为:{}",max);
}
XxlJobHelper.log("数据同步任务(msDemoSyncJob),同步了{}条数据",msDemoList.size());
}
}
@Component
@ConfigurationProperties(prefix = "xxl.job")
@Data
public class XxlJobConfig {
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
//调度中心部署跟地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;
// eg: http://127.0.0.1:8089/xxl-job-admin
private String adminAddresses;
//执行器通讯TOKEN [选填]:非空时启用;
private String accessToken;
// 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
private String appname;
//执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。
private String address;
//执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";
private String ip;
//执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
private int port;
//执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
private String logPath;
//执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能;
private int logRetentionDays;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
/**
* 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
*
* 1、引入依赖:
* <dependency>
* <groupId>org.springframework.cloud</groupId>
* <artifactId>spring-cloud-commons</artifactId>
* <version>${version}</version>
* </dependency>
*
* 2、配置文件,或者容器启动变量
* spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
*
* 3、获取IP
* String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
*/
}
需要注意:
spring.datasource.dynamic.datasource.xxx 为多数据源配置
spring:
datasource:
dynamic:
primary: xxljob
datasource:
#配置名称为 xxljob的数据源
xxljob:
url: jdbc:mysql://localhost:3306/xxl_job?characterEncoding=UTF-8&serverTimezone=UTC&useUnicode=true&nullCatalogMeansCurrent=true&useSSL=false
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver
server:
port: 8889
spring:
application:
name: data-sync
#多数据源
datasource:
dynamic:
#默认的数据源名称为 master,可以根据实际情况自定义
primary: xxljob
datasource:
#配置名称为 xxljob的数据源
xxljob:
url: jdbc:mysql://localhost:3306/xxl_job?characterEncoding=UTF-8&serverTimezone=UTC&useUnicode=true&nullCatalogMeansCurrent=true&useSSL=false
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver
#配置名称为sqlserver的数据源
sqlserver:
url: jdbc:sqlserver://192.168.109.128:1433;database=datax
username: SA
password: Abc12345
driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver
#配置名称为oracle的数据源
oracle:
url: jdbc:oracle:thin:@xx.xx.xx.xx:1521:ORCL
username: xxxxxx
password: xxxxxx@123
driver-class-name: oracle.jdbc.driver.OracleDriver
mybatis-plus:
#configuration:
#log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
#默认位置 classpath*:/mapper/**/*.xml
mapper-locations: classpath*:/mapper/**/*.xml
# xxl job相关配置
xxl:
job:
# xxl job管理端地址
admin-addresses: http://127.0.0.1:8088/xxl-job-admin
appname: data-sync
log-retention-days: 30
完成以上开发后,就可以进行项目测试,然后打包部署了。
首先,要先启动xxl-job管理端,例如启动后服务端地址为:http://127.0.0.1:8088/xxl-job-admin;
然后启动本项目;
登录管理端,在执行器中添加本执行器;
创建bean类型的任务,jobHandler名称为:msDemoSyncJob
测试
该项目中,使用多数据源,结合xxl-job,实现异构数据库之间数据同步。通过xxl-job进行任务调度,数据同步频率、策略配置更灵活,适用于datax不满足的情况下来使用。
dynamic-datasource-spring-boot-starter 是一个基于springboot的快速集成多数据源的启动器
_
分割的数据源 首部 即为组的名称,相同组名称的数据源会放在一个组下。spring.datasource.dynamic.primary
修改。引入dynamic-datasource-spring-boot-starter。 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-agAQR9ec-1664518235894)(https://img.shields.io/maven-central/v/com.baomidou/dynamic-datasource-spring-boot-starter.svg)]
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>dynamic-datasource-spring-boot-starter</artifactId>
<version>${version}</version>
</dependency>
配置数据源
spring:
datasource:
dynamic:
primary: master #设置默认的数据源或者数据源组,默认值即为master
strict: false #严格匹配数据源,默认false. true未匹配到指定数据源时抛异常,false使用默认数据源
datasource:
master:
url: jdbc:mysql://xx.xx.xx.xx:3306/dynamic
username: root
password: 123456
driver-class-name: com.mysql.jdbc.Driver # 3.2.0开始支持SPI可省略此配置
slave_1:
url: jdbc:mysql://xx.xx.xx.xx:3307/dynamic
username: root
password: 123456
driver-class-name: com.mysql.jdbc.Driver
slave_2:
url: ENC(xxxxx) # 内置加密,使用请查看详细文档
username: ENC(xxxxx)
password: ENC(xxxxx)
driver-class-name: com.mysql.jdbc.Driver
#......省略
#以上会配置一个默认库master,一个组slave下有两个子库slave_1,slave_2
使用 @DS 切换数据源。
@DS 可以注解在方法上或类上,同时存在就近原则 方法上注解 优先于 类上注解。
注解 | 结果 |
---|---|
没有@DS | 默认数据源 |
@DS(“dsName”) | dsName可以为组名也可以为具体某个库的名称 |
@Service
@DS("slave")
public class UserServiceImpl implements UserService {
@Autowired
private JdbcTemplate jdbcTemplate;
public List selectAll() {
return jdbcTemplate.queryForList("select * from user");
}
@Override
@DS("slave_1")
public List selectByCondition() {
return jdbcTemplate.queryForList("select * from user where age >10");
}
}