当前位置: 首页 > 工具软件 > lettuce-core > 使用案例 >

聊聊skywalking的lettuce-plugin

封烈
2023-12-01

skywalking-plugin.def
skywalking-6.6.0/apm-sniffer/optional-plugins/lettuce-5.x-plugin/src/main/resources/skywalking-plugin.def

lettuce-5.x=org.apache.skywalking.apm.plugin.lettuce.v5.define.AbstractRedisClientInstrumentation
lettuce-5.x=org.apache.skywalking.apm.plugin.lettuce.v5.define.AsyncCommandInstrumentation
lettuce-5.x=org.apache.skywalking.apm.plugin.lettuce.v5.define.ClientOptionsInstrumentation
lettuce-5.x=org.apache.skywalking.apm.plugin.lettuce.v5.define.RedisChannelWriterInstrumentation
lettuce-5.x=org.apache.skywalking.apm.plugin.lettuce.v5.define.RedisClientInstrumentation
lettuce-5.x=org.apache.skywalking.apm.plugin.lettuce.v5.define.RedisClusterClientInstrumentation
skywalking的lettuce-plugin提供了AbstractRedisClientInstrumentation、AsyncCommandInstrumentation、ClientOptionsInstrumentation、RedisChannelWriterInstrumentation、RedisClientInstrumentation、RedisClusterClientInstrumentation增强
AbstractRedisClientInstrumentation
skywalking-6.6.0/apm-sniffer/optional-plugins/lettuce-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/lettuce/v5/define/AbstractRedisClientInstrumentation.java

/**
 * Plugin definition targeting {@code io.lettuce.core.AbstractRedisClient}.
 *
 * <p>No constructor is intercepted. The single instance-method intercept point matches
 * methods named {@code setOptions} whose first argument is of type
 * {@code io.lettuce.core.ClientOptions} and routes them to
 * {@code AbstractRedisClientInterceptor}.
 */
public class AbstractRedisClientInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

    /** Fully-qualified name of the class to enhance. */
    private static final String TARGET_CLASS = "io.lettuce.core.AbstractRedisClient";

    /** Fully-qualified name of the interceptor bound to {@code setOptions}. */
    private static final String SET_OPTIONS_INTERCEPTOR = "org.apache.skywalking.apm.plugin.lettuce.v5.AbstractRedisClientInterceptor";

    /** Selects the enhanced class by its exact name. */
    @Override
    public ClassMatch enhanceClass() {
        return byName(TARGET_CLASS);
    }

    /** Returns an empty array: no constructor of the target class is intercepted. */
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return new ConstructorInterceptPoint[0];
    }

    /** Returns the single intercept point covering {@code setOptions(ClientOptions)}. */
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        InstanceMethodsInterceptPoint setOptionsPoint = new InstanceMethodsInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getMethodsMatcher() {
                return named("setOptions")
                    .and(takesArgumentWithType(0, "io.lettuce.core.ClientOptions"));
            }

            @Override
            public String getMethodsInterceptor() {
                return SET_OPTIONS_INTERCEPTOR;
            }

            // The interceptor does not replace the method arguments.
            @Override
            public boolean isOverrideArgs() {
                return false;
            }
        };
        return new InstanceMethodsInterceptPoint[] {setOptionsPoint};
    }

}
AbstractRedisClientInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,它使用org.apache.skywalking.apm.plugin.lettuce.v5.AbstractRedisClientInterceptor增强了io.lettuce.core.AbstractRedisClient名为setOptions的方法
AbstractRedisClientInterceptor
skywalking-6.6.0/apm-sniffer/optional-plugins/lettuce-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/lettuce/v5/AbstractRedisClientInterceptor.java

/**
 * Around-interceptor bound to {@code AbstractRedisClient#setOptions}.
 *
 * <p>Before the intercepted method runs, the value held in the SkyWalking dynamic field of the
 * options currently returned by {@code client.getOptions()} is copied onto the options object
 * passed as the first argument. Nothing is copied when either options object is {@code null}
 * or when the current options carry no dynamic-field value.
 */
public class AbstractRedisClientInterceptor implements InstanceMethodsAroundInterceptor {

    /**
     * Copies the dynamic-field value from the client's current options to the incoming options.
     *
     * @param objInst        the enhanced instance; cast to {@code AbstractRedisClient}
     * @param method         the intercepted method (unused)
     * @param allArguments   method arguments; index 0 is cast to {@code EnhancedInstance}
     * @param argumentsTypes declared argument types (unused)
     * @param result         intercept result holder (unused)
     */
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                             MethodInterceptResult result) throws Throwable {
        EnhancedInstance incomingOptions = (EnhancedInstance) allArguments[0];
        if (incomingOptions == null) {
            return;
        }
        AbstractRedisClient client = (AbstractRedisClient) objInst;
        // Read the current options once so the null check and the copy below
        // operate on the same object instead of re-invoking the getter.
        EnhancedInstance currentOptions = (EnhancedInstance) client.getOptions();
        if (currentOptions == null) {
            return;
        }
        // Likewise read the dynamic field once: the value that was null-checked is the value copied.
        Object carriedValue = currentOptions.getSkyWalkingDynamicField();
        if (carriedValue == null) {
            return;
        }
        incomingOptions.setSkyWalkingDynamicField(carriedValue);
    }

    /** Passes the original return value through unchanged. */
    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
                              Class<?>[] argumentsTypes, Object ret) throws Throwable {
        return ret;
    }

    /** Intentionally does nothing: this interceptor opens no span to record the error on. */
    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
                                      Class<?>[] argumentsTypes, Throwable t) {
    }

}
AbstractRedisClientInterceptor实现了InstanceMethodsAroundInterceptor接口,其beforeMethod方法给allArguments[0]设置了AbstractRedisClient的options的skyWalkingDynamicField
AsyncCommandInstrumentation
skywalking-6.6.0/apm-sniffer/optional-plugins/lettuce-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/lettuce/v5/define/AsyncCommandInstrumentation.java

/**
 * Plugin definition targeting {@code io.lettuce.core.protocol.AsyncCommand}.
 *
 * <p>No constructor is intercepted. Methods named {@code onComplete} whose first argument is a
 * {@code java.util.function.Consumer} or a {@code java.util.function.BiConsumer} are routed to
 * {@code AsyncCommandMethodInterceptor}, which is allowed to replace the arguments.
 */
public class AsyncCommandInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

    /** Fully-qualified name of the class to enhance. */
    private static final String TARGET_CLASS = "io.lettuce.core.protocol.AsyncCommand";

    /** Fully-qualified name of the interceptor bound to {@code onComplete}. */
    private static final String ON_COMPLETE_INTERCEPTOR = "org.apache.skywalking.apm.plugin.lettuce.v5.AsyncCommandMethodInterceptor";

    /** Selects the enhanced class by its exact name. */
    @Override
    public ClassMatch enhanceClass() {
        return byName(TARGET_CLASS);
    }

    /** Returns an empty array: no constructor of the target class is intercepted. */
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return new ConstructorInterceptPoint[0];
    }

    /** Returns the single intercept point covering both {@code onComplete} overloads. */
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        InstanceMethodsInterceptPoint onCompletePoint = new InstanceMethodsInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getMethodsMatcher() {
                return (
                    named("onComplete").and(takesArgumentWithType(0,"java.util.function.Consumer"))
                ).or(
                    named("onComplete").and(takesArgumentWithType(0,"java.util.function.BiConsumer"))
                );
            }

            @Override
            public String getMethodsInterceptor() {
                return ON_COMPLETE_INTERCEPTOR;
            }

            // Must be true: the interceptor swaps the callback argument for a wrapper.
            @Override
            public boolean isOverrideArgs() {
                return true;
            }
        };
        return new InstanceMethodsInterceptPoint[] {onCompletePoint};
    }

}
AsyncCommandInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,它使用org.apache.skywalking.apm.plugin.lettuce.v5.AsyncCommandMethodInterceptor增强了io.lettuce.core.protocol.AsyncCommand的onComplete方法
AsyncCommandMethodInterceptor
skywalking-6.6.0/apm-sniffer/optional-plugins/lettuce-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/lettuce/v5/AsyncCommandMethodInterceptor.java

/**
 * Around-interceptor bound to {@code AsyncCommand#onComplete}.
 *
 * <p>Opens a local span named {@code Lettuce/<command type>/onComplete}, replaces the callback
 * argument with an {@code SWConsumer} or {@code SWBiConsumer} wrapper built from the original
 * callback, the value returned by {@code ContextManager.capture()} and the operation name, and
 * stops the span once the intercepted method returns.
 */
public class AsyncCommandMethodInterceptor implements InstanceMethodsAroundInterceptor {

    /**
     * Creates the local span and swaps {@code allArguments[0]} for a wrapping callback.
     *
     * @param objInst        the enhanced instance; cast to {@code AsyncCommand}
     * @param method         the intercepted method (unused)
     * @param allArguments   method arguments; index 0 is overwritten with the wrapper
     * @param argumentsTypes declared argument types (unused)
     * @param result         intercept result holder (unused)
     */
    @Override
    @SuppressWarnings("unchecked")
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                             MethodInterceptResult result) throws Throwable {
        final AsyncCommand command = (AsyncCommand) objInst;
        final String operationName = "Lettuce/" + command.getType().name();

        final AbstractSpan localSpan = ContextManager.createLocalSpan(operationName + "/onComplete");
        localSpan.setComponent(ComponentsDefine.LETTUCE);

        final Object callback = allArguments[0];
        final Object wrapped;
        if (callback instanceof Consumer) {
            wrapped = new SWConsumer((Consumer) callback, ContextManager.capture(), operationName);
        } else {
            // Anything that is not a Consumer is treated as the BiConsumer overload.
            wrapped = new SWBiConsumer((BiConsumer) callback, ContextManager.capture(), operationName);
        }
        allArguments[0] = wrapped;
    }

    /** Stops the span opened in {@code beforeMethod} and passes the return value through. */
    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
                              Class<?>[] argumentsTypes, Object ret) throws Throwable {
        ContextManager.stopSpan();
        return ret;
    }

    /** Marks the active span as failed and logs the throwable on it. */
    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
                                      Class<?>[] argumentsTypes, Throwable t) {
        ContextManager.activeSpan().errorOccurred().log(t);
    }

}
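AsyncCommandMethodInterceptor实现了InstanceMethodsAroundInterceptor接口,其beforeMethod方法先创建名为"Lettuce/{命令类型}/onComplete"的local span并将component设置为LETTUCE,然后根据allArguments[0]的类型,使用SWConsumer或者SWBiConsumer(同时传入ContextManager.capture()的返回值及operationName)包装allArguments[0];其afterMethod方法执行ContextManager.stopSpan();其handleMethodException方法执行ContextManager.activeSpan().errorOccurred().log(t)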
ClientOptionsInstrumentation
skywalking-6.6.0/apm-sniffer/optional-plugins/lettuce-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/lettuce/v5/define/ClientOptionsInstrumentation.java

/**
 * Plugin definition targeting {@code io.lettuce.core.ClientOptions}.
 *
 * <p>Every constructor is routed to {@code ClientOptionsConstructorInterceptor}; no instance
 * method is intercepted.
 */
public class ClientOptionsInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

    /** Fully-qualified name of the class to enhance. */
    private static final String TARGET_CLASS = "io.lettuce.core.ClientOptions";

    /** Fully-qualified name of the interceptor bound to the constructors. */
    private static final String CONSTRUCTOR_INTERCEPTOR = "org.apache.skywalking.apm.plugin.lettuce.v5.ClientOptionsConstructorInterceptor";

    /** Selects the enhanced class by its exact name. */
    @Override
    public ClassMatch enhanceClass() {
        return byName(TARGET_CLASS);
    }

    /** Returns the single intercept point matching any constructor. */
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        ConstructorInterceptPoint anyConstructor = new ConstructorInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getConstructorMatcher() {
                return any();
            }

            @Override
            public String getConstructorInterceptor() {
                return CONSTRUCTOR_INTERCEPTOR;
            }
        };
        return new ConstructorInterceptPoint[] {anyConstructor};
    }

    /** Returns an empty array: no instance method of the target class is intercepted. */
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        return new InstanceMethodsInterceptPoint[0];
    }

}
ClientOptionsInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,它使用org.apache.skywalking.apm.plugin.lettuce.v5.ClientOptionsConstructorInterceptor增强了io.lettuce.core.ClientOptions的构造器
ClientOptionsConstructorInterceptor
skywalking-6.6.0/apm-sniffer/optional-plugins/lettuce-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/lettuce/v5/ClientOptionsConstructorInterceptor.java

/**
 * Constructor interceptor registered for {@code io.lettuce.core.ClientOptions}.
 *
 * <p>The callback is deliberately empty; it performs no work when an instance is constructed.
 */
public class ClientOptionsConstructorInterceptor implements InstanceConstructorInterceptor {

    /**
     * No-op.
     *
     * @param objInst      the newly constructed, enhanced instance (unused)
     * @param allArguments the constructor arguments (unused)
     */
    @Override
    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
        // Nothing to do.
    }

}
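ClientOptionsConstructorInterceptor实现了InstanceConstructorInterceptor接口,其onConstruct方法是空实现;结合其他拦截器将ClientOptions强转为EnhancedInstance并读写其skyWalkingDynamicField来看,这个增强的目的应该是让ClientOptions成为EnhancedInstance,以便携带peer信息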
RedisChannelWriterInstrumentation
skywalking-6.6.0/apm-sniffer/optional-plugins/lettuce-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/lettuce/v5/define/RedisChannelWriterInstrumentation.java

/**
 * Plugin definition targeting {@code io.lettuce.core.protocol.DefaultEndpoint}.
 *
 * <p>Constructors whose first argument is of type {@code io.lettuce.core.ClientOptions} and the
 * instance methods named {@code writeToChannelAndFlush} or {@code writeAndFlush} are all routed
 * to {@code RedisChannelWriterInterceptor}.
 */
public class RedisChannelWriterInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

    /** Fully-qualified name of the class to enhance. */
    private static final String TARGET_CLASS = "io.lettuce.core.protocol.DefaultEndpoint";

    /** Fully-qualified name of the interceptor shared by the constructor and method points. */
    private static final String WRITER_INTERCEPTOR = "org.apache.skywalking.apm.plugin.lettuce.v5.RedisChannelWriterInterceptor";

    /** Selects the enhanced class by its exact name. */
    @Override
    public ClassMatch enhanceClass() {
        return byName(TARGET_CLASS);
    }

    /** Returns the intercept point for constructors taking {@code ClientOptions} first. */
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        ConstructorInterceptPoint optionsFirstConstructor = new ConstructorInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getConstructorMatcher() {
                return takesArgumentWithType(0, "io.lettuce.core.ClientOptions");
            }

            @Override
            public String getConstructorInterceptor() {
                return WRITER_INTERCEPTOR;
            }
        };
        return new ConstructorInterceptPoint[] {optionsFirstConstructor};
    }

    /** Returns the intercept point for the two write methods. */
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        InstanceMethodsInterceptPoint writePoint = new InstanceMethodsInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getMethodsMatcher() {
                return named("writeToChannelAndFlush")
                    .or(named("writeAndFlush"));
            }

            @Override
            public String getMethodsInterceptor() {
                return WRITER_INTERCEPTOR;
            }

            // The interceptor only reads the arguments; it never replaces them.
            @Override
            public boolean isOverrideArgs() {
                return false;
            }
        };
        return new InstanceMethodsInterceptPoint[] {writePoint};
    }

}
RedisChannelWriterInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine类,它增强的是io.lettuce.core.protocol.DefaultEndpoint类;它使用org.apache.skywalking.apm.plugin.lettuce.v5.RedisChannelWriterInterceptor增强了其参数类型为io.lettuce.core.ClientOptions的构造器,还增强了其writeToChannelAndFlush、writeAndFlush方法
RedisChannelWriterInterceptor
skywalking-6.6.0/apm-sniffer/optional-plugins/lettuce-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/lettuce/v5/RedisChannelWriterInterceptor.java

/**
 * Interceptor for {@code DefaultEndpoint}: handles both its construction and its write methods.
 *
 * <p>On construction, the dynamic-field value of the first constructor argument is copied onto
 * the enhanced instance. Around each intercepted write, an exit span is opened with that value
 * as the peer and is tagged with the Redis command name(s) taken from the first argument.
 */
public class RedisChannelWriterInterceptor implements InstanceMethodsAroundInterceptor, InstanceConstructorInterceptor {

    /**
     * Copies the dynamic-field value of {@code allArguments[0]} onto {@code objInst}.
     *
     * @param objInst      the newly constructed, enhanced instance
     * @param allArguments constructor arguments; index 0 is cast to {@code EnhancedInstance}
     */
    @Override
    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
        EnhancedInstance options = (EnhancedInstance) allArguments[0];
        objInst.setSkyWalkingDynamicField(options.getSkyWalkingDynamicField());
    }

    /**
     * Opens the exit span for a write.
     *
     * <p>A single {@code RedisCommand} yields the operation name {@code Lettuce/<type>} and a
     * statement equal to the type name. A {@code Collection} yields {@code Lettuce/BATCH_WRITE}
     * and a statement of every element's type name, each followed by {@code ;}. Any other
     * argument leaves the operation name as {@code Lettuce/} and the statement empty.
     *
     * @param objInst        the enhanced instance; its dynamic field is read as the peer string
     * @param method         the intercepted method (unused)
     * @param allArguments   method arguments; only index 0 is inspected
     * @param argumentsTypes declared argument types (unused)
     * @param result         intercept result holder (unused)
     */
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                             MethodInterceptResult result) throws Throwable {
        final String peer = (String) objInst.getSkyWalkingDynamicField();

        final StringBuilder statement = new StringBuilder();
        String operationName = "Lettuce/";

        final Object payload = allArguments[0];
        if (payload instanceof RedisCommand) {
            String commandName = ((RedisCommand) payload).getType().name();
            operationName += commandName;
            statement.append(commandName);
        } else if (payload instanceof Collection) {
            @SuppressWarnings("unchecked")
            Collection<RedisCommand> batch = (Collection<RedisCommand>) payload;
            operationName += "BATCH_WRITE";
            for (RedisCommand entry : batch) {
                statement.append(entry.getType().name());
                statement.append(";");
            }
        }

        final AbstractSpan exitSpan = ContextManager.createExitSpan(operationName, peer);
        exitSpan.setComponent(ComponentsDefine.LETTUCE);
        Tags.DB_TYPE.set(exitSpan, "Redis");
        Tags.DB_STATEMENT.set(exitSpan, statement.toString());
        SpanLayer.asCache(exitSpan);
    }

    /** Stops the span opened in {@code beforeMethod} and passes the return value through. */
    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
                              Class<?>[] argumentsTypes, Object ret) throws Throwable {
        ContextManager.stopSpan();
        return ret;
    }

    /** Marks the active span as failed and logs the throwable on it. */
    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
                                      Class<?>[] argumentsTypes, Throwable t) {
        AbstractSpan activeSpan = ContextManager.activeSpan();
        activeSpan.errorOccurred();
        activeSpan.log(t);
    }

}
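RedisChannelWriterInterceptor实现了InstanceMethodsAroundInterceptor, InstanceConstructorInterceptor接口;其beforeMethod方法从objInst的skyWalkingDynamicField取出peer,根据allArguments[0]是单个RedisCommand还是Collection拼出operationName("Lettuce/{命令类型}"或"Lettuce/BATCH_WRITE")及dbStatement,然后创建exit span并设置component、DB_TYPE、DB_STATEMENT以及cache类型的SpanLayer;其afterMethod方法执行ContextManager.stopSpan();其handleMethodException方法执行span.errorOccurred()及span.log(t);其onConstruct方法将allArguments[0]的skyWalkingDynamicField传递给objInst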
RedisClientInstrumentation
skywalking-6.6.0/apm-sniffer/optional-plugins/lettuce-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/lettuce/v5/define/RedisClientInstrumentation.java

/**
 * Plugin definition targeting {@code io.lettuce.core.RedisClient}.
 *
 * <p>Constructors whose second argument is of type {@code io.lettuce.core.RedisURI} are routed
 * to {@code RedisClientConstructorInterceptor}; no instance method is intercepted.
 */
public class RedisClientInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

    /** Fully-qualified name of the class to enhance. */
    private static final String TARGET_CLASS = "io.lettuce.core.RedisClient";

    /** Fully-qualified name of the interceptor bound to the matched constructors. */
    private static final String CONSTRUCTOR_INTERCEPTOR = "org.apache.skywalking.apm.plugin.lettuce.v5.RedisClientConstructorInterceptor";

    /** Selects the enhanced class by its exact name. */
    @Override
    public ClassMatch enhanceClass() {
        return byName(TARGET_CLASS);
    }

    /** Returns the intercept point for constructors taking a {@code RedisURI} at index 1. */
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        ConstructorInterceptPoint uriConstructor = new ConstructorInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getConstructorMatcher() {
                return takesArgumentWithType(1, "io.lettuce.core.RedisURI");
            }

            @Override
            public String getConstructorInterceptor() {
                return CONSTRUCTOR_INTERCEPTOR;
            }
        };
        return new ConstructorInterceptPoint[] {uriConstructor};
    }

    /** Returns an empty array: no instance method of the target class is intercepted. */
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        return new InstanceMethodsInterceptPoint[0];
    }

}
RedisClientInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,它使用org.apache.skywalking.apm.plugin.lettuce.v5.RedisClientConstructorInterceptor增强了io.lettuce.core.RedisClient的第二个参数类型为io.lettuce.core.RedisURI的构造器
RedisClientConstructorInterceptor
skywalking-6.6.0/apm-sniffer/optional-plugins/lettuce-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/lettuce/v5/RedisClientConstructorInterceptor.java

/**
 * Constructor interceptor for {@code RedisClient}.
 *
 * <p>Stores {@code <host>:<port>} of the {@code RedisURI} constructor argument in the dynamic
 * field of the options object returned by the client's {@code getOptions()}.
 */
public class RedisClientConstructorInterceptor implements InstanceConstructorInterceptor {

    /**
     * Records the peer address on the client's options.
     *
     * @param objInst      the newly constructed instance; cast to {@code RedisClient}
     * @param allArguments constructor arguments; index 1 is cast to {@code RedisURI}
     */
    @Override
    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
        final RedisURI uri = (RedisURI) allArguments[1];
        final RedisClient client = (RedisClient) objInst;
        final EnhancedInstance options = (EnhancedInstance) client.getOptions();

        final String peer = uri.getHost() + ":" + uri.getPort();
        options.setSkyWalkingDynamicField(peer);
    }

}
RedisClientConstructorInterceptor实现了InstanceConstructorInterceptor接口,其onConstruct方法从allArguments[1]取得redisURI,将"host:port"形式的peer信息设置到redisClient.getOptions()的skyWalkingDynamicField
RedisClusterClientInstrumentation
skywalking-6.6.0/apm-sniffer/optional-plugins/lettuce-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/lettuce/v5/define/RedisClusterClientInstrumentation.java

/**
 * Plugin definition targeting {@code io.lettuce.core.cluster.RedisClusterClient}.
 *
 * <p>Constructors whose second argument is of type {@code java.lang.Iterable} are routed to
 * {@code RedisClusterClientConstructorInterceptor}; no instance method is intercepted.
 */
public class RedisClusterClientInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

    /** Fully-qualified name of the class to enhance. */
    private static final String TARGET_CLASS = "io.lettuce.core.cluster.RedisClusterClient";

    /** Fully-qualified name of the interceptor bound to the matched constructors. */
    private static final String CONSTRUCTOR_INTERCEPTOR = "org.apache.skywalking.apm.plugin.lettuce.v5.RedisClusterClientConstructorInterceptor";

    /** Selects the enhanced class by its exact name. */
    @Override
    public ClassMatch enhanceClass() {
        return byName(TARGET_CLASS);
    }

    /** Returns the intercept point for constructors taking an {@code Iterable} at index 1. */
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        ConstructorInterceptPoint iterableConstructor = new ConstructorInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getConstructorMatcher() {
                return takesArgumentWithType(1, "java.lang.Iterable");
            }

            @Override
            public String getConstructorInterceptor() {
                return CONSTRUCTOR_INTERCEPTOR;
            }
        };
        return new ConstructorInterceptPoint[] {iterableConstructor};
    }

    /** Returns an empty array: no instance method of the target class is intercepted. */
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        return new InstanceMethodsInterceptPoint[0];
    }

}
RedisClusterClientInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,它使用org.apache.skywalking.apm.plugin.lettuce.v5.RedisClusterClientConstructorInterceptor增强了io.lettuce.core.cluster.RedisClusterClient的第二个参数类型为java.lang.Iterable的构造器
RedisClusterClientConstructorInterceptor
skywalking-6.6.0/apm-sniffer/optional-plugins/lettuce-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/lettuce/v5/RedisClusterClientConstructorInterceptor.java

/**
 * Constructor interceptor for {@code RedisClusterClient}.
 *
 * <p>Joins every {@code RedisURI} of the constructor argument as {@code <host>:<port>;}, passes
 * the joined string through {@code PeerFormat.shorten} and stores the result in the dynamic
 * field of the options object returned by the client's {@code getOptions()}.
 */
public class RedisClusterClientConstructorInterceptor implements InstanceConstructorInterceptor {

    /**
     * Records the cluster peer list on the client's options.
     *
     * @param objInst      the newly constructed instance; cast to {@code RedisClusterClient}
     * @param allArguments constructor arguments; index 1 is cast to {@code Iterable<RedisURI>}
     */
    @Override
    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
        @SuppressWarnings("unchecked")
        final Iterable<RedisURI> nodes = (Iterable<RedisURI>) allArguments[1];
        final RedisClusterClient clusterClient = (RedisClusterClient) objInst;

        final StringBuilder joinedPeers = new StringBuilder();
        for (RedisURI node : nodes) {
            // Each entry is terminated (not separated) by ';', so the result ends with ';'.
            joinedPeers.append(node.getHost());
            joinedPeers.append(":");
            joinedPeers.append(node.getPort());
            joinedPeers.append(";");
        }

        final EnhancedInstance options = (EnhancedInstance) clusterClient.getOptions();
        options.setSkyWalkingDynamicField(PeerFormat.shorten(joinedPeers.toString()));
    }

}
RedisClusterClientConstructorInterceptor实现了InstanceConstructorInterceptor接口,其onConstruct方法遍历allArguments[1]中的redisURI,以"host:port;"的形式拼接出peer信息,经PeerFormat.shorten处理后设置到redisClusterClient.getOptions()的skyWalkingDynamicField
小结
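skywalking的lettuce-plugin提供了AbstractRedisClientInstrumentation、AsyncCommandInstrumentation、ClientOptionsInstrumentation、RedisChannelWriterInstrumentation、RedisClientInstrumentation、RedisClusterClientInstrumentation增强;其中peer信息的传递路径是:RedisClient、RedisClusterClient的构造器拦截器把peer写入ClientOptions的skyWalkingDynamicField,AbstractRedisClient的setOptions被调用时该值会被复制到新的ClientOptions上,DefaultEndpoint的构造器拦截器再把它复制到自身,最后在writeToChannelAndFlush、writeAndFlush被调用时用它创建exit span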

 类似资料: