声明:文章内容是 自己使用后整理,大部分工具代码出自大牛,但因无法确认出处,故仅在此处由衷的对无私分享源代码的作者表示感谢与致敬!
本人在拦截到sql的基础上加了分析功能和异常告警功能
1、导入p6spy的jar包,如果是maven项目引入pom
<dependency>
<groupId>p6spy</groupId>
<artifactId>p6spy</artifactId>
<version>3.9.1</version>
</dependency>
2、修改 datasource数据源 的 driverClassName驱动和 url地址 为 com.p6spy.engine.spy.P6SpyDriver
spring.datasource.driverClassName=com.p6spy.engine.spy.P6SpyDriver
spring.datasource.url=jdbc:p6spy:mysql://${DB_HOST}:3306/${DB_DATABASE}?autoReconnect=true&useUnicode=true&characterEncoding=UTF8&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=CTT&allowMultiQueries=true
3、添加配置文件(springboot项目是放在resource目录下)
spy.properties
module.log=com.p6spy.engine.logging.P6LogFactory,com.p6spy.engine.outage.P6OutageFactory
# 自定义日志打印 自定义P6SpyLogger类的地址
logMessageFormat=com.xxx.xxx.xxx.config.P6SpyLoggerFormatStrategy
# 使用日志系统记录sql (default is com.p6spy.engine.spy.appender.FileLogger)
#appender=com.p6spy.engine.spy.appender.StdoutLogger
#appender=com.p6spy.engine.spy.appender.FileLogger
appender=com.p6spy.engine.spy.appender.Slf4JLogger
## 配置记录Log例外
excludecategories=info,debug,result,batc,resultset
# 设置使用p6spy driver来做代理
deregisterdrivers=true
# 日期格式
dateformat=yyyy-MM-dd HH:mm:ss
# 实际驱动
driverlist=com.mysql.cj.jdbc.Driver
# 是否开启慢SQL记录
outagedetection=true
# 慢SQL记录标准 秒
outagedetectioninterval=2
①、修改driverlist为我们的实际驱动
②、指定日志输出样式logMessageFormat 默认为com.p6spy.engine.spy.appender.SingleLineFormat , 单行输出 不格式化语句。如果不满足,可以仿照其源码,实现MessageFormattingStrategy,自定义日志打印类(如下P6SpyLoggerFormatStrategy.java )
③、可以使用默认的日志系统类:Slf4JLogger、StdoutLogger、FileLogger,如果均不满足,可以仿照其源码,继承 FormattedLogger,自定义类
4、然后自定义一个类实现 MessageFormattingStrategy 接口,自定义日志格式化方式
P6SpyLoggerFormatStrategy .java
public class P6SpyLoggerFormatStrategy implements MessageFormattingStrategy {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
/**
* 日志格式化方式(打印SQL日志会进入此方法,耗时操作,生产环境不建议使用)
*
* @param connectionId: 连接ID
* @param now: 当前时间
* @param elapsed: 花费时间
* @param category: 类别
* @param prepared: 预编译SQL
* @param sql: 最终执行的SQL
* @param url: 数据库连接地址
* @return 格式化日志结果
**/
@Override
public String formatMessage(int connectionId, String now, long elapsed,
String category, String prepared, String sql,
String url) {
if (Func.isEmpty(sql)) {
return "";
}
// 格式化sql
sql = Func.clearExtraSpaces(sql);
// sql的DML
String dml = SqlUtils.getSqlDML(sql);
// 分析sql
String analyzeResult = SqlUtils.analyzeSqlWhere(sql);
// 拼装显示
StringBuilder sb = new StringBuilder().append(formatter.format(LocalDateTime.now())).append(" |");
if (Func.isNotEmpty(analyzeResult)) {
sb.append(analyzeResult).append("|");
}
sb.append(dml).append(" |took ").append(elapsed).append("ms |")
.append(category).append("|").append(connectionId).append("|")
.append(sql).append(";");
// 风险sql会发送告警(邮件、短信、钉钉。。。),目前采用钉钉群告警,异步发送
if (Func.isNotEmpty(analyzeResult)) {
ThreadPoolUtils.execute(new SendNotice(sb.toString(), dml, analyzeResult, MDC.get("traceId")));
}
return sb.toString();
}
/** 内部线程类 */
class SendNotice extends Thread {
private String msg;
private String dml;
private String analyzeResult;
private String traceId;
public SendNotice(String msg, String dml, String analyzeResult,String traceId) {
this.msg = msg;
this.dml = dml;
this.analyzeResult = analyzeResult;
this.traceId = traceId;
}
@Override
public void run() {
// HttpServletRequest request = SpringContextHolder.getBean(HttpServletRequest.class);
// String uri = request.getRequestURI();
// if (!StringUtils.isEmpty(request.getQueryString())) {
// uri = uri + "?" + request.getQueryString();
// }
HashMap<String, Object> parms = new HashMap<>(MapUtils.getSize(7));
parms.put("title", dml);
parms.put("traceId", traceId);
parms.put("message", analyzeResult);
parms.put("body", msg);
EventSendService eventSendService = SpringContextUtil.getBean(EventSendService.class);
eventSendService.sendAlertMsg(parms, "sql");
}
}
}
这样,p6spy就可以记录执行sql并输出了。后续是扩展部分:分析sql和异步发送告警
扩展部分一:分析sql的工具类 SqlUtils.java
@Slf4j
public class SqlUtils {
/**
* 获取aop中的SQL语句
*
* @param pjp
* @param sqlSessionFactory
* @return
* @throws IllegalAccessException
*/
public static String getMybatisSql(ProceedingJoinPoint pjp, SqlSessionFactory sqlSessionFactory) throws IllegalAccessException {
Map<String, Object> map = new HashMap<>(16);
//1.获取namespace+methdoName
MethodSignature signature = (MethodSignature) pjp.getSignature();
Method method = signature.getMethod();
String namespace = method.getDeclaringClass().getName();
String methodName = method.getName();
//2.根据namespace+methdoName获取相对应的MappedStatement
Configuration configuration = sqlSessionFactory.getConfiguration();
MappedStatement mappedStatement = configuration.getMappedStatement(namespace + "." + methodName);
//3.获取方法参数列表名
Parameter[] parameters = method.getParameters();
//4.形参和实参的映射,获取实参
Object[] objects = pjp.getArgs();
Annotation[][] parameterAnnotations = method.getParameterAnnotations();
for (int i = 0; i < parameterAnnotations.length; i++) {
Object object = objects[i];
//说明该参数没有注解,此时该参数可能是实体类,也可能是Map,也可能只是单参数
if (parameterAnnotations[i].length == 0) {
if (object.getClass().getClassLoader() == null && object instanceof Map) {
map.putAll((Map<? extends String, ?>) object);
log.info("该对象为Map");
} else {//形参为自定义实体类
map.putAll(objectToMap(object));
log.info("该对象为用户自定义的对象");
}
} else {//说明该参数有注解,且必须为@Param
for (Annotation annotation : parameterAnnotations[i]) {
if (annotation instanceof Param) {
map.put(((Param) annotation).value(), object);
}
}
}
}
//5.获取boundSql
BoundSql boundSql = mappedStatement.getBoundSql(map);
// BoundSql boundSql = mappedStatement.getBoundSql();
return showSql(configuration, boundSql);
}
/**
* 解析BoundSql,生成不含占位符的SQL语句
*
* @param configuration
* @param boundSql
* @return
*/
private static String showSql(Configuration configuration, BoundSql boundSql) {
Object parameterObject = boundSql.getParameterObject();
List<ParameterMapping> parameterMappings = boundSql.getParameterMappings();
String sql = boundSql.getSql().replaceAll("[\\s]+", " ");
if (parameterMappings.size() > 0 && parameterObject != null) {
TypeHandlerRegistry typeHandlerRegistry = configuration.getTypeHandlerRegistry();
if (typeHandlerRegistry.hasTypeHandler(parameterObject.getClass())) {
sql = sql.replaceFirst("\\?", getParameterValue(parameterObject));
} else {
MetaObject metaObject = configuration.newMetaObject(parameterObject);
for (ParameterMapping parameterMapping : parameterMappings) {
String propertyName = parameterMapping.getProperty();
String[] s = metaObject.getObjectWrapper().getGetterNames();
s.toString();
if (metaObject.hasGetter(propertyName)) {
Object obj = metaObject.getValue(propertyName);
sql = sql.replaceFirst("\\?", getParameterValue(obj));
} else if (boundSql.hasAdditionalParameter(propertyName)) {
Object obj = boundSql.getAdditionalParameter(propertyName);
sql = sql.replaceFirst("\\?", getParameterValue(obj));
}
}
}
}
return sql;
}
/**
* 若为字符串或者日期类型,则在参数两边添加''
*
* @param obj
* @return
*/
private static String getParameterValue(Object obj) {
String value = null;
if (obj instanceof String) {
value = "'" + obj.toString() + "'";
} else if (obj instanceof Date) {
DateFormat formatter = DateFormat.getDateTimeInstance(DateFormat.DEFAULT, DateFormat.DEFAULT, Locale.CHINA);
value = "'" + formatter.format(new Date()) + "'";
} else {
if (obj != null) {
value = obj.toString();
} else {
value = "";
}
}
return value;
}
/**
* 获取利用反射获取类里面的值和名称
*
* @param obj
* @return
* @throws IllegalAccessException
*/
private static Map<String, Object> objectToMap(Object obj) throws IllegalAccessException {
Map<String, Object> map = new HashMap<>(16);
Class<?> clazz = obj.getClass();
log.info("Class<?>={}",clazz);
// 获取本类及其父类的属性,↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
List<Field> fieldList = new ArrayList<>();
while (clazz != null) {
fieldList.addAll(new ArrayList<>(Arrays.asList(clazz.getDeclaredFields())));
clazz = clazz.getSuperclass();
}
// 获取本类及其父类的属性,↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑
for (Field field : fieldList) {
field.setAccessible(true);
String fieldName = field.getName();
Object value = field.get(obj);
map.put(fieldName, value);
}
return map;
}
/**
* 获取 DML 的 方式(insert、delete、update、select)
* @param sql
* @return
*/
public static String getSqlDML(String sql) {
if(Func.isEmpty(sql)){
return null;
}
try {
sql=sql.trim();
int endIndex = sql.indexOf(" ")!=-1?sql.indexOf(" "):sql.length()>=6?6:sql.length();
return sql.substring(0, endIndex).toUpperCase();
} catch (Exception e) {
log.error("SqlUtils- 获取sql的DML 异常",e);
return null;
}
}
/**
* 获取表名
* @param sql
* @return
*/
public static String getTableName(String sql) {
String dml = getSqlDML(sql);
if(Func.isEmpty(dml)){
return null;
}
// LogSqlEnum.DmlEnum.UPDATE.equals(dml) || LogSqlEnum.DmlEnum.DELETE.equals(dml) || LogSqlEnum.DmlEnum.SELECT.equals(dml)
try {
sql = sql.substring(6).trim().toUpperCase();
if((LogSqlConstant.DmlEnum.DELETE.equals(dml) || LogSqlConstant.DmlEnum.SELECT.equals(dml))
&& sql.contains("FROM")){
sql=sql.substring(sql.indexOf("FROM")+4).trim();
String[] s = sql.split(" ");
if(s.length>1){
return s[0].trim();
}
}
if(LogSqlConstant.DmlEnum.UPDATE.equals(dml) && sql.contains("SET")){
return sql.substring(6,sql.indexOf("SET")).trim();
}
return null;
} catch (Exception e) {
log.error("SqlUtils- 获取表名 异常",e);
return null;
}
}
/**
* 分析sql的where条件
* @param sql
* @return
*/
public static String analyzeSqlWhere(String sql) {
try {
String dml = getSqlDML(sql);
// 忽略枚举表
if(LogSqlConstant.DmlEnum.SELECT.equals(dml)
&& LogSqlConstant.getIgnoreTable().contains(getTableName(sql))){
return null;
}
if(LogSqlConstant.DmlEnum.UPDATE.equals(dml) || LogSqlConstant.DmlEnum.DELETE.equals(dml) || LogSqlConstant.DmlEnum.SELECT.equals(dml)){
String sqlUpperCase = sql.toUpperCase();
if(!sqlUpperCase.contains("WHERE")){
return "【高风险sql】"+dml+"无WHERE条件";
}else {
String whereStr = sqlUpperCase.substring(sqlUpperCase.indexOf("WHERE")+5);
if(whereStr.contains("GROUP BY")){
whereStr = whereStr.substring(0,whereStr.indexOf("GROUP BY"));
}else if(whereStr.contains("ORDER BY")){
whereStr = whereStr.substring(0,whereStr.indexOf("ORDER BY"));
}
whereStr = whereStr.trim();
List<String> blList = Lists.newArrayList();
if(whereStr.contains("(")){
String[] bl = whereStr.split("\\(");
for (String b : bl) {
if(Func.isNotEmpty(b)){
blList.add(b);
}
}
}
List<String> brList = Lists.newArrayList();
for (String bl : blList) {
String[] br = bl.split("\\)");
for (String b : br) {
if(Func.isNotEmpty(b)){
brList.add(b);
}
}
}
List<String> andList = Lists.newArrayList();
for (String br : brList) {
String[] ands = br.split("AND");
for (String s : ands) {
if(Func.isNotEmpty(s)){
andList.add(s);
}
}
}
List<String> orList = Lists.newArrayList();
for (String an : andList) {
String[] ors = an.split("OR");
for (String s : ors) {
if(Func.isNotEmpty(s)){
orList.add(s);
}
}
}
StringBuilder nullSB = new StringBuilder();
for (String or : orList) {
String[] eqs = or.split("=");
if(eqs.length==2
&& (Func.isEmpty(eqs[1]) || "null".equalsIgnoreCase(eqs[1].trim()) )){
nullSB.append(or).append(";");
}
}
return nullSB.length()>0?nullSB.insert(0,"【中风险sql】条件值为空:").toString():null;
}
}
return null;
} catch (Exception e) {
log.error("SqlUtils-分析sql的where条件异常",e);
return "SqlUtils-分析sql的where条件异常";
}
}
}
扩展部分二:异步发送告警
配置异步线程池 AsyncConfig.java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 线程池的配置
*/
@Configuration
public class AsyncConfig {
/** 核心线程数 */
private static final int CORE_POOL_SIZE =10;
/** 最大线程数 */
private static final int MAX_POOL_SIZE = 50;
/** 队列最大长度 >=mainExecutor.maxSize */
private static final int QUEUE_CAPACITY =10;
/** 线程池维护线程所允许的空闲时间 */
private static final int KEEP_ALIVE_SECONDS =20;
/** 线程池对拒绝任务(无线程可用)的处理策略 */
private RejectedExecutionHandler rejectedExecutionHandler =new ThreadPoolExecutor.AbortPolicy();
@Bean("asyncTaskExecutor")
public AsyncTaskExecutor asyncTaskExecutor() {
ThreadPoolTaskExecutor asyncTaskExecutor = new ThreadPoolTaskExecutor();
asyncTaskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
asyncTaskExecutor.setCorePoolSize(CORE_POOL_SIZE);
asyncTaskExecutor.setQueueCapacity(QUEUE_CAPACITY);
asyncTaskExecutor.setKeepAliveSeconds(KEEP_ALIVE_SECONDS);
asyncTaskExecutor.setRejectedExecutionHandler(rejectedExecutionHandler);
asyncTaskExecutor.setThreadNamePrefix("async-task-thread-pool-");
asyncTaskExecutor.initialize();
return asyncTaskExecutor;
}
}
线程工具类 ThreadPoolUtils.java
import java.util.concurrent.*;
/**
* 线程 工具类
*/
public class ThreadPoolUtils {
private static ExecutorService executor = Executors.newCachedThreadPool();
/**
* 直接在公共线程池中执行线程
*
* @param runnable 可运行对象
*/
public static void execute(Runnable runnable) {
try {
executor.execute(runnable);
} catch (Exception e) {
throw new RuntimeException("Exception when running task!", e);
}
}
/**
* 重启公共线程池
*/
public static void restart() {
executor.shutdownNow();
executor = Executors.newCachedThreadPool();
}
/**
* 新建一个线程池
*
* @param threadSize 同时执行的线程数大小
* @return ExecutorService
*/
public static ExecutorService newExecutor(int threadSize) {
return Executors.newFixedThreadPool(threadSize);
}
/**
* 获得一个新的线程池
*
* @return ExecutorService
*/
public static ExecutorService newExecutor() {
return Executors.newCachedThreadPool();
}
/**
* 获得一个新的线程池,只有单个线程
*
* @return ExecutorService
*/
public static ExecutorService newSingleExecutor() {
return Executors.newSingleThreadExecutor();
}
/**
* 执行异步方法
*
* @param runnable 需要执行的方法体
* @return 执行的方法体
*/
public static Runnable excAsync(final Runnable runnable, boolean isDeamon) {
Thread thread = new Thread() {
@Override
public void run() {
runnable.run();
}
};
thread.setDaemon(isDeamon);
thread.start();
return runnable;
}
/**
* 执行有返回值的异步方法<br/>
* Future代表一个异步执行的操作,通过get()方法可以获得操作的结果,如果异步操作还没有完成,则,get()会使当前线程阻塞
*
* @return Future
*/
public static <T> Future<T> execAsync(Callable<T> task) {
return executor.submit(task);
}
/**
* 新建一个CompletionService,调用其submit方法可以异步执行多个任务,最后调用take方法按照完成的顺序获得其结果。,若未完成,则会阻塞
*
* @return CompletionService
*/
public static <T> CompletionService<T> newCompletionService() {
return new ExecutorCompletionService<T>(executor);
}
/**
* 新建一个CompletionService,调用其submit方法可以异步执行多个任务,最后调用take方法按照完成的顺序获得其结果。,若未完成,则会阻塞
*
* @return CompletionService
*/
public static <T> CompletionService<T> newCompletionService(ExecutorService executor) {
return new ExecutorCompletionService<T>(executor);
}
/**
* 新建一个CountDownLatch
*
* @param threadCount 线程数量
* @return CountDownLatch
*/
public static CountDownLatch newCountDownLatch(int threadCount) {
return new CountDownLatch(threadCount);
}
/**
* 挂起当前线程
*
* @param millis 挂起的毫秒数
* @return 被中断返回false,否则true
*/
public static boolean sleep(Number millis) {
if (millis == null) {
return true;
}
try {
Thread.sleep(millis.longValue());
} catch (InterruptedException e) {
return false;
}
return true;
}
/**
* @return 获得堆栈列表
*/
public static StackTraceElement[] getStackTrace() {
return Thread.currentThread().getStackTrace();
}
/**
* 获得堆栈项
*
* @param i 第几个堆栈项
* @return 堆栈项
*/
public static StackTraceElement getStackTraceElement(int i) {
StackTraceElement[] stackTrace = getStackTrace();
if (i < 0) {
i += stackTrace.length;
}
return stackTrace[i];
}
/**
* 创建本地线程对象
*
* @return 本地线程
*/
public static <T> ThreadLocal<T> createThreadLocal(boolean isInheritable) {
if (isInheritable) {
return new InheritableThreadLocal<>();
} else {
return new ThreadLocal<>();
}
}
}