我有两个通过Kafka交互的微服务,一个发布消息,另一个消费消息。发布者和消费者都运行在Quarkus(1.12.0.final)上,并使用反应消息和兵变。
package myproducer;
import myavro.MyAvro;
import io.smallrye.mutiny.Uni;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Message;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import java.util.concurrent.CompletableFuture;
@ApplicationScoped
public class Publisher {
@Channel("mytopic")
@Inject
public Emitter<MyAvro> myTopic;
@Override
public Uni<Void> publish(MyModel model) {
MyAvro avro = MyModelMapper.INSTANCE.modelToAvro(model);
return Uni.createFrom().emitter(e -> myTopic.send(Message.of(avro)
.addMetadata(toOutgoingKafkaRecordMetadata(avro))
.withAck(() -> {
e.complete(null);
return CompletableFuture.completedFuture(null);
})));
}
}
消费者:
package myconsumer;
import myavro.MyAvro;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import javax.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class Consumer {
@Incoming("mytopic")
public Uni<Void> consume(IncomingKafkaRecord<String, MyAvro> message) {
MyModel model = MyModelMapper.INSTANCE.avroToModel(message.getPayload());
return ...;
}
}
依赖关系:除其他外,包括人工制品
夸克配置(Application.Properties):包括
quarkus.jaeger.service-name=myservice
quarkus.jaeger.sampler-type=const
quarkus.jaeger.sampler-param=1
quarkus.log.console.format=%d{yyyy-MM-dd HH:mm:ss} %-5p traceId=%X{traceId}, spanId=%X{spanId}, sampled=%X{sampled} [%c{2.}] (%t) %s%e%n
mp.messaging.incoming.mytopic.topic=abc
mp.messaging.incoming.mytopic.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.mytopic.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
...
mp.messaging.incoming.mytopic.interceptor.classes=io.opentracing.contrib.kafka.TracingConsumerInterceptor
在这个设置下,根本不会记录traceId或spanId(尽管根据Quarkus的“使用opentracing”指南,它们应该记录)。只有在添加@org.eclipse.microprofile.opentracing.traced之后,才会设置traceId和spanId,但两者在生产者和消费者上是完全不相关的。
我对照前面提到的Quarkus指南“使用opentracing”检查了我的opentracing配置,但没有发现我这边配置错误的提示。在阅读了关于一些夸克扩展中的问题的讨论后,我在与Mutiny一起使用时依赖于ThreadLocals,在我的依赖项中添加了夸克-Smallrye-Context-Propagation,但没有用。
有什么想法吗?
这个问题不容易解决,首先我会试着解释一下发生了什么。
OpenTracing具有事务和跨度的概念。跨度是一个执行块(方法、数据库调用、对Kafka主题的发送),而事务是一个跨多个组件的分布式进程(一组跨度)。
这里的问题是,每次创建一个span时,它没有找到任何OpenTracing事务,所以它创建了一个新事务。这就是为什么你的跨度没有相互关联的原因。
在OpenTracing中,当您创建span时,您将基于span上下文创建它。每个OpenTracing集成都将创建一个基于扩展技术的span上下文(我没有找到更好的术语),例如,HTTP span上下文基于HTTP头,Kafka span上下文基于Kafka头。
因此,要关联两个跨,您需要使用底层技术提供正确的OpenTracing ID的某个上下文创建跨上下文。
例如,要关联两个Kafka跨度,您需要有一个uber-trace-id
头(这是Jaeger中OpenTracing id的默认名称)和跟踪标识符(有关此头的格式,请参见tracespan-identity)。
知道了这一点,就有多重的事情要做。
首先,您需要在@trated
方法中的传出消息中添加uber-trace-id
Kafka头,以便将来自该方法的跨度与在Kafka producer拦截器中创建的跨度关联起来。
Tracer tracer = GlobalTracer.get(); // you can also inject it
JaegerSpanContext spanCtx = ((JaegerSpan)tracer.activeSpan()).context();
// uber-trace-id format: {trace-id}:{span-id}:{parent-span-id}:{flags}
//see https://www.jaegertracing.io/docs/1.21/client-libraries/#tracespan-identity
var uberTraceId = spanCtx.getTraceId() + ":" +
Long.toHexString(spanCtx.getSpanId()) + ":" +
Long.toHexString(spanCtx.getParentId()) + ":" +
Integer.toHexString(spanCtx.getFlags());
headers.add("uber-trace-id", openTracingId.getBytes());
然后,您需要将@trated
方法与传入消息的跨度(如果有的话)相关联。为此,最简单的方法是添加一个CDI拦截器,该拦截器将根据方法参数(它将搜索一个@trated
参数),尝试为所有用@trated
注释的方法创建一个span上下文。要使其工作,需要在OpenTracing拦截器之前执行此拦截器,并在拦截器上下文中设置span上下文。
这是我们的拦截器实现,请随意使用它或根据您的需要调整它。
public class KafkaRecordOpenTracingInterceptor {
@AroundInvoke
public Object propagateSpanCtx(InvocationContext ctx) throws Exception {
for (int i = 0 ; i < ctx.getParameters().length ; i++) {
Object parameter = ctx.getParameters()[i];
if (parameter instanceof Message) {
Message message = (Message) parameter;
Headers headers = message.getMetadata(IncomingKafkaRecordMetadata.class)
.map(IncomingKafkaRecordMetadata::getHeaders)
.get();
SpanContext spanContext = getSpanContext(headers);
ctx.getContextData().put(OpenTracingInterceptor.SPAN_CONTEXT, spanContext);
}
}
return ctx.proceed();
}
private SpanContext getSpanContext(Headers headers) {
return TracingKafkaUtils.extractSpanContext(headers, GlobalTracer.get());
}
}
该代码同时使用Quarkus OpenTracing extension和Kafka OpenTracing contrib库。
由于添加了从当前span上下文创建的OpenTracing Kafka头而使传出消息相互关联,并且从传入消息的头创建上下文,因此在任何情况下都应该发生这种关联。
顺便说一下,在使用/而不是发射器时,我遇到了同样的问题。我已经决定给你们举这个例子,因为它很容易理解和再现。
我正在使用JavaMail应用编程接口来获取一些电子邮件。我想得到一个消息流,然后在另一边得到一个电子邮件流。此外,我不想失去任何属性,如附件、目的地、发送者、正文等... 如何才能做到这一点?
运行jaeger服务器和https://github.com/quarkusio/quarkus-quickstarts/tree/master/opentracing-quickstart存储库后,我在http://localhost:16686/search.找到了跟踪,但我只找到了资源类、参数和进程名称,但跟踪详细展开中没有显示“日志”。 步骤很简单: 1、运行jaeger server<代
我得到这个错误时,调用请求权限()方法的Firebase消息。 [错误:flatter/lib/ui/ui_dart_state.cc(209)]未处理的异常:MissingPluginException(未找到方法消息传递的实现#通道插件上的requestPermission.flatter.io/firebase_消息传递)E/flatter(7180):#0 convertPlatformE
由于内容脚本在网页而不是扩展程序的上下文中运行,因此它们通常需要某种与扩展程序其余部分进行通信的方式。例如,RSS 阅读器扩展程序可以使用内容脚本来检测页面上 RSS 摘要的存在,然后通知后台页面以显示该页面的操作图标。 扩展及其内容脚本之间的通信使用消息传递来实现。任何一方都可以监听从另一端发送的消息,并在同一通道上进行响应。消息可以包含任何有效的 JSON 对象(空,布尔值,数字,字符串,数组
ms tcp nodelay 描述: 在信差的 TCP 会话上禁用 nagle 算法。 类型: Boolean 是否必需: No 默认值: true ms initial backoff 描述: 出错时重连的初始等待时间。 类型: Double 是否必需: No 默认值: .2 ms max backoff 描述: 出错重连时等待的最大时间。 类型: Double 是否必需: No 默认值: 15