
Using p6spy to intercept the SQL and parameters executed by the persistence layer

易骁
2023-12-01

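Note: this article is a write-up of what I put together after using the tool. Most of the utility code comes from other developers. I could not trace the original sources, so I can only offer my sincere thanks and respect here to the authors who shared their code.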

On top of intercepting the SQL, I added SQL analysis and alerting for risky statements.

1. Add the p6spy jar. For a Maven project, add the dependency to the pom:

<dependency>
    <groupId>p6spy</groupId>
    <artifactId>p6spy</artifactId>
    <version>3.9.1</version>
</dependency>

2. In the datasource configuration, set driverClassName to com.p6spy.engine.spy.P6SpyDriver and insert the p6spy: prefix into the url, so that it starts with jdbc:p6spy:

spring.datasource.driverClassName=com.p6spy.engine.spy.P6SpyDriver
spring.datasource.url=jdbc:p6spy:mysql://${DB_HOST}:3306/${DB_DATABASE}?autoReconnect=true&useUnicode=true&characterEncoding=UTF8&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=CTT&allowMultiQueries=true
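
The URL change in step 2 amounts to inserting `p6spy:` right after the `jdbc:` prefix. A minimal sketch of that rewrite follows. The class `P6SpyUrls` and the method `toP6SpyUrl` are hypothetical names used only for illustration; they are not part of p6spy.

```java
final class P6SpyUrls {

    private static final String JDBC_PREFIX = "jdbc:";
    private static final String P6SPY_PREFIX = "jdbc:p6spy:";

    private P6SpyUrls() {
    }

    /** Turns jdbc:mysql://... into jdbc:p6spy:mysql://... (hypothetical helper). */
    static String toP6SpyUrl(String jdbcUrl) {
        if (jdbcUrl == null || !jdbcUrl.startsWith(JDBC_PREFIX)) {
            throw new IllegalArgumentException("Not a JDBC URL: " + jdbcUrl);
        }
        if (jdbcUrl.startsWith(P6SPY_PREFIX)) {
            // Already wrapped, leave it untouched
            return jdbcUrl;
        }
        return P6SPY_PREFIX + jdbcUrl.substring(JDBC_PREFIX.length());
    }

    public static void main(String[] args) {
        System.out.println(toP6SpyUrl("jdbc:mysql://localhost:3306/demo"));
    }
}
```

Everything after the prefix, including the query parameters, stays exactly as the real driver expects it.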

3. Add the configuration file (in a Spring Boot project it goes in the resources directory)

spy.properties

# Modules to load (P6OutageFactory is needed for slow-SQL detection)
modulelist=com.p6spy.engine.spy.P6SpyFactory,com.p6spy.engine.logging.P6LogFactory,com.p6spy.engine.outage.P6OutageFactory
# Custom log format: fully qualified name of the custom formatting class
logMessageFormat=com.xxx.xxx.xxx.config.P6SpyLoggerFormatStrategy
# Appender used to record the SQL (default is com.p6spy.engine.spy.appender.FileLogger)
#appender=com.p6spy.engine.spy.appender.StdoutLogger
#appender=com.p6spy.engine.spy.appender.FileLogger
appender=com.p6spy.engine.spy.appender.Slf4JLogger
# Log categories to exclude
excludecategories=info,debug,result,batch,resultset
# Let the p6spy driver act as the proxy (deregister the real drivers)
deregisterdrivers=true
# Date format
dateformat=yyyy-MM-dd HH:mm:ss
# The real driver
driverlist=com.mysql.cj.jdbc.Driver
# Enable slow-SQL logging
outagedetection=true
# Slow-SQL threshold, in seconds
outagedetectioninterval=2

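① Set driverlist to the real driver.

② logMessageFormat sets the log output format. The default is com.p6spy.engine.spy.appender.SingleLineFormat, which prints each statement on a single line without formatting it. If that does not fit, follow its source and implement MessageFormattingStrategy in a custom class (see P6SpyLoggerFormatStrategy.java below).

③ The built-in appenders are Slf4JLogger, StdoutLogger and FileLogger. If none of them fits, follow their source and write a custom class that extends FormattedLogger.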
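
A misspelled entry in excludecategories (for example `batc` instead of `batch`) is easy to miss. As far as I can tell p6spy does not report it; the intended category simply keeps being logged. The sketch below loads the properties text with the JDK only and reports entries that are not in a list of known categories. The list reflects my reading of the p6spy category names and should be treated as an assumption, and `SpyPropertiesCheck` is a hypothetical helper, not a p6spy class.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Properties;
import java.util.Set;

final class SpyPropertiesCheck {

    // Assumed list of p6spy log categories; verify against the p6spy version in use
    private static final Set<String> KNOWN = new HashSet<>(Arrays.asList(
            "error", "warn", "info", "debug", "batch", "statement",
            "resultset", "commit", "rollback", "result", "outage"));

    private SpyPropertiesCheck() {
    }

    /** Parses the comma-separated excludecategories value from the properties text. */
    static Set<String> excludedCategories(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Set<String> result = new LinkedHashSet<>();
        for (String part : props.getProperty("excludecategories", "").split(",")) {
            String category = part.trim();
            if (!category.isEmpty()) {
                result.add(category);
            }
        }
        return result;
    }

    /** Returns the entries that do not match any known category name. */
    static Set<String> unknownCategories(Set<String> categories) {
        Set<String> unknown = new LinkedHashSet<>(categories);
        unknown.removeAll(KNOWN);
        return unknown;
    }
}
```

Running such a check once at startup or in a unit test is enough; it does not need to sit on the SQL path.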

4. Then write a class that implements the MessageFormattingStrategy interface to define the log format

P6SpyLoggerFormatStrategy.java

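import com.p6spy.engine.spy.appender.MessageFormattingStrategy;
import org.slf4j.MDC; // assumed: the SLF4J MDC, matching the Slf4JLogger appender

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;

// Func, MapUtils, SqlUtils, ThreadPoolUtils, EventSendService and SpringContextUtil
// are project classes; import them from your own packages.
public class P6SpyLoggerFormatStrategy implements MessageFormattingStrategy {

    private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");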

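    /**
     * Formats a log entry. Every logged SQL statement passes through this method,
     * so the analysis adds latency; not recommended for production.
     *
     * @param connectionId connection ID
     * @param now          current time
     * @param elapsed      elapsed time in milliseconds
     * @param category     log category
     * @param prepared     the prepared statement with placeholders
     * @param sql          the SQL actually executed, with parameter values
     * @param url          database connection URL
     * @return the formatted log line
     **/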
    @Override
    public String formatMessage(int connectionId, String now, long elapsed,
                                String category, String prepared, String sql,
                                String url) {

        if (Func.isEmpty(sql)) {
            return "";
        }
        // Collapse extra whitespace in the SQL
        sql = Func.clearExtraSpaces(sql);
        // Statement type (SELECT, UPDATE, ...)
        String dml = SqlUtils.getSqlDML(sql);
        // Analyze the SQL
        String analyzeResult = SqlUtils.analyzeSqlWhere(sql);
        // Assemble the output line
        StringBuilder sb = new StringBuilder().append(formatter.format(LocalDateTime.now())).append(" |");
        if (Func.isNotEmpty(analyzeResult)) {
            sb.append(analyzeResult).append("|");
        }
        sb.append(dml).append(" |took ").append(elapsed).append("ms |")
                .append(category).append("|").append(connectionId).append("|")
                .append(sql).append(";");
        // Risky SQL triggers an alert (email, SMS, DingTalk, ...); currently it is sent asynchronously to a DingTalk group
        if (Func.isNotEmpty(analyzeResult)) {
            ThreadPoolUtils.execute(new SendNotice(sb.toString(), dml, analyzeResult, MDC.get("traceId")));
        }
        return sb.toString();
    }

    /** Alert task; it is handed to the thread pool, so it only needs to be a Runnable */
    class SendNotice implements Runnable {

        private final String msg;
        private final String dml;
        private final String analyzeResult;
        private final String traceId;

        public SendNotice(String msg, String dml, String analyzeResult, String traceId) {
            this.msg = msg;
            this.dml = dml;
            this.analyzeResult = analyzeResult;
            this.traceId = traceId;
        }

        @Override
        public void run() {
//            HttpServletRequest request = SpringContextHolder.getBean(HttpServletRequest.class);
//            String uri = request.getRequestURI();
//            if (!StringUtils.isEmpty(request.getQueryString())) {
//                uri = uri + "?" + request.getQueryString();
//            }

            HashMap<String, Object> params = new HashMap<>(MapUtils.getSize(7));

            params.put("title", dml);
            params.put("traceId", traceId);
            params.put("message", analyzeResult);
            params.put("body", msg);

            EventSendService eventSendService = SpringContextUtil.getBean(EventSendService.class);
            eventSendService.sendAlertMsg(params, "sql");
        }
    }
}

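With this in place, p6spy records the executed SQL and writes it to the log. The rest of the article covers the extensions: SQL analysis and asynchronous alerting.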
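
To make the resulting log line concrete, here is a JDK-only sketch that repeats the StringBuilder logic of formatMessage as a pure function, with the timestamp passed in so the result is reproducible. `SqlLogLine` is a hypothetical name, and the statement type and analysis result are plain arguments here instead of coming from SqlUtils.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

final class SqlLogLine {

    private static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    private SqlLogLine() {
    }

    /** Builds: time |[analysis|]TYPE |took Nms |category|connectionId|sql; */
    static String format(LocalDateTime time, String analyzeResult, String dml,
                         long elapsedMs, String category, int connectionId, String sql) {
        StringBuilder sb = new StringBuilder().append(FORMATTER.format(time)).append(" |");
        // The analysis result only appears for statements flagged as risky
        if (analyzeResult != null && !analyzeResult.isEmpty()) {
            sb.append(analyzeResult).append("|");
        }
        sb.append(dml).append(" |took ").append(elapsedMs).append("ms |")
                .append(category).append("|").append(connectionId).append("|")
                .append(sql).append(";");
        return sb.toString();
    }

    public static void main(String[] args) {
        // prints: 2023-12-01 10:00:00.000 |SELECT |took 3ms |statement|1|select 1;
        System.out.println(format(LocalDateTime.of(2023, 12, 1, 10, 0, 0),
                null, "SELECT", 3, "statement", 1, "select 1"));
    }
}
```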

Extension 1: the SQL analysis utility class, SqlUtils.java

@Slf4j
public class SqlUtils {

    /**
     * Builds the SQL statement for a mapper method intercepted by AOP.
     *
     * @param pjp               join point of the intercepted mapper method
     * @param sqlSessionFactory the MyBatis SqlSessionFactory
     * @return the SQL with placeholders replaced by the actual arguments
     * @throws IllegalAccessException if a field of a parameter object cannot be read
     */
    public static String getMybatisSql(ProceedingJoinPoint pjp, SqlSessionFactory sqlSessionFactory) throws IllegalAccessException {
        Map<String, Object> map = new HashMap<>(16);
        // 1. Get the namespace and method name
        MethodSignature signature = (MethodSignature) pjp.getSignature();
        Method method = signature.getMethod();
        String namespace = method.getDeclaringClass().getName();
        String methodName = method.getName();
        // 2. Look up the MappedStatement by namespace + method name
        Configuration configuration = sqlSessionFactory.getConfiguration();
        MappedStatement mappedStatement = configuration.getMappedStatement(namespace + "." + methodName);
        // 3. Map formal parameters to the actual arguments
        Object[] objects = pjp.getArgs();
        Annotation[][] parameterAnnotations = method.getParameterAnnotations();
        for (int i = 0; i < parameterAnnotations.length; i++) {
            Object object = objects[i];
            // No annotation: the argument may be an entity, a Map, or a single plain value
            if (parameterAnnotations[i].length == 0) {
                if (object == null) {
                    // Nothing to copy, and getClass() below would throw
                    continue;
                }
                if (object.getClass().getClassLoader() == null && object instanceof Map) {
                    map.putAll((Map<? extends String, ?>) object);
                    log.info("Parameter is a Map");
                } else {// The parameter is a user-defined entity
                    map.putAll(objectToMap(object));
                    log.info("Parameter is a user-defined object");
                }
            } else {// The parameter is annotated; only @Param is handled
                for (Annotation annotation : parameterAnnotations[i]) {
                    if (annotation instanceof Param) {
                        map.put(((Param) annotation).value(), object);
                    }
                }
            }
        }
        // 4. Get the BoundSql
        BoundSql boundSql = mappedStatement.getBoundSql(map);
        return showSql(configuration, boundSql);
    }

    /**
     * Resolves a BoundSql into a SQL statement without placeholders.
     *
     * @param configuration the MyBatis configuration
     * @param boundSql      the bound SQL with its parameter mappings
     * @return the SQL with every ? replaced by its parameter value
     */
    private static String showSql(Configuration configuration, BoundSql boundSql) {
        Object parameterObject = boundSql.getParameterObject();
        List<ParameterMapping> parameterMappings = boundSql.getParameterMappings();
        String sql = boundSql.getSql().replaceAll("[\\s]+", " ");
        if (parameterMappings.size() > 0 && parameterObject != null) {
            TypeHandlerRegistry typeHandlerRegistry = configuration.getTypeHandlerRegistry();
            if (typeHandlerRegistry.hasTypeHandler(parameterObject.getClass())) {
                sql = sql.replaceFirst("\\?", quote(getParameterValue(parameterObject)));
            } else {
                MetaObject metaObject = configuration.newMetaObject(parameterObject);
                for (ParameterMapping parameterMapping : parameterMappings) {
                    String propertyName = parameterMapping.getProperty();
                    if (metaObject.hasGetter(propertyName)) {
                        Object obj = metaObject.getValue(propertyName);
                        sql = sql.replaceFirst("\\?", quote(getParameterValue(obj)));
                    } else if (boundSql.hasAdditionalParameter(propertyName)) {
                        Object obj = boundSql.getAdditionalParameter(propertyName);
                        sql = sql.replaceFirst("\\?", quote(getParameterValue(obj)));
                    }
                }
            }
        }
        return sql;
    }

    /** Escapes $ and \ so replaceFirst treats the value as literal text */
    private static String quote(String value) {
        return java.util.regex.Matcher.quoteReplacement(value);
    }

    /**
     * Wraps String and Date values in single quotes.
     *
     * @param obj the parameter value
     * @return the value as it should appear in the SQL text
     */
    private static String getParameterValue(Object obj) {
        if (obj instanceof String) {
            return "'" + obj + "'";
        }
        if (obj instanceof Date) {
            DateFormat formatter = DateFormat.getDateTimeInstance(DateFormat.DEFAULT, DateFormat.DEFAULT, Locale.CHINA);
            // Format the parameter itself, not the current time
            return "'" + formatter.format((Date) obj) + "'";
        }
        return obj != null ? obj.toString() : "";
    }

    /**
     * Reads the field names and values of an object via reflection.
     *
     * @param obj the object to read
     * @return a map from field name to field value
     * @throws IllegalAccessException if a field cannot be read
     */
    private static Map<String, Object> objectToMap(Object obj) throws IllegalAccessException {
        Map<String, Object> map = new HashMap<>(16);
        Class<?> clazz = obj.getClass();
        log.info("Class<?>={}",clazz);
        // Collect the fields of the class and of all its superclasses
        List<Field> fieldList = new ArrayList<>();
        while (clazz != null) {
            fieldList.addAll(new ArrayList<>(Arrays.asList(clazz.getDeclaredFields())));
            clazz = clazz.getSuperclass();
        }
        for (Field field : fieldList) {
            field.setAccessible(true);
            String fieldName = field.getName();
            Object value = field.get(obj);
            map.put(fieldName, value);
        }
        return map;
    }

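    /**
     * Gets the statement type (INSERT, DELETE, UPDATE, SELECT).
     *
     * @param sql the SQL statement
     * @return the upper-cased leading keyword, or null if it cannot be determined
     */
    public static String getSqlDML(String sql) {

        if (Func.isEmpty(sql)) {
            return null;
        }
        try {
            sql = sql.trim();
            // The keyword ends at the first space, or after at most 6 characters if there is none
            int space = sql.indexOf(" ");
            int endIndex = space != -1 ? space : Math.min(6, sql.length());
            return sql.substring(0, endIndex).toUpperCase();
        } catch (Exception e) {
            log.error("SqlUtils - failed to get the statement type", e);
            return null;
        }
    }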

    /**
     * Gets the table name.
     *
     * @param sql the SQL statement
     * @return the upper-cased table name, or null if it cannot be determined
     */
    public static String getTableName(String sql) {

        String dml = getSqlDML(sql);

        if (Func.isEmpty(dml)) {
            return null;
        }
        try {
            // Drop the leading keyword once; everything below works on the remainder
            String rest = sql.trim().substring(dml.length()).trim().toUpperCase();
            if ((LogSqlConstant.DmlEnum.DELETE.equals(dml) || LogSqlConstant.DmlEnum.SELECT.equals(dml))
                    && rest.contains("FROM")) {
                String[] s = rest.substring(rest.indexOf("FROM") + 4).trim().split(" ");
                // Also covers a statement that ends right after the table name
                if (s.length > 0 && !s[0].isEmpty()) {
                    return s[0].trim();
                }
            }
            if (LogSqlConstant.DmlEnum.UPDATE.equals(dml) && rest.contains("SET")) {
                // The keyword is already removed, so the table name starts at index 0
                return rest.substring(0, rest.indexOf("SET")).trim();
            }
            return null;
        } catch (Exception e) {
            log.error("SqlUtils - failed to get the table name", e);
            return null;
        }
    }

    /**
     * Analyzes the WHERE clause of a SQL statement.
     *
     * @param sql the SQL statement
     * @return a risk description, or null if nothing suspicious was found
     */
    public static String analyzeSqlWhere(String sql) {
        try {
            String dml = getSqlDML(sql);
            // Skip the tables on the ignore list (enum tables)
            if (LogSqlConstant.DmlEnum.SELECT.equals(dml)
                    && LogSqlConstant.getIgnoreTable().contains(getTableName(sql))) {
                return null;
            }
            if (!(LogSqlConstant.DmlEnum.UPDATE.equals(dml) || LogSqlConstant.DmlEnum.DELETE.equals(dml)
                    || LogSqlConstant.DmlEnum.SELECT.equals(dml))) {
                return null;
            }
            String sqlUpperCase = sql.toUpperCase();
            if (!sqlUpperCase.contains("WHERE")) {
                return "[High-risk SQL] " + dml + " without a WHERE clause";
            }
            String whereStr = sqlUpperCase.substring(sqlUpperCase.indexOf("WHERE") + 5);
            if (whereStr.contains("GROUP BY")) {
                whereStr = whereStr.substring(0, whereStr.indexOf("GROUP BY"));
            } else if (whereStr.contains("ORDER BY")) {
                whereStr = whereStr.substring(0, whereStr.indexOf("ORDER BY"));
            }
            // Split into single conditions on parentheses and on the whole words AND / OR.
            // Word boundaries keep names such as BRAND or ORG_ID intact, and clauses
            // without parentheses are analyzed as well.
            StringBuilder nullSB = new StringBuilder();
            for (String cond : whereStr.trim().split("[()]|\\bAND\\b|\\bOR\\b")) {
                String[] eqs = cond.split("=");
                if (eqs.length == 2
                        && (Func.isEmpty(eqs[1]) || "null".equalsIgnoreCase(eqs[1].trim()))) {
                    nullSB.append(cond).append(";");
                }
            }
            return nullSB.length() > 0 ? nullSB.insert(0, "[Medium-risk SQL] empty condition value: ").toString() : null;
        } catch (Exception e) {
            log.error("SqlUtils - failed to analyze the WHERE clause", e);
            return "SqlUtils - failed to analyze the WHERE clause";
        }
    }

}
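
The WHERE-clause check can be tried in isolation. The following is a JDK-only sketch of the same idea, not the article's SqlUtils: it has no ignore list and no Func or LogSqlConstant dependency, and `WhereClauseCheck` and its message texts are hypothetical. It splits conditions on parentheses and on the whole words AND and OR, so column names such as brand or org_id are not cut apart.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class WhereClauseCheck {

    private static final Pattern WHERE = Pattern.compile("\\bWHERE\\b");
    private static final Pattern TAIL = Pattern.compile("\\b(GROUP\\s+BY|ORDER\\s+BY|LIMIT)\\b");
    private static final Pattern SPLIT = Pattern.compile("[()]|\\bAND\\b|\\bOR\\b");

    private WhereClauseCheck() {
    }

    /** Returns a risk description, or null when nothing suspicious is found. */
    static String analyze(String sql) {
        String upper = sql.trim().toUpperCase(Locale.ROOT);
        if (!(upper.startsWith("SELECT") || upper.startsWith("UPDATE") || upper.startsWith("DELETE"))) {
            return null;
        }
        Matcher where = WHERE.matcher(upper);
        if (!where.find()) {
            return "HIGH: no WHERE clause";
        }
        String clause = upper.substring(where.end());
        // Cut off GROUP BY / ORDER BY / LIMIT so they are not parsed as conditions
        Matcher tail = TAIL.matcher(clause);
        if (tail.find()) {
            clause = clause.substring(0, tail.start());
        }
        List<String> suspicious = new ArrayList<>();
        for (String condition : SPLIT.split(clause)) {
            // Limit -1 keeps a trailing empty right-hand side, as in "ID ="
            String[] sides = condition.split("=", -1);
            if (sides.length == 2) {
                String value = sides[1].trim();
                if (value.isEmpty() || value.equals("NULL")) {
                    suspicious.add(condition.trim());
                }
            }
        }
        return suspicious.isEmpty() ? null
                : "MEDIUM: empty condition value: " + String.join("; ", suspicious);
    }
}
```

Like the original, this is string matching rather than SQL parsing: a string literal that contains WHERE, AND or = can still mislead it, so treat the result as a hint for an alert, not as a verdict.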

Extension 2: sending alerts asynchronously

Asynchronous thread pool configuration, AsyncConfig.java

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Thread pool configuration
 */
@Configuration
public class AsyncConfig {

    /** Core pool size */
    private static final int CORE_POOL_SIZE = 10;
    /** Maximum pool size */
    private static final int MAX_POOL_SIZE = 50;
    /** Queue capacity (>= mainExecutor.maxSize) */
    private static final int QUEUE_CAPACITY = 10;
    /** Idle time, in seconds, after which surplus threads are released */
    private static final int KEEP_ALIVE_SECONDS = 20;
    /** Policy for tasks that are rejected because no thread is available */
    private final RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();


    @Bean("asyncTaskExecutor")
    public AsyncTaskExecutor asyncTaskExecutor() {
        ThreadPoolTaskExecutor asyncTaskExecutor = new ThreadPoolTaskExecutor();
        asyncTaskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
        asyncTaskExecutor.setCorePoolSize(CORE_POOL_SIZE);
        asyncTaskExecutor.setQueueCapacity(QUEUE_CAPACITY);
        asyncTaskExecutor.setKeepAliveSeconds(KEEP_ALIVE_SECONDS);
        asyncTaskExecutor.setRejectedExecutionHandler(rejectedExecutionHandler);
        asyncTaskExecutor.setThreadNamePrefix("async-task-thread-pool-");
        asyncTaskExecutor.initialize();
        return asyncTaskExecutor;
    }
}
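
How core size, maximum size and queue capacity interact is easiest to see with the plain JDK ThreadPoolExecutor, which Spring's ThreadPoolTaskExecutor wraps. The sketch below is an illustration under that assumption, with the hypothetical class `BoundedPoolDemo`: every task blocks until released, so the pool fills the core threads first, then the queue, then the extra threads, and AbortPolicy rejects whatever is left.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class BoundedPoolDemo {

    private BoundedPoolDemo() {
    }

    static ThreadPoolExecutor newPool(int core, int max, int queueCapacity, int keepAliveSeconds) {
        return new ThreadPoolExecutor(core, max, keepAliveSeconds, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity), new ThreadPoolExecutor.AbortPolicy());
    }

    /** Submits blocking tasks and counts how many the pool rejects. */
    static int countRejected(int core, int max, int queueCapacity, int tasks) {
        ThreadPoolExecutor pool = newPool(core, max, queueCapacity, 20);
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // keep the worker busy until released
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // AbortPolicy throws once threads and queue are full
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return rejected;
    }
}
```

With the values from AsyncConfig (10 core threads, 50 maximum, queue of 10), at most 60 tasks can be pending at once. For alerting, a policy that drops or logs the task may be a better fit than one that throws into the caller.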

Thread utility class, ThreadPoolUtils.java

import java.util.concurrent.*;

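/**
 * Thread utility class
 */
public class ThreadPoolUtils {

    // volatile so that a pool swapped in by restart() is visible to all threads
    private static volatile ExecutorService executor = Executors.newCachedThreadPool();

    /**
     * Runs a task directly in the shared thread pool.
     *
     * @param runnable the task to run
     */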
    public static void execute(Runnable runnable) {
        try {
            executor.execute(runnable);
        } catch (Exception e) {
            throw new RuntimeException("Exception when running task!", e);
        }
    }

    /**
     * Restarts the shared thread pool.
     */
    public static void restart() {
        executor.shutdownNow();
        executor = Executors.newCachedThreadPool();
    }

    /**
     * Creates a new fixed-size thread pool.
     *
     * @param threadSize number of threads that run at the same time
     * @return ExecutorService
     */
    public static ExecutorService newExecutor(int threadSize) {
        return Executors.newFixedThreadPool(threadSize);
    }

    /**
     * Creates a new cached thread pool.
     *
     * @return ExecutorService
     */
    public static ExecutorService newExecutor() {
        return Executors.newCachedThreadPool();
    }

    /**
     * Creates a new thread pool with a single thread.
     *
     * @return ExecutorService
     */
    public static ExecutorService newSingleExecutor() {
        return Executors.newSingleThreadExecutor();
    }

    /**
     * Runs a task asynchronously on a new thread.
     *
     * @param runnable the task to run
     * @param isDaemon whether the new thread is a daemon thread
     * @return the task that was started
     */
    public static Runnable excAsync(final Runnable runnable, boolean isDaemon) {
        Thread thread = new Thread() {
            @Override
            public void run() {
                runnable.run();
            }
        };
        thread.setDaemon(isDaemon);
        thread.start();

        return runnable;
    }

    /**
     * Runs an asynchronous task that returns a value.<br/>
     * The Future represents the pending result: get() returns it and blocks the calling thread until the task has finished.
     *
     * @return Future
     */
    public static <T> Future<T> execAsync(Callable<T> task) {
        return executor.submit(task);
    }

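    /**
     * Creates a CompletionService on the shared pool. Tasks passed to submit() run asynchronously;
     * take() returns their results in completion order and blocks while none has finished.
     *
     * @return CompletionService
     */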
    public static <T> CompletionService<T> newCompletionService() {
        return new ExecutorCompletionService<T>(executor);
    }

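    /**
     * Creates a CompletionService on the given executor. Tasks passed to submit() run asynchronously;
     * take() returns their results in completion order and blocks while none has finished.
     *
     * @return CompletionService
     */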
    public static <T> CompletionService<T> newCompletionService(ExecutorService executor) {
        return new ExecutorCompletionService<T>(executor);
    }

    /**
     * Creates a new CountDownLatch.
     *
     * @param threadCount number of threads to wait for
     * @return CountDownLatch
     */
    public static CountDownLatch newCountDownLatch(int threadCount) {
        return new CountDownLatch(threadCount);
    }

    /**
     * Suspends the current thread.
     *
     * @param millis number of milliseconds to sleep
     * @return false if interrupted, otherwise true
     */
    public static boolean sleep(Number millis) {
        if (millis == null) {
            return true;
        }

        try {
            Thread.sleep(millis.longValue());
        } catch (InterruptedException e) {
            return false;
        }
        return true;
    }

    /**
     * @return the stack trace of the current thread
     */
    public static StackTraceElement[] getStackTrace() {
        return Thread.currentThread().getStackTrace();
    }

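    /**
     * Gets a single stack trace element.
     *
     * @param i index of the element; a negative index counts from the end
     * @return the stack trace element
     */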
    public static StackTraceElement getStackTraceElement(int i) {
        StackTraceElement[] stackTrace = getStackTrace();
        if (i < 0) {
            i += stackTrace.length;
        }
        return stackTrace[i];
    }

    /**
     * Creates a ThreadLocal.
     *
     * @param isInheritable whether child threads inherit the value
     * @return the ThreadLocal
     */
    public static <T> ThreadLocal<T> createThreadLocal(boolean isInheritable) {
        if (isInheritable) {
            return new InheritableThreadLocal<>();
        } else {
            return new ThreadLocal<>();
        }
    }
}
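
The CompletionService pattern described in the Javadoc above can be sketched with the JDK alone: submit several tasks, then call take() once per task to collect the results as they complete. `CompletionOrderDemo` is a hypothetical name, and the sketch creates its own pool instead of using the shared one in ThreadPoolUtils.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CompletionOrderDemo {

    private CompletionOrderDemo() {
    }

    /** Runs all tasks concurrently and returns their results in completion order. */
    static List<String> runAll(List<Callable<String>> tasks) {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            CompletionService<String> completion = new ExecutorCompletionService<>(pool);
            for (Callable<String> task : tasks) {
                completion.submit(task);
            }
            List<String> results = new ArrayList<>();
            for (int i = 0; i < tasks.size(); i++) {
                // take() blocks until the next task has finished
                results.add(completion.take().get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for tasks", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

The order of the returned list depends on which task finishes first, so callers should not rely on it matching the submission order.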
