当前位置: 首页 > 工具软件 > more plugin > 使用案例 >

skywalking plugin 开发初探 ONS plugin 实践

萧明贤
2023-12-01

最近支持一下 ONS 内部skywalking 增强支持,RocketMQ 开源版本支持skywalking ,阿里云上的skywalking不支持 简单的实现一下,了解整个实现的逻辑。

一、开发指南

参考文档

官方开发指南
https://skywalking.apache.org/docs/skywalking-java/v8.10.0/en/setup/service-agent/java-agent/java-plugin-development-guide/

https://skywalking.apache.org/zh/2019-01-21-agent-plugin-practice/
非官方文档 很详细
https://www.jianshu.com/p/e5fb4d46c618
https://blog.csdn.net/kaiyuanshe/article/details/109685249

1.1 核心参数传递 ContextCarrier

在客户端,创建一个新的 traceId 所有信息放到HTTP heads、Dubbo attachments 或者Kafka messages。
通过服务调用,traceId 传递。
skywalking 中使用 ContextCarrier 传递 信息
比如: 这里通过 Mq 用户自定义属性传递 skywalking 中的携带信息

Properties userProperties = message.getUserProperties();
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
    next = next.next();
    if (!StringUtil.isEmpty(next.getHeadValue())) {
        userProperties.setProperty(next.getHeadKey(), next.getHeadValue());
    }
}

1.1.1 跨线程传递 ContextSnapshot

听名字 就有意思 上下文快照,直接粘贴官方的文档 。
Besides cross-process tracing, cross-thread tracing has to be supported as well. For instance, both async process (in-memory MQ) and batch process are common in Java. Cross-process and cross-thread tracing are very similar in that they both require propagating context, except that cross-thread tracing does not require serialization.
Here are the three steps on cross-thread propagation:

  1. Use ContextManager#capture to get the ContextSnapshot object.
  2. Let the sub-thread access the ContextSnapshot through method arguments or being carried by existing arguments
  3. Use ContextManager#continued in sub-thread.

2. 字节码增强

https://skywalking.apache.org/docs/skywalking-java/v8.10.0/en/setup/service-agent/java-agent/java-plugin-development-guide/

字节码增强使用 bytebuddy

可以先去官方的Java agent 代码下载下来湫湫一下子. 很多的例子

 <dependency>
    <groupId>org.apache.skywalking</groupId>
    <artifactId>apm-agent-core</artifactId>
    <version>${sky-agent-version}</version>
    <scope>provided</scope>
</dependency>

ClassInstanceMethodsEnhancePluginDefine �
比如增强这个类的实例 com.aliyun.openservices.ons.api.impl.rocketmq.ConsumerImpl


package com.aliyun.openservices.ons.api;

public interface Consumer extends Admin {
    void subscribe(String var1, String var2, MessageListener var3);

    void subscribe(String var1, MessageSelector var2, MessageListener var3);

    void unsubscribe(String var1);
}

注意这里为什么不能直接增强 MessageListener? 如下写法为lambda 表达式 skywalking不支持
https://blog.csdn.net/weixin_39850981/article/details/118846538 bytebuddy 支持官方没有实现
所以改为覆盖参数进行增强
consumer.subscribe(topic, tag, (msg, context) -> {
})

2.1 定义你对哪个class 增强?

NameMatch.byName(ENHANCE_CLASS);

2.2 定义你对哪个方法增强 &构造函数?

ProducerConstructorInterceptor & InstanceMethodsAroundInterceptor 信息订阅
如下为 阿里云ONS 消费者的定义增强

public class ConsumerImplInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

    private static final String ENHANCE_CLASS = "com.aliyun.openservices.ons.api.impl.rocketmq.ConsumerImpl";
    private static final String CONSUMER_MESSAGE_METHOD = "subscribe";
    private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.ons.ConsumerImplInterceptor";

    public static final String CONSTRUCTOR_INTERCEPT_TYPE = "java.util.Properties";

    public static final String CONSUMER_CONFIG_CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.ons.ConsumerConstructorInterceptor";

    @Override
    protected ClassMatch enhanceClass() {
        return NameMatch.byName(ENHANCE_CLASS);
    }

    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return new ConstructorInterceptPoint[]{
                new ConstructorInterceptPoint() {
                    @Override
                    public ElementMatcher<MethodDescription> getConstructorMatcher() {
                        return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPT_TYPE);
                    }

                    @Override
                    public String getConstructorInterceptor() {
                        return CONSUMER_CONFIG_CONSTRUCTOR_INTERCEPTOR_CLASS;
                    }
                }

        };
    }

    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        return new InstanceMethodsInterceptPoint[]{
                new InstanceMethodsInterceptPoint() {
                    @Override
                    public ElementMatcher<MethodDescription> getMethodsMatcher() {
                        return named(CONSUMER_MESSAGE_METHOD)
                                .and(takesArgumentWithType(1, "java.lang.String"))
                                .and(takesArgumentWithType(2, "com.aliyun.openservices.ons.api.MessageListener"))
                                ;
                    }

                    @Override
                    public String getMethodsInterceptor() {
                        return INTERCEPTOR_CLASS;
                    }

                    @Override
                    public boolean isOverrideArgs() {
                        return true;
                    }
                }
        };
    }
}

2.3 定义增强的处理逻辑

2.3.1 消费者构造函数处理获取参数
/**
 * {@link  com.aliyun.openservices.ons.api.impl.rocketmq.ProducerImpl}
 *
 * @author wangji
 * @date 2022-04-12 09:46
 */
public class ProducerConstructorInterceptor implements InstanceConstructorInterceptor {
    private static final ILog LOGGER = LogManager.getLogger(ProducerConstructorInterceptor.class);
    @Override
    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) throws Throwable {
        try {
            Properties properties = (Properties) allArguments[0];
            ConfigProducerPropertiesCache cache = new ConfigProducerPropertiesCache();
            cache.setNameServer(properties.getProperty(PropertyKeyConst.NAMESRV_ADDR));
            cache.setGroupId(properties.getProperty(PropertyKeyConst.GROUP_ID));
            cache.setmQType(properties.getProperty(PropertyKeyConst.MQType));
            objInst.setSkyWalkingDynamicField(cache);
        } catch (Exception e) {
            LOGGER.error("ProducerImpl {}",e);
        }
    }
}
2.3.2 消费者实现处理进行参数替换

consumer.subscribe(topic, tag, (msg, context) -> {
})
这里为什么进行替换参数是由于内部写法为lambda表达式 so 采用替换参数,debug 看日志的时候发现一直增强不行.

/**
 * {@link  com.aliyun.openservices.ons.api.impl.rocketmq.ConsumerImpl}
 *
 * @author wangji
 * @date 2022-04-09 18:03
 */
public class ConsumerImplInterceptor implements InstanceMethodsAroundInterceptor {

    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
        ConfigConsumerPropertiesCache configConsumerPropertiesCache = (ConfigConsumerPropertiesCache) objInst.getSkyWalkingDynamicField();
        Object messageListener = allArguments[2];
        if (null != messageListener) {
            if (messageListener instanceof EnhancedInstance) {

            } else if (messageListener instanceof MessageListener) {
                MessageListenerCache messageListenerCache = new MessageListenerCache();
                messageListenerCache.setMessageListener((MessageListener) messageListener);

                messageListenerCache.setConsumerPropertiesCache(configConsumerPropertiesCache);
                allArguments[2] = new MessageListenerAdapterInterceptor(messageListenerCache);
            }
        }
    }

    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                              Object ret) throws Throwable {
        return ret;
    }

    @Override
    public void handleMethodException(EnhancedInstance enhancedInstance, Method method, Object[] objects, Class<?>[] classes, Throwable throwable) {
    }

}
2.3.3 消费者Listener 监听逻辑

从userMessage 中获取 上下文信息,设置 ContextCarrier 信息 创建 Span

/**
 * 消息监听处理..
 *
 * @author wangji
 * @date 2022-04-11 18:42
 */
public class MessageListenerAdapterInterceptor implements MessageListener, EnhancedInstance {

    public static final String CONSUMER_OPERATION_NAME_PREFIX = "ALiYunOns/";

    private MessageListenerCache cache;

    public MessageListenerAdapterInterceptor(MessageListenerCache cache) {
        this.cache = cache;
    }

    @Override
    public Action consume(Message message, ConsumeContext context) {
        ContextCarrier contextCarrier = getContextCarrierFromMessage(message);
        ConfigConsumerPropertiesCache consumerPropertiesCache = cache.getConsumerPropertiesCache();
        AbstractSpan span = ContextManager.createEntrySpan(CONSUMER_OPERATION_NAME_PREFIX + message
                .getTopic() + "/Consumer", contextCarrier);

        span.setComponent(ComponentsDefine.ROCKET_MQ_CONSUMER);
        span.setPeer(consumerPropertiesCache.getNameServer());
        SpanLayer.asMQ(span);
        ContextManager.extract(getContextCarrierFromMessage(message));

        StringTag groupIdTag = (StringTag) Tags.ofKey(PropertyKeyConst.GROUP_ID);
        groupIdTag.set(span,consumerPropertiesCache.getGroupId());

        StringTag messageModel = (StringTag) Tags.ofKey(PropertyKeyConst.MessageModel);
        messageModel.set(span,consumerPropertiesCache.getMessageModel());

        StringTag msgIdTag = (StringTag) Tags.ofKey(Message.SystemPropKey.MSGID);
        msgIdTag.set(span,message.getMsgID());

        StringTag tagTag = (StringTag) Tags.ofKey(Message.SystemPropKey.TAG);
        tagTag.set(span,message.getTag());
        if(message.getKey() !=null && message.getKey().length()>0){
            StringTag keyTag = (StringTag) Tags.ofKey(Message.SystemPropKey.KEY);
            keyTag.set(span,message.getKey());
        }

        try {
            Action consume = cache.getMessageListener().consume(message, context);
            if (consume != null) {
                AbstractSpan activeSpan = ContextManager.activeSpan();
                Tags.MQ_STATUS.set(activeSpan, consume.name());
                if (consume != Action.CommitMessage) {
                    activeSpan.errorOccurred();
                }
            }
            return consume;
        } catch (Throwable t) {
            ContextManager.activeSpan().log(t);
            if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            } else {
                throw new RuntimeException(t);
            }
        } finally {
            ContextManager.stopSpan();
        }

    }

    @Override
    public Object getSkyWalkingDynamicField() {
        return cache;
    }

    @Override
    public void setSkyWalkingDynamicField(Object o) {

    }

    private ContextCarrier getContextCarrierFromMessage(Message message) {
        ContextCarrier contextCarrier = new ContextCarrier();

        CarrierItem next = contextCarrier.items();
        while (next.hasNext()) {
            next = next.next();
            next.setHeadValue(message.getUserProperties(next.getHeadKey()));
        }

        return contextCarrier;
    }
}

3、打包

https://blog.csdn.net/kaiyuanshe/article/details/109685249

3.1 定义增强的配置

src/main/resources/skywalking-plugin.def

# Key=value的形式
# key随便写;value是Instrumentation类的包名类名全路径
aliyun-ons-plugin=org.apache.skywalking.apm.plugin.ons.define.ProducerInstrumentation
aliyun-ons-plugin=org.apache.skywalking.apm.plugin.ons.define.ConsumerImplInstrumentation

3.2 maven shade 定义

因为 skywalking里面增对 bytebuddy 进行了shade 所以也需要处理一下,然后扔进plugin 里面就可以了

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.gongdao</groupId>
    <artifactId>apm-aliyun-mq-ons-plugin</artifactId>
    <version>2022-04-15-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <sky-agent-version>8.9.0</sky-agent-version>
        <bytebuddy.version>1.11.18</bytebuddy.version>
        <shade.package>org.apache.skywalking.apm.dependencies</shade.package>
        <shade.net.bytebuddy.source>net.bytebuddy</shade.net.bytebuddy.source>
        <shade.net.bytebuddy.target>${shade.package}.${shade.net.bytebuddy.source}</shade.net.bytebuddy.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-client</artifactId>
            <version>1.8.7.1.Final</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.skywalking</groupId>
            <artifactId>apm-agent-core</artifactId>
            <version>${sky-agent-version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>net.bytebuddy</groupId>
            <artifactId>byte-buddy</artifactId>
            <version>${bytebuddy.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.skywalking</groupId>
            <artifactId>java-agent-util</artifactId>
            <version>${sky-agent-version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.skywalking</groupId>
            <artifactId>apm-test-tools</artifactId>
            <version>${sky-agent-version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <shadedArtifactAttached>false</shadedArtifactAttached>
                            <createDependencyReducedPom>true</createDependencyReducedPom>
                            <createSourcesJar>true</createSourcesJar>
                            <shadeSourcesContent>true</shadeSourcesContent>
                            <relocations>
                                <relocation>
                                    <pattern>${shade.net.bytebuddy.source}</pattern>
                                    <shadedPattern>${shade.net.bytebuddy.target}</shadedPattern>
                                </relocation>
                            </relocations>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-source-plugin</artifactId>
                <version>3.0.1</version>
                <executions>
                    <execution>
                        <id>attach-sources</id>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

4、测试debug

启动应用程序 agent 调试都类似

-DSW_AGENT_COLLECTOR_BACKEND_SERVICES=127.0.0.1:11800 可以不用配置 agent config 里面可以处理
刚开始开发 logging.level=${SW_LOGGING_LEVEL:INFO} 可以修改日志的等级为debug 查看 agent的日志

  • idea 通过run的方式启动remote debug ,非debug 模式.然后在插件的工程通过remote debug 链接上。
-javaagent:/Users/wangji/agent/skywalking-agent.jar
-Dskywalking.agent.service_name=test-provider
-agentlib:jdwp=transport=dt_socket,address=5005,server=y,suspend=n
  • idea 通过debug的方式启动,可以将 plugin 包通过add lib 的方式 获取到代码然后进行debug,这样既可以debug 工程代码 也可以debug agent代码。
-javaagent:/Users/wangji/agent/skywalking-agent.jar
-Dskywalking.agent.service_name=test-provider

二、skywalking 跨线程能力

apm-jdk-threading-plugin 插件

提供了 针对 java.lang.Runnable & java.util.concurrent.Callable 的子类 ,注意 lambda 表达式不支持,提供前缀配置类的前缀,如果有一个统一的 https://skywalking.apache.org/docs/skywalking-java/v8.10.0/en/setup/service-agent/java-agent/configurations/

plugin.jdkthreading.threading_class_prefixesThreading classes (java.lang.Runnable and java.util.concurrent.Callable) and their subclasses, including anonymous inner classes whose name match any one of the THREADING_CLASS_PREFIXES (splitted by ,) will be instrumented, make sure to only specify as narrow prefixes as what you’re expecting to instrument, (java. and javax. will be ignored due to safety issues)SW_PLUGIN_JDKTHREADING_THREADING_CLASS_PREFIXES

使用注解 @TraceCrossThread or apm-toolkit-trace 里面的包装类

https://skywalking.apache.org/docs/skywalking-java/v8.10.0/en/setup/service-agent/java-agent/application-toolkit-trace-cross-thread/
注意这里必须插入线程之前构造当前类,这个实现原理通过监听构造函数

  ExecutorService executorService = Executors.newFixedThreadPool(1);
    executorService.submit(CallableWrapper.of(new Callable<String>() {
        @Override public String call() throws Exception {
            return null;
        }
    }));

实现原理

构造先抓取
public class CallableOrRunnableConstructInterceptor implements InstanceConstructorInterceptor {
    @Override
    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
        if (ContextManager.isActive()) {
             // 用于传递参数
            objInst.setSkyWalkingDynamicField(ContextManager.capture());
        }
}

}
执行方法前注入快照
public class CallableOrRunnableInvokeInterceptor implements InstanceMethodsAroundInterceptor {
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
        MethodInterceptResult result) throws Throwable {
        ContextManager.createLocalSpan("Thread/" + objInst.getClass().getName() + "/" + method.getName());
        // 获取快照
        ContextSnapshot cachedObjects = (ContextSnapshot) objInst.getSkyWalkingDynamicField();
        if (cachedObjects != null) {
            ContextManager.continued(cachedObjects);
        }
    }

    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
        Object ret) throws Throwable {
        ContextManager.stopSpan();
        // clear ContextSnapshot
        objInst.setSkyWalkingDynamicField(null);
        return ret;
    }

    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
        Class<?>[] argumentsTypes, Throwable t) {
        ContextManager.activeSpan().log(t);
    }
}

三、插件常用的启动参数

3.1 skywalking 支持arthas 缓存起来 默认false

缓存起来代码可以查看到增强后的逻辑哦~

# If true, SkyWalking agent will cache all instrumented classes files to memory or disk files (decided by class cache mode),
# allow other javaagent to enhance those classes that enhanced by SkyWalking agent.
agent.is_cache_enhanced_class=${SW_AGENT_CACHE_CLASS:true}

# The instrumented classes cache mode: MEMORY or FILE
# MEMORY: cache class bytes to memory, if instrumented classes is too many or too large, it may take up more memory
# FILE: cache class bytes in `/class-cache` folder, automatically clean up cached class files when the application exits
agent.class_cache_mode=${SW_AGENT_CLASS_CACHE_MODE:FILE}

3.2 jdbc 增强是否打印参数 默认false


# If set to true, the parameters of the sql (typically `java.sql.PreparedStatement`) would be collected.
plugin.jdbc.trace_sql_parameters=${SW_JDBC_TRACE_SQL_PARAMETERS:true}

3.3 是否打印dubbo的参数 测试环境可以打开

#  Apache Dubbo consumer collect `arguments` in RPC call, use `Object#toString` to collect `arguments`. 
plugin.dubbo.collect_consumer_arguments=${SW_PLUGIN_DUBBO_COLLECT_CONSUMER_ARGUMENTS:true}


#  Apache Dubbo provider collect `arguments` in RPC call, use `Object#toString` to collect `arguments`. 
plugin.dubbo.collect_provider_arguments=${SW_PLUGIN_DUBBO_COLLECT_PROVIDER_ARGUMENTS:true}
 类似资料: