随着业务的发展和合规要求,产品数据库将切换到Postgres。之前不同技术域、不同交付工程的数据采用分库管理;切换到PG数据库后,将改为分schema管理。
ORM继续使用Mybatis。为使迁移工作量尽可能小,现有的SQL代码不做大的修改。动态数据源的实现方案是在Mybatis执行过程中做拦截,替换sql中的schema标识。
约定rest接口请求Header参数中增加schema信息。通过切面技术从请求头中提取schema后保存到线程变量。
package com.postgres.manager;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;
/**
 * Schema aspect. Extracts the {@code schema} request header and stores it in {@link SchemaHolder}
 * for the duration of a controller method, then clears it again.
 *
 * @author elon
 * @since 2022-03-20
 */
@Aspect
@Component
@Order(9999)
public class SchemaAspect {
    /**
     * Matches methods annotated with one of Spring's request-mapping annotations.
     * PUT and PATCH handlers are included so that they get a schema as well.
     */
    @Pointcut("@annotation(org.springframework.web.bind.annotation.GetMapping) "
        + "|| @annotation(org.springframework.web.bind.annotation.PostMapping) "
        + "|| @annotation(org.springframework.web.bind.annotation.PutMapping) "
        + "|| @annotation(org.springframework.web.bind.annotation.PatchMapping) "
        + "|| @annotation(org.springframework.web.bind.annotation.DeleteMapping) "
        + "|| @annotation(org.springframework.web.bind.annotation.RequestMapping)")
    void schema() {
    }

    /**
     * Reads the schema from the request header and stores it in {@link SchemaHolder}
     * before the matched method runs.
     *
     * @param joinPoint the matched join point (not used)
     */
    @Before("schema()")
    public void setSchema(JoinPoint joinPoint) {
        String schema = getSchemaFromHeader();
        SchemaHolder.set(schema);
    }

    /**
     * Clears the schema held by {@link SchemaHolder} after the matched method finishes,
     * whether it returned normally or threw.
     *
     * @param joinPoint the matched join point (not used)
     */
    @After("schema()")
    public void clearSchema(JoinPoint joinPoint) {
        SchemaHolder.clear();
    }

    /**
     * Gets the schema from the {@code schema} header of the current request.
     *
     * @return the header value, or {@code null} when the header is absent or when no servlet
     *         request is bound to the current thread
     */
    private String getSchemaFromHeader() {
        // getRequestAttributes() returns null outside of a request-handling thread; guard
        // against that instead of failing with a NullPointerException on the cast below.
        Object attributes = RequestContextHolder.getRequestAttributes();
        if (!(attributes instanceof ServletRequestAttributes)) {
            return null;
        }
        HttpServletRequest request = ((ServletRequestAttributes) attributes).getRequest();
        return request.getHeader("schema");
    }
}
package com.postgres.manager;
/**
 * Holder for the schema name of the current thread. Lets the schema be passed across
 * several method calls without threading it through every signature.
 *
 * <p>The value is stored in a plain {@link ThreadLocal}, so it is visible only to the thread
 * that called {@link #set(String)}. It is not propagated to other threads; code that hands
 * work to another thread must call {@link #set(String)} on that thread itself.
 *
 * @author elon
 * @since 2022-03-19
 */
public class SchemaHolder {
    /** Per-thread schema name; the reference itself never changes, hence {@code final}. */
    private static final ThreadLocal<String> CURRENT_SCHEMA = new ThreadLocal<>();

    /**
     * Binds a schema name to the current thread.
     *
     * @param sch schema name to remember; may be {@code null}
     */
    public static void set(String sch) {
        CURRENT_SCHEMA.set(sch);
    }

    /**
     * Returns the schema name bound to the current thread.
     *
     * @return the schema name, or {@code null} if none has been set on this thread
     */
    public static String get() {
        return CURRENT_SCHEMA.get();
    }

    /**
     * Removes the schema name bound to the current thread.
     */
    public static void clear() {
        CURRENT_SCHEMA.remove();
    }
}
package com.postgres.manager;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Marker annotation for the schema interceptor. Placed on a mapper interface to declare
 * which kind of PostgreSQL schema the mapper's statements should be directed to.
 *
 * @author elon
 * @since 2022-03-20
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface SchemaInterceptAnnotation {
    /**
     * Schema type. Expected values: {@code business}, {@code common}.
     *
     * @return the schema type; an empty string when not specified
     */
    String schemaType() default "";
}
在DAO层接口类加上该注解,拦截器会动态切换schema.
package com.postgres.mapper;
import com.postgres.manager.SchemaInterceptAnnotation;
import com.postgres.model.ExamResult;
import com.postgres.model.User;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* MyBatis mapper for user and exam-result data. Declared with schema type "business"
* through {@link SchemaInterceptAnnotation}.
*/
@Mapper
@SchemaInterceptAnnotation(schemaType = "business")
public interface UserMapper {
/**
* Gets user data from the schema.
*
* @param name value bound to the SQL parameter "name" (presumably the user name to look up;
*             the SQL is not visible here - confirm against the mapper XML)
* @return list of users
*/
List<User> getUserFromSchema(@Param("name") String name);
/**
* Inserts user data into the schema.
*
* @param userList list of users, bound to the SQL parameter "list"
*/
void insertUser2Schema(@Param("list") List<User> userList);
/**
* Gets exam results.
*
* @return list of exam results
*/
List<ExamResult> getExamResult();
}
package com.postgres.manager;
import org.apache.ibatis.executor.statement.StatementHandler;
import org.apache.ibatis.mapping.BoundSql;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.plugin.*;
import org.apache.ibatis.reflection.DefaultReflectorFactory;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.SystemMetaObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.lang.reflect.Field;
import java.sql.Connection;
import java.util.Properties;
/**
 * {@link StatementHandler} interceptor. Intercepts {@code prepare} and rewrites the SQL statement
 * so that every occurrence of {@code " t_"} is qualified with a schema name.
 *
 * <p>The schema is chosen from the {@link SchemaInterceptAnnotation} on the mapper interface:
 * {@code business} mappers use the schema held by {@link SchemaHolder} for the current thread,
 * {@code common} mappers always use the fixed {@code common} schema.
 *
 * @author elon
 * @since 2022-03-20
 */
@Component
@Intercepts({@Signature(type = StatementHandler.class, method = "prepare", args = {Connection.class, Integer.class})})
public class StatementHandlerInterceptor implements Interceptor {
    private static final Logger LOGGER = LoggerFactory.getLogger(StatementHandlerInterceptor.class);

    /**
     * Business data, stored per schema.
     */
    private static final String BUSINESS_SCHEMA = "business";

    /**
     * Shared configuration data (not split by schema), fixed schema.
     */
    private static final String COMMON_SCHEMA = "common";

    /**
     * Text that marks a table reference in the original SQL.
     */
    private static final String TABLE_MARKER = " t_";

    /**
     * Shape a schema name must have before it is spliced into SQL text. The business schema
     * comes from {@link SchemaHolder} and may originate from outside the application, so only
     * plain identifiers (letters, digits, underscore; not starting with a digit) are accepted.
     */
    private static final java.util.regex.Pattern SAFE_SCHEMA_NAME =
        java.util.regex.Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

    /**
     * Rewrites the SQL of the intercepted statement and then lets the invocation proceed.
     *
     * @param invocation the intercepted {@code StatementHandler.prepare} call
     * @return whatever the intercepted method returns
     * @throws IllegalArgumentException if the mapper has no valid {@link SchemaInterceptAnnotation},
     *         or if a business mapper runs without a legal schema name in {@link SchemaHolder}
     * @throws Throwable anything thrown by reflection or by the intercepted method
     */
    @Override
    public Object intercept(Invocation invocation) throws Throwable {
        StatementHandler statementHandler = (StatementHandler) invocation.getTarget();
        MetaObject metaObject = MetaObject.forObject(statementHandler, SystemMetaObject.DEFAULT_OBJECT_FACTORY,
            SystemMetaObject.DEFAULT_OBJECT_WRAPPER_FACTORY, new DefaultReflectorFactory());
        MappedStatement mappedStatement = (MappedStatement) metaObject.getValue("delegate.mappedStatement");

        // The statement id has the form "<mapper class name>.<method>"; strip the method part.
        String mapperMethod = mappedStatement.getId();
        String mapperClass = mapperMethod.substring(0, mapperMethod.lastIndexOf('.'));
        Class<?> classType = Class.forName(mapperClass);

        // A mapper without the annotation is handled like one with an invalid schema type
        // (clear IllegalArgumentException below) instead of failing with a NullPointerException.
        SchemaInterceptAnnotation interceptAnnotation = classType.getAnnotation(SchemaInterceptAnnotation.class);
        String schemaType = interceptAnnotation == null ? null : interceptAnnotation.schemaType();

        BoundSql boundSql = statementHandler.getBoundSql();
        String newSql = replaceSqlWithSchema(schemaType, boundSql.getSql(), mapperMethod);

        // BoundSql offers no setter for its SQL text, so it is replaced through reflection.
        Field field = boundSql.getClass().getDeclaredField("sql");
        field.setAccessible(true);
        field.set(boundSql, newSql);
        return invocation.proceed();
    }

    /**
     * Wraps {@link StatementHandler} targets with this interceptor; other objects are returned as is.
     *
     * @param object candidate target
     * @return the wrapped target, or {@code object} itself when it is not a {@code StatementHandler}
     */
    @Override
    public Object plugin(Object object) {
        if (object instanceof StatementHandler) {
            return Plugin.wrap(object, this);
        } else {
            return object;
        }
    }

    /**
     * No configurable properties.
     *
     * @param properties ignored
     */
    @Override
    public void setProperties(Properties properties) {
    }

    /**
     * Qualifies the table references in the SQL with the schema that belongs to the schema type.
     *
     * @param schemaType schema type declared on the mapper; {@code null} when the annotation is missing
     * @param originalSql SQL text as produced by MyBatis
     * @param mapperMethod statement id, used for logging only
     * @return the SQL with every {@code " t_"} replaced by {@code " <schema>.t_"}
     * @throws IllegalArgumentException if the schema type is unknown, or if the business schema
     *         name is missing or not a plain identifier
     */
    private String replaceSqlWithSchema(String schemaType, String originalSql, String mapperMethod) {
        if (BUSINESS_SCHEMA.equals(schemaType)) {
            String schema = SchemaHolder.get();
            if (schema == null || !SAFE_SCHEMA_NAME.matcher(schema).matches()) {
                // The rejected value is deliberately not logged: it is untrusted input.
                LOGGER.error("Missing or illegal schema name. mapperMethod:{}", mapperMethod);
                throw new IllegalArgumentException("Missing or illegal schema name.");
            }
            // Literal replace: no regex semantics on either the marker or the schema name.
            return originalSql.replace(TABLE_MARKER, " " + schema + ".t_");
        } else if (COMMON_SCHEMA.equals(schemaType)) {
            return originalSql.replace(TABLE_MARKER, " " + COMMON_SCHEMA + ".t_");
        } else {
            LOGGER.error("Invalid SchemaInterceptAnnotation. mapperMethod:{}", mapperMethod);
            throw new IllegalArgumentException("Invalid SchemaInterceptAnnotation.");
        }
    }
}
加上如下处理, 拦截器才会生效
package com.postgres.config;
import com.postgres.manager.StatementHandlerInterceptor;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import java.util.List;
/**
 * Registers {@link StatementHandlerInterceptor} on every {@link SqlSessionFactory} found in the
 * application context.
 */
@Configuration
public class InterceptorConfig {
    @Autowired
    private List<SqlSessionFactory> sqlSessionFactoryList;

    /**
     * Adds the SQL interceptor to each session factory once the factories have been injected.
     * A factory whose configuration already contains a {@link StatementHandlerInterceptor} is
     * skipped, so the SQL is not rewritten by two instances of the same plugin.
     */
    @PostConstruct
    public void addSqlInterceptor() {
        StatementHandlerInterceptor interceptor = new StatementHandlerInterceptor();
        for (SqlSessionFactory sqlSessionFactory : sqlSessionFactoryList) {
            boolean alreadyRegistered = sqlSessionFactory.getConfiguration().getInterceptors().stream()
                .anyMatch(StatementHandlerInterceptor.class::isInstance);
            if (!alreadyRegistered) {
                sqlSessionFactory.getConfiguration().addInterceptor(interceptor);
            }
        }
    }
}
完整的Demo代码还包括DataSource配置和XML中SQL,这些和普通的Spring Boot项目无异。参考github上的完整实现代码:https://github.com/ylforever/elon-postgres