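I recently added SkyWalking enhancement support for ONS internally. The open-source RocketMQ client is supported by SkyWalking, but the Alibaba Cloud ONS client is not, so here is a simple implementation that also helps in understanding how the whole thing works.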
https://skywalking.apache.org/zh/2019-01-21-agent-plugin-practice/
Unofficial documentation, very detailed:
https://www.jianshu.com/p/e5fb4d46c618
https://blog.csdn.net/kaiyuanshe/article/details/109685249
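On the client side, a new traceId is created and all the tracing information is put into HTTP headers, Dubbo attachments, or Kafka messages.
The traceId is then passed along with each service call.
SkyWalking uses ContextCarrier to carry this information.
For example, here the SkyWalking carrier data is passed through the MQ message's user-defined properties: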
Properties userProperties = message.getUserProperties();
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
if (!StringUtil.isEmpty(next.getHeadValue())) {
userProperties.setProperty(next.getHeadKey(), next.getHeadValue());
}
}
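To make the round trip easier to picture, here is a minimal stdlib-only sketch of the producer-side inject and the consumer-side extract. It is not the SkyWalking API: the class `CarrierRoundTrip`, the plain `Map` standing in for `ContextCarrier`, and the header values are all assumptions for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

/** Stand-in for the inject/extract round trip; a plain Map plays the role of the carrier. */
final class CarrierRoundTrip {

    /** Producer side: copy every non-empty carrier header into the message properties. */
    static void inject(Map<String, String> carrierHeaders, Properties userProperties) {
        for (Map.Entry<String, String> header : carrierHeaders.entrySet()) {
            String value = header.getValue();
            if (value != null && !value.isEmpty()) {
                userProperties.setProperty(header.getKey(), value);
            }
        }
    }

    /** Consumer side: read back only the header keys the carrier knows about. */
    static Map<String, String> extract(Iterable<String> headerKeys, Properties userProperties) {
        Map<String, String> carrierHeaders = new LinkedHashMap<>();
        for (String key : headerKeys) {
            String value = userProperties.getProperty(key);
            if (value != null) {
                carrierHeaders.put(key, value);
            }
        }
        return carrierHeaders;
    }
}
```

The empty-value check mirrors the `StringUtil.isEmpty` guard above: a header without a value never reaches the message, so the consumer simply does not see that key.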
The name says it all: a context snapshot (ContextSnapshot). The official documentation is quoted directly below.
Besides cross-process tracing, cross-thread tracing has to be supported as well. For instance, both async process (in-memory MQ) and batch process are common in Java. Cross-process and cross-thread tracing are very similar in that they both require propagating context, except that cross-thread tracing does not require serialization.
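Here are the three steps on cross-thread propagation:
1. Use ContextManager#capture to get the ContextSnapshot object.
2. Let the sub-thread access the ContextSnapshot by any way, through method arguments or carried by existing arguments.
3. Use ContextManager#continued in the sub-thread.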
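The capture / hand over / continue pattern can be sketched with the JDK alone. This is only an illustration: `ACTIVE_TRACE` is a hypothetical ThreadLocal standing in for the agent's per-thread context, and a plain String stands in for ContextSnapshot.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CrossThreadSketch {
    // Hypothetical stand-in for the per-thread tracing context.
    static final ThreadLocal<String> ACTIVE_TRACE = new ThreadLocal<>();

    /** Runs a task on another thread and returns the trace id that thread observed. */
    static String runInWorker(String traceId) {
        ACTIVE_TRACE.set(traceId);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // Step 1: capture a snapshot in the parent thread.
            final String snapshot = ACTIVE_TRACE.get();
            // Step 2: hand the snapshot to the task (here it is captured by the lambda).
            Callable<String> task = () -> {
                // Step 3: continue from the snapshot inside the worker thread.
                ACTIVE_TRACE.set(snapshot);
                try {
                    return ACTIVE_TRACE.get();
                } finally {
                    ACTIVE_TRACE.remove();
                }
            };
            return pool.submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            ACTIVE_TRACE.remove();
            pool.shutdown();
        }
    }
}
```

No serialization is involved, which is the difference from the cross-process case: the snapshot is just an object reference shared between the two threads.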
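Bytecode enhancement is done with Byte Buddy.
It is worth downloading the official Java agent source code first and having a look around; it contains plenty of examples.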
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-agent-core</artifactId>
<version>${sky-agent-version}</version>
<scope>provided</scope>
</dependency>
ClassInstanceMethodsEnhancePluginDefine
For example, to enhance instances of this class: com.aliyun.openservices.ons.api.impl.rocketmq.ConsumerImpl
package com.aliyun.openservices.ons.api;
public interface Consumer extends Admin {
void subscribe(String var1, String var2, MessageListener var3);
void subscribe(String var1, MessageSelector var2, MessageListener var3);
void unsubscribe(String var1);
}
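Note: why not enhance MessageListener directly? Because listeners are usually written as lambda expressions, as shown below, and SkyWalking does not support enhancing lambdas.
https://blog.csdn.net/weixin_39850981/article/details/118846538 (Byte Buddy can do it, but the official agent has not implemented it)
So the plugin intercepts subscribe and overrides its listener argument instead.
consumer.subscribe(topic, tag, (msg, context) -> {
// handle the message
return Action.CommitMessage;
});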
NameMatch.byName(ENHANCE_CLASS);
ProducerConstructorInterceptor & InstanceMethodsAroundInterceptor: message subscription
The enhancement definition for the Alibaba Cloud ONS consumer is as follows:
public class ConsumerImplInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String ENHANCE_CLASS = "com.aliyun.openservices.ons.api.impl.rocketmq.ConsumerImpl";
private static final String CONSUMER_MESSAGE_METHOD = "subscribe";
private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.ons.ConsumerImplInterceptor";
public static final String CONSTRUCTOR_INTERCEPT_TYPE = "java.util.Properties";
public static final String CONSUMER_CONFIG_CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.ons.ConsumerConstructorInterceptor";
@Override
protected ClassMatch enhanceClass() {
return NameMatch.byName(ENHANCE_CLASS);
}
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[]{
new ConstructorInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getConstructorMatcher() {
return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPT_TYPE);
}
@Override
public String getConstructorInterceptor() {
return CONSUMER_CONFIG_CONSTRUCTOR_INTERCEPTOR_CLASS;
}
}
};
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[]{
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(CONSUMER_MESSAGE_METHOD)
.and(takesArgumentWithType(1, "java.lang.String"))
.and(takesArgumentWithType(2, "com.aliyun.openservices.ons.api.MessageListener"))
;
}
@Override
public String getMethodsInterceptor() {
return INTERCEPTOR_CLASS;
}
@Override
public boolean isOverrideArgs() {
return true;
}
}
};
}
}
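One detail of the method matcher above is easy to miss: the argument index is zero-based, so index 1 being java.lang.String selects only the subscribe(String, String, MessageListener) overload, and the MessageSelector overload is not intercepted by this definition. The reflection-based sketch below shows the same selection logic with the JDK only; `DemoConsumer`, `Listener`, `Selector`, and the helper `takesArgumentWithType` are hypothetical stand-ins, not the Byte Buddy or SkyWalking matchers.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

final class OverloadMatchSketch {
    interface Listener { }
    interface Selector { }

    /** Stand-in with the same overload shape as the ONS Consumer interface. */
    interface DemoConsumer {
        void subscribe(String topic, String subExpression, Listener listener);
        void subscribe(String topic, Selector selector, Listener listener);
        void unsubscribe(String topic);
    }

    /** True when the parameter at the zero-based index has exactly the given type name. */
    static boolean takesArgumentWithType(Method method, int index, String typeName) {
        Class<?>[] parameterTypes = method.getParameterTypes();
        return parameterTypes.length > index && parameterTypes[index].getName().equals(typeName);
    }

    /** Applies the same three conditions as the matcher: name, argument 1, argument 2. */
    static List<Method> matchSubscribe(Class<?> type) {
        List<Method> matched = new ArrayList<>();
        for (Method method : type.getMethods()) {
            if (method.getName().equals("subscribe")
                    && takesArgumentWithType(method, 1, "java.lang.String")
                    && takesArgumentWithType(method, 2, Listener.class.getName())) {
                matched.add(method);
            }
        }
        return matched;
    }
}
```

If the selector-based overload also needs tracing, the definition would need a second intercept point (or a looser matcher) for it.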
/**
* {@link com.aliyun.openservices.ons.api.impl.rocketmq.ProducerImpl}
*
* @author wangji
* @date 2022-04-12 09:46
*/
public class ProducerConstructorInterceptor implements InstanceConstructorInterceptor {
private static final ILog LOGGER = LogManager.getLogger(ProducerConstructorInterceptor.class);
@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) throws Throwable {
try {
Properties properties = (Properties) allArguments[0];
ConfigProducerPropertiesCache cache = new ConfigProducerPropertiesCache();
cache.setNameServer(properties.getProperty(PropertyKeyConst.NAMESRV_ADDR));
cache.setGroupId(properties.getProperty(PropertyKeyConst.GROUP_ID));
cache.setmQType(properties.getProperty(PropertyKeyConst.MQType));
objInst.setSkyWalkingDynamicField(cache);
} catch (Exception e) {
LOGGER.error("ProducerImpl {}",e);
}
}
}
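consumer.subscribe(topic, tag, (msg, context) -> {
// handle the message
return Action.CommitMessage;
});
Why replace the argument here? Because the listener is written as a lambda expression. While debugging with the agent log, I found that enhancing the listener directly never took effect, so the interceptor swaps the argument for a wrapper instead.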
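The argument-replacement idea is an ordinary decorator: the interceptor cannot name the lambda's runtime-generated class, but it can wrap whatever object arrives as the listener argument. Below is a stdlib-only sketch of that idea; `Listener`, `TracingListener`, and `replaceListenerArgument` are hypothetical names, and a list of strings stands in for span start/stop.

```java
import java.util.List;

final class ListenerWrapSketch {
    interface Listener {
        String consume(String message);
    }

    /** Decorator that records what a tracing wrapper would do around the real listener. */
    static final class TracingListener implements Listener {
        private final Listener delegate;
        private final List<String> events;

        TracingListener(Listener delegate, List<String> events) {
            this.delegate = delegate;
            this.events = events;
        }

        @Override
        public String consume(String message) {
            events.add("start:" + message);       // where an entry span would be created
            try {
                return delegate.consume(message);
            } finally {
                events.add("stop:" + message);    // where the span would be stopped
            }
        }
    }

    /** Mirrors beforeMethod with overridden args: wrap argument 2 unless it is already wrapped. */
    static Object[] replaceListenerArgument(Object[] allArguments, List<String> events) {
        Object candidate = allArguments[2];
        if (candidate instanceof Listener && !(candidate instanceof TracingListener)) {
            allArguments[2] = new TracingListener((Listener) candidate, events);
        }
        return allArguments;
    }
}
```

Because the wrapper is a normal named class created by the plugin, it works the same whether the caller passed a lambda, an anonymous class, or a regular implementation.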
/**
* {@link com.aliyun.openservices.ons.api.impl.rocketmq.ConsumerImpl}
*
* @author wangji
* @date 2022-04-09 18:03
*/
public class ConsumerImplInterceptor implements InstanceMethodsAroundInterceptor {
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
ConfigConsumerPropertiesCache configConsumerPropertiesCache = (ConfigConsumerPropertiesCache) objInst.getSkyWalkingDynamicField();
Object messageListener = allArguments[2];
if (null != messageListener) {
if (messageListener instanceof EnhancedInstance) {
} else if (messageListener instanceof MessageListener) {
MessageListenerCache messageListenerCache = new MessageListenerCache();
messageListenerCache.setMessageListener((MessageListener) messageListener);
messageListenerCache.setConsumerPropertiesCache(configConsumerPropertiesCache);
allArguments[2] = new MessageListenerAdapterInterceptor(messageListenerCache);
}
}
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
return ret;
}
@Override
public void handleMethodException(EnhancedInstance enhancedInstance, Method method, Object[] objects, Class<?>[] classes, Throwable throwable) {
}
}
Read the context information from the message's user properties, populate the ContextCarrier with it, and create the Span.
/**
* Message listener handling.
*
* @author wangji
* @date 2022-04-11 18:42
*/
public class MessageListenerAdapterInterceptor implements MessageListener, EnhancedInstance {
public static final String CONSUMER_OPERATION_NAME_PREFIX = "ALiYunOns/";
private MessageListenerCache cache;
public MessageListenerAdapterInterceptor(MessageListenerCache cache) {
this.cache = cache;
}
@Override
public Action consume(Message message, ConsumeContext context) {
ContextCarrier contextCarrier = getContextCarrierFromMessage(message);
ConfigConsumerPropertiesCache consumerPropertiesCache = cache.getConsumerPropertiesCache();
AbstractSpan span = ContextManager.createEntrySpan(CONSUMER_OPERATION_NAME_PREFIX + message
.getTopic() + "/Consumer", contextCarrier);
span.setComponent(ComponentsDefine.ROCKET_MQ_CONSUMER);
span.setPeer(consumerPropertiesCache.getNameServer());
SpanLayer.asMQ(span);
// No separate ContextManager.extract(...) call is needed here: createEntrySpan(name, carrier) above already extracts the carrier.
StringTag groupIdTag = (StringTag) Tags.ofKey(PropertyKeyConst.GROUP_ID);
groupIdTag.set(span,consumerPropertiesCache.getGroupId());
StringTag messageModel = (StringTag) Tags.ofKey(PropertyKeyConst.MessageModel);
messageModel.set(span,consumerPropertiesCache.getMessageModel());
StringTag msgIdTag = (StringTag) Tags.ofKey(Message.SystemPropKey.MSGID);
msgIdTag.set(span,message.getMsgID());
StringTag tagTag = (StringTag) Tags.ofKey(Message.SystemPropKey.TAG);
tagTag.set(span,message.getTag());
if(message.getKey() !=null && message.getKey().length()>0){
StringTag keyTag = (StringTag) Tags.ofKey(Message.SystemPropKey.KEY);
keyTag.set(span,message.getKey());
}
try {
Action consume = cache.getMessageListener().consume(message, context);
if (consume != null) {
AbstractSpan activeSpan = ContextManager.activeSpan();
Tags.MQ_STATUS.set(activeSpan, consume.name());
if (consume != Action.CommitMessage) {
activeSpan.errorOccurred();
}
}
return consume;
} catch (Throwable t) {
ContextManager.activeSpan().log(t);
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else {
throw new RuntimeException(t);
}
} finally {
ContextManager.stopSpan();
}
}
@Override
public Object getSkyWalkingDynamicField() {
return cache;
}
@Override
public void setSkyWalkingDynamicField(Object o) {
}
private ContextCarrier getContextCarrierFromMessage(Message message) {
ContextCarrier contextCarrier = new ContextCarrier();
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
next.setHeadValue(message.getUserProperties(next.getHeadKey()));
}
return contextCarrier;
}
}
src/main/resources/skywalking-plugin.def
# key=value format
# the key is arbitrary; the value is the fully qualified class name of the Instrumentation class
aliyun-ons-plugin=org.apache.skywalking.apm.plugin.ons.define.ProducerInstrumentation
aliyun-ons-plugin=org.apache.skywalking.apm.plugin.ons.define.ConsumerImplInstrumentation
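Both lines above use the same key, which would collapse to one entry if the file were loaded as java.util.Properties. My understanding (an assumption, not verified against a specific agent version) is that the agent reads skywalking-plugin.def line by line and splits each line on `=`, so a repeated plugin name is fine. A rough sketch of that kind of parsing, with `PluginDefSketch` being a hypothetical name:

```java
import java.util.ArrayList;
import java.util.List;

final class PluginDefSketch {

    /** Parses "name=className" lines; blank lines and # comments are skipped. */
    static List<String[]> parse(String content) {
        List<String[]> definitions = new ArrayList<>();
        for (String line : content.split("\\R")) {
            String trimmed = line.trim();
            if (trimmed.isEmpty() || trimmed.startsWith("#")) {
                continue;
            }
            String[] parts = trimmed.split("=", 2);
            if (parts.length != 2 || parts[0].trim().isEmpty() || parts[1].trim().isEmpty()) {
                throw new IllegalArgumentException("Invalid plugin definition: " + line);
            }
            // Each line becomes its own entry, so repeated names are kept.
            definitions.add(new String[] {parts[0].trim(), parts[1].trim()});
        }
        return definitions;
    }
}
```

With this reading, the name groups definitions under one plugin while each line registers one Instrumentation class.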
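Because SkyWalking shades Byte Buddy, the plugin has to relocate it the same way when it is packaged. After that, drop the jar into the agent's plugins directory and you are done.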
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.gongdao</groupId>
<artifactId>apm-aliyun-mq-ons-plugin</artifactId>
<version>2022-04-15-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<sky-agent-version>8.9.0</sky-agent-version>
<bytebuddy.version>1.11.18</bytebuddy.version>
<shade.package>org.apache.skywalking.apm.dependencies</shade.package>
<shade.net.bytebuddy.source>net.bytebuddy</shade.net.bytebuddy.source>
<shade.net.bytebuddy.target>${shade.package}.${shade.net.bytebuddy.source}</shade.net.bytebuddy.target>
</properties>
<dependencies>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.7.1.Final</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-agent-core</artifactId>
<version>${sky-agent-version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy</artifactId>
<version>${bytebuddy.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>java-agent-util</artifactId>
<version>${sky-agent-version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-test-tools</artifactId>
<version>${sky-agent-version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<createDependencyReducedPom>true</createDependencyReducedPom>
<createSourcesJar>true</createSourcesJar>
<shadeSourcesContent>true</shadeSourcesContent>
<relocations>
<relocation>
<pattern>${shade.net.bytebuddy.source}</pattern>
<shadedPattern>${shade.net.bytebuddy.target}</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>3.0.1</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
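What the relocation in this POM boils down to is a package-prefix rewrite: every reference the plugin makes to net.bytebuddy.* is renamed to the shaded package that the agent ships. The sketch below only illustrates the renaming rule on class names; `RelocationSketch` is a hypothetical helper, and the real work is done on bytecode by maven-shade-plugin.

```java
final class RelocationSketch {

    /** Rewrites a class name when it lives under the source package, otherwise leaves it alone. */
    static String relocate(String className, String sourcePrefix, String targetPrefix) {
        if (className.equals(sourcePrefix) || className.startsWith(sourcePrefix + ".")) {
            return targetPrefix + className.substring(sourcePrefix.length());
        }
        return className;
    }
}
```

For example, with the properties above, net.bytebuddy.matcher.ElementMatcher becomes org.apache.skywalking.apm.dependencies.net.bytebuddy.matcher.ElementMatcher, which is the type the agent's own classes expect from getMethodsMatcher().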
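-DSW_AGENT_COLLECTOR_BACKEND_SERVICES=127.0.0.1:11800 does not have to be passed on the command line; the same value can be set in the agent's config file.
When you are just starting development, change logging.level=${SW_LOGGING_LEVEL:INFO} to DEBUG so that you can read the agent's log.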
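The `${NAME:default}` syntax in the agent config is a placeholder with a fallback. The sketch below shows one plausible way such a value gets resolved; the lookup order (system property, then environment variable, then the default after the first colon) is my assumption about the behavior, and `PlaceholderSketch` is a hypothetical class, not the agent's own resolver.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class PlaceholderSketch {
    // ${NAME:default}; the default may itself contain colons, e.g. 127.0.0.1:11800.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^:}]+):([^}]*)\\}");

    /** Resolves a single ${NAME:default} value; anything else is returned unchanged. */
    static String resolve(String raw) {
        Matcher matcher = PLACEHOLDER.matcher(raw);
        if (!matcher.matches()) {
            return raw;
        }
        String name = matcher.group(1);
        String value = System.getProperty(name);   // assumed first: -DNAME=value
        if (value == null) {
            value = System.getenv(name);            // assumed second: environment variable
        }
        return value != null ? value : matcher.group(2);
    }
}
```

Under that assumption, `-DSW_LOGGING_LEVEL=DEBUG` on the command line and editing the default in the config file are two routes to the same setting.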
-javaagent:/Users/wangji/agent/skywalking-agent.jar
-Dskywalking.agent.service_name=test-provider
-agentlib:jdwp=transport=dt_socket,address=5005,server=y,suspend=n
-javaagent:/Users/wangji/agent/skywalking-agent.jar
-Dskywalking.agent.service_name=test-provider
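The agent provides enhancement for subclasses of java.lang.Runnable & java.util.concurrent.Callable. Note that lambda expressions are not supported. Which classes get enhanced is configured by class-name prefix, which is convenient if your classes share a common one.
https://skywalking.apache.org/docs/skywalking-java/v8.10.0/en/setup/service-agent/java-agent/configurations/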
| Property key | Description | Environment variable |
|---|---|---|
| plugin.jdkthreading.threading_class_prefixes | Threading classes (java.lang.Runnable and java.util.concurrent.Callable) and their subclasses, including anonymous inner classes whose name match any one of the THREADING_CLASS_PREFIXES (splitted by ,) will be instrumented, make sure to only specify as narrow prefixes as what you’re expecting to instrument, (java. and javax. will be ignored due to safety issues) | SW_PLUGIN_JDKTHREADING_THREADING_CLASS_PREFIXES |
https://skywalking.apache.org/docs/skywalking-java/v8.10.0/en/setup/service-agent/java-agent/application-toolkit-trace-cross-thread/
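Note that the wrapper must be constructed before the task is handed to the thread pool, because the implementation works by intercepting the constructor: the snapshot is captured at construction time.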
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.submit(CallableWrapper.of(new Callable<String>() {
@Override public String call() throws Exception {
return null;
}
}));
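To see why construction order matters, here is a JDK-only sketch in which the wrapper captures the context in its constructor, the way the constructor interceptor does. `ACTIVE_TRACE` and `SnapshotCallable` are hypothetical stand-ins for the agent's context and for CallableWrapper; the real classes are not used.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ConstructionCaptureSketch {
    // Hypothetical stand-in for the per-thread tracing context.
    static final ThreadLocal<String> ACTIVE_TRACE = new ThreadLocal<>();

    /** Captures the context of whichever thread constructs it. */
    static final class SnapshotCallable<V> implements Callable<V> {
        private final String snapshot = ACTIVE_TRACE.get();
        private final Callable<V> delegate;

        SnapshotCallable(Callable<V> delegate) {
            this.delegate = delegate;
        }

        @Override
        public V call() throws Exception {
            ACTIVE_TRACE.set(snapshot);   // continue from the captured snapshot
            try {
                return delegate.call();
            } finally {
                ACTIVE_TRACE.remove();
            }
        }
    }

    /** Returns the trace id the worker sees, as text ("null" when nothing was captured). */
    static String traceSeenByWorker(boolean wrapBeforeSubmit) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        ACTIVE_TRACE.set("trace-1");
        try {
            Callable<String> readTrace = () -> String.valueOf(ACTIVE_TRACE.get());
            Callable<String> task;
            if (wrapBeforeSubmit) {
                // Constructed in the traced thread: the snapshot holds "trace-1".
                task = new SnapshotCallable<String>(readTrace);
            } else {
                // Constructed inside the worker thread: there is nothing to capture.
                task = () -> new SnapshotCallable<String>(readTrace).call();
            }
            return pool.submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            ACTIVE_TRACE.remove();
            pool.shutdown();
        }
    }
}
```

Wrapping before submit carries the trace across; wrapping inside the worker yields no context, which is the failure mode the note above warns about.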
public class CallableOrRunnableConstructInterceptor implements InstanceConstructorInterceptor {
@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
if (ContextManager.isActive()) {
// stash the snapshot on the instance so it travels with the task
objInst.setSkyWalkingDynamicField(ContextManager.capture());
}
}
}
public class CallableOrRunnableInvokeInterceptor implements InstanceMethodsAroundInterceptor {
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
ContextManager.createLocalSpan("Thread/" + objInst.getClass().getName() + "/" + method.getName());
// retrieve the snapshot captured at construction time
ContextSnapshot cachedObjects = (ContextSnapshot) objInst.getSkyWalkingDynamicField();
if (cachedObjects != null) {
ContextManager.continued(cachedObjects);
}
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
ContextManager.stopSpan();
// clear ContextSnapshot
objInst.setSkyWalkingDynamicField(null);
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().log(t);
}
}
With the enhanced classes cached, you can open them and inspect the logic after enhancement.
# If true, SkyWalking agent will cache all instrumented classes files to memory or disk files (decided by class cache mode),
# allow other javaagent to enhance those classes that enhanced by SkyWalking agent.
agent.is_cache_enhanced_class=${SW_AGENT_CACHE_CLASS:true}
# The instrumented classes cache mode: MEMORY or FILE
# MEMORY: cache class bytes to memory, if instrumented classes is too many or too large, it may take up more memory
# FILE: cache class bytes in `/class-cache` folder, automatically clean up cached class files when the application exits
agent.class_cache_mode=${SW_AGENT_CLASS_CACHE_MODE:FILE}
# If set to true, the parameters of the sql (typically `java.sql.PreparedStatement`) would be collected.
plugin.jdbc.trace_sql_parameters=${SW_JDBC_TRACE_SQL_PARAMETERS:true}
# Apache Dubbo consumer collect `arguments` in RPC call, use `Object#toString` to collect `arguments`.
plugin.dubbo.collect_consumer_arguments=${SW_PLUGIN_DUBBO_COLLECT_CONSUMER_ARGUMENTS:true}
# Apache Dubbo provider collect `arguments` in RPC call, use `Object#toString` to collect `arguments`.
plugin.dubbo.collect_provider_arguments=${SW_PLUGIN_DUBBO_COLLECT_PROVIDER_ARGUMENTS:true}