当前位置: 首页 > 工具软件 > Schema > 使用案例 >

定义Mybatis拦截器动态切换postgre数据库schema

计泉
2023-12-01

背景

随着业务的发展和合规要求,产品数据库将切换到Postgres。之前不同技术域,不同交付工程的数据分库管理的方式切换到PG数据库后将通过分schema管理。
ORM继续使用Mybatis,为使用迁移工作量尽可能小,现有的SQL代码不做大的修改。动态数据源实现考虑在Mybatis执行过程中做拦截,替换sql中的schema标识。

提取请求参数中的schema

约定rest接口请求Header参数中增加schema信息。通过切面技术从请求头中提取schema后保存到线程变量。

1. 提取schema

package com.postgres.manager;

import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import javax.servlet.http.HttpServletRequest;

/**
 * Schema切面, 提取header头中的schema保存到SchemaHolder中
 *
 * @author elon
 * @since 2022-03-20
 */
@Aspect
@Component
@Order(9999)
public class SchemaAspect {
    @Pointcut("@annotation(org.springframework.web.bind.annotation.GetMapping) "
            + "|| @annotation(org.springframework.web.bind.annotation.PostMapping) "
            + "|| @annotation(org.springframework.web.bind.annotation.DeleteMapping) "
            + "|| @annotation(org.springframework.web.bind.annotation.RequestMapping)")
    void schema() {

    }

    /**
     * 从请求头提取
     *
     * @param joinPoint
     */
    @Before("schema()")
    public void setSchema(JoinPoint joinPoint) {
        String schema = getSchemaFromHeader();
        SchemaHolder.set(schema);
    }

    @After("schema()")
    public void clearSchema(JoinPoint joinPoint) {
        SchemaHolder.clear();
    }

    /**
     * 从请求头中后去schema信息
     *
     * @return schema
     */
    private String getSchemaFromHeader() {
        HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
        String schema = request.getHeader("schema");
        return schema;
    }
}

2. 保存schema的线程变量类

package com.postgres.manager;

/**
 * Schema持有类. 用于在异步线程或者跨多个方法传递schema信息
 *
 * @author elon
 * @since 2022-03-19
 */
public class SchemaHolder {
    private static ThreadLocal<String> schema = new ThreadLocal<>();

    public static void set(String sch) {
        schema.set(sch);
    }

    public static String get() {
        return schema.get();
    }

    public static void clear() {
        schema.remove();
    }
}

定义Mybatis拦截器

1. 定义拦截器注解,用于修饰DAO层级接口

package com.postgres.manager;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * schema拦截器注解。修饰mapper接口类,用以区分访问的pg数据库schema
 *
 * @author elon
 * @since 2022-03-20
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface SchemaInterceptAnnotation {
    /**
     * schema类型。取值范围:business, common
     *
     * @return
     */
    String schemaType() default "";
}

在DAO层接口类加上该注解,拦截器会动态切换schema.

package com.postgres.mapper;

import com.postgres.manager.SchemaInterceptAnnotation;
import com.postgres.model.ExamResult;
import com.postgres.model.User;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

import java.util.List;

@Mapper
@SchemaInterceptAnnotation(schemaType = "business")
public interface UserMapper {
    /**
     * 从schema获取user数据
     *
     * @return user列表
     */
    List<User> getUserFromSchema(@Param("name") String name);

    /**
     * 插入用户数据到schema
     *
     * @param userList 用户列表
     */
    void insertUser2Schema(@Param("list") List<User> userList);

    /**
     * 获取测试成绩.
     *
     * @return 测试成绩列表
     */
    List<ExamResult> getExamResult();
}

2. 拦截器替换sql中的表名为schema.表名

package com.postgres.manager;

import org.apache.ibatis.executor.statement.StatementHandler;
import org.apache.ibatis.mapping.BoundSql;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.plugin.*;
import org.apache.ibatis.reflection.DefaultReflectorFactory;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.SystemMetaObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.lang.reflect.Field;
import java.sql.Connection;
import java.util.Properties;

/**
 * StatementHandler拦截器. 在prepare方法执行前拦截,修改sql语句,增加schema.
 *
 * @author elon
 * @since 2022-03-20
 */
@Component
@Intercepts({@Signature(type = StatementHandler.class, method = "prepare", args = {Connection.class, Integer.class})})
public class StatementHandlerInterceptor implements Interceptor {
    private static final Logger LOGGER = LoggerFactory.getLogger(StatementHandlerInterceptor.class);

    /**
     * 业务数据分schema存储
     */
    private static final String BUSINESS_SCHEMA = "business";

    /**
     * 公共的配置数据(不分schema), 固定库
     */
    private static final String COMMON_SCHEMA = "common";

    @Override
    public Object intercept(Invocation invocation) throws Throwable {
        StatementHandler statementHandler = (StatementHandler) invocation.getTarget();
        MetaObject metaObject = MetaObject.forObject(statementHandler, SystemMetaObject.DEFAULT_OBJECT_FACTORY,
                SystemMetaObject.DEFAULT_OBJECT_WRAPPER_FACTORY, new DefaultReflectorFactory());

        MappedStatement mappedStatement = (MappedStatement) metaObject.getValue("delegate.mappedStatement");
        String mapperMethod = mappedStatement.getId();

        BoundSql boundSql = statementHandler.getBoundSql();
        String sql = boundSql.getSql();

        String mapperClass = mapperMethod.substring(0, mappedStatement.getId().lastIndexOf("."));
        Class<?> classType = Class.forName(mapperClass);

        SchemaInterceptAnnotation interceptAnnotation = classType.getAnnotation(SchemaInterceptAnnotation.class);
        String schemaType = interceptAnnotation.schemaType();
        String newSql = replaceSqlWithSchema(schemaType, sql, mapperMethod);

        //通过反射修改sql语句
        Field field = boundSql.getClass().getDeclaredField("sql");
        field.setAccessible(true);
        field.set(boundSql, newSql);

        return invocation.proceed();
    }

    @Override
    public Object plugin(Object object) {
        if (object instanceof StatementHandler) {
            return Plugin.wrap(object, this);
        } else {
            return object;
        }
    }

    @Override
    public void setProperties(Properties properties) {

    }

    private String replaceSqlWithSchema(String schemaType, String originalSql, String mapperMethod){
        // 替换sql中的表名,加上schema
        if (BUSINESS_SCHEMA.equals(schemaType)) {
            String schema = SchemaHolder.get();
            return originalSql.replaceAll(" t_", " " + schema + ".t_");
        } else if (COMMON_SCHEMA.equals(schemaType)) {
            return originalSql.replaceAll(" t_", " " + COMMON_SCHEMA + ".t_");
        } else {
            LOGGER.error("Invalid SchemaInterceptAnnotation. mapperMethod:{}", mapperMethod);
            throw new IllegalArgumentException("Invalid SchemaInterceptAnnotation.");
        }
    }
}

2. 添加拦截器

加上如下处理, 拦截器才会生效

package com.postgres.config;

import com.postgres.manager.StatementHandlerInterceptor;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import java.util.List;

@Configuration
public class InterceptorConfig {

    @Autowired
    private List<SqlSessionFactory> sqlSessionFactoryList;

    @PostConstruct
    public void addSqlInterceptor() {
        StatementHandlerInterceptor interceptor = new StatementHandlerInterceptor();
        for (SqlSessionFactory sqlSessionFactory : sqlSessionFactoryList) {
            sqlSessionFactory.getConfiguration().addInterceptor(interceptor);
        }
    }
}

完整的Demo代码还包括DataSource配置和XML中SQL,这些和普通的Spring Boot项目无异。参考github上的完整实现代码:https://github.com/ylforever/elon-postgres

 类似资料: