当前位置: 首页 > 知识库问答 >
问题:

当使用反应消息传递时,获取未在夸克上传播的上下文

宗政楚
2023-03-14

我有两个通过Kafka交互的微服务,一个发布消息,另一个消费消息。发布者和消费者都运行在Quarkus(1.12.0.final)上,并使用反应消息和兵变。

package myproducer;

import myavro.MyAvro;
import io.smallrye.mutiny.Uni;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Message;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import java.util.concurrent.CompletableFuture;


@ApplicationScoped
public class Publisher {  
  @Channel("mytopic")
  @Inject
  public Emitter<MyAvro> myTopic;

  @Override
  public Uni<Void> publish(MyModel model) {
    MyAvro avro = MyModelMapper.INSTANCE.modelToAvro(model);

    return Uni.createFrom().emitter(e -> myTopic.send(Message.of(avro)
                                                .addMetadata(toOutgoingKafkaRecordMetadata(avro))
                                                .withAck(() -> {
                                                     e.complete(null);
                                                     return CompletableFuture.completedFuture(null);
                                                })));
  }
}

消费者:

package myconsumer;

import myavro.MyAvro;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class Consumer {

  @Incoming("mytopic")
  public Uni<Void> consume(IncomingKafkaRecord<String, MyAvro> message) {
    MyModel model = MyModelMapper.INSTANCE.avroToModel(message.getPayload());

    return ...;
  }

}

依赖关系:除其他外,包括人工制品

  • Quarkus-斯莫尔-反应消息-Kafka
  • Quarkus-雷斯特塞西-兵变
  • Quarkus-Smallrye-Opentracing
  • Quarkus-兵变
  • opentracing-kafka-client

夸克配置(Application.Properties):包括

quarkus.jaeger.service-name=myservice
quarkus.jaeger.sampler-type=const
quarkus.jaeger.sampler-param=1
quarkus.log.console.format=%d{yyyy-MM-dd HH:mm:ss} %-5p traceId=%X{traceId}, spanId=%X{spanId}, sampled=%X{sampled} [%c{2.}] (%t) %s%e%n

mp.messaging.incoming.mytopic.topic=abc
mp.messaging.incoming.mytopic.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.mytopic.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
...
mp.messaging.incoming.mytopic.interceptor.classes=io.opentracing.contrib.kafka.TracingConsumerInterceptor

在这个设置下,根本不会记录traceId或spanId(尽管根据Quarkus的“使用opentracing”指南,它们应该记录)。只有在添加@org.eclipse.microprofile.opentracing.traced之后,才会设置traceId和spanId,但两者在生产者和消费者上是完全不相关的。

我对照前面提到的Quarkus指南“使用opentracing”检查了我的opentracing配置,但没有发现我这边配置错误的提示。在阅读了关于一些夸克扩展中的问题的讨论后,我在与Mutiny一起使用时依赖于ThreadLocals,在我的依赖项中添加了夸克-Smallrye-Context-Propagation,但没有用。

有什么想法吗?

共有1个答案

解柏
2023-03-14

这个问题不容易解决,首先我会试着解释一下发生了什么。

OpenTracing具有事务和跨度的概念。跨度是一个执行块(方法、数据库调用、对Kafka主题的发送),而事务是一个跨多个组件的分布式进程(一组跨度)。

这里的问题是,每次创建一个span时,它没有找到任何OpenTracing事务,所以它创建了一个新事务。这就是为什么你的跨度没有相互关联的原因。

在OpenTracing中,当您创建span时,您将基于span上下文创建它。每个OpenTracing集成都将创建一个基于扩展技术的span上下文(我没有找到更好的术语),例如,HTTP span上下文基于HTTP头,Kafka span上下文基于Kafka头。

因此,要关联两个跨,您需要使用底层技术提供正确的OpenTracing ID的某个上下文创建跨上下文。

例如,要关联两个Kafka跨度,您需要有一个uber-trace-id头(这是Jaeger中OpenTracing id的默认名称)和跟踪标识符(有关此头的格式,请参见tracespan-identity)。

知道了这一点,就有多重的事情要做。

首先,您需要在@trated方法中的传出消息中添加uber-trace-idKafka头,以便将来自该方法的跨度与在Kafka producer拦截器中创建的跨度关联起来。

Tracer tracer = GlobalTracer.get(); // you can also inject it
JaegerSpanContext spanCtx = ((JaegerSpan)tracer.activeSpan()).context();
// uber-trace-id format: {trace-id}:{span-id}:{parent-span-id}:{flags}
//see https://www.jaegertracing.io/docs/1.21/client-libraries/#tracespan-identity
var uberTraceId = spanCtx.getTraceId() + ":" +
        Long.toHexString(spanCtx.getSpanId()) + ":" +
        Long.toHexString(spanCtx.getParentId()) + ":" +
        Integer.toHexString(spanCtx.getFlags());
headers.add("uber-trace-id", openTracingId.getBytes());

然后,您需要将@trated方法与传入消息的跨度(如果有的话)相关联。为此,最简单的方法是添加一个CDI拦截器,该拦截器将根据方法参数(它将搜索一个@trated参数),尝试为所有用@trated注释的方法创建一个span上下文。要使其工作,需要在OpenTracing拦截器之前执行此拦截器,并在拦截器上下文中设置span上下文。

这是我们的拦截器实现,请随意使用它或根据您的需要调整它。

public class KafkaRecordOpenTracingInterceptor {

    @AroundInvoke
    public Object propagateSpanCtx(InvocationContext ctx) throws Exception {
        for (int i = 0 ; i < ctx.getParameters().length ; i++) {
            Object parameter = ctx.getParameters()[i];

            if (parameter instanceof Message) {
                Message message = (Message) parameter;

                Headers headers = message.getMetadata(IncomingKafkaRecordMetadata.class)
                    .map(IncomingKafkaRecordMetadata::getHeaders)
                    .get();
                SpanContext spanContext = getSpanContext(headers);
                ctx.getContextData().put(OpenTracingInterceptor.SPAN_CONTEXT, spanContext);
            }
        }

        return ctx.proceed();
    }

    private SpanContext getSpanContext(Headers headers) {
        return TracingKafkaUtils.extractSpanContext(headers, GlobalTracer.get());
    }
}

该代码同时使用Quarkus OpenTracing extension和Kafka OpenTracing contrib库。

由于添加了从当前span上下文创建的OpenTracing Kafka头而使传出消息相互关联,并且从传入消息的头创建上下文,因此在任何情况下都应该发生这种关联。

 类似资料:
  • 顺便说一下,在使用/而不是发射器时,我遇到了同样的问题。我已经决定给你们举这个例子,因为它很容易理解和再现。

  • 我正在使用JavaMail应用编程接口来获取一些电子邮件。我想得到一个消息流,然后在另一边得到一个电子邮件流。此外,我不想失去任何属性,如附件、目的地、发送者、正文等... 如何才能做到这一点?

  • 运行jaeger服务器和https://github.com/quarkusio/quarkus-quickstarts/tree/master/opentracing-quickstart存储库后,我在http://localhost:16686/search.找到了跟踪,但我只找到了资源类、参数和进程名称,但跟踪详细展开中没有显示“日志”。 步骤很简单: 1、运行jaeger server<代

  • 我得到这个错误时,调用请求权限()方法的Firebase消息。 [错误:flatter/lib/ui/ui_dart_state.cc(209)]未处理的异常:MissingPluginException(未找到方法消息传递的实现#通道插件上的requestPermission.flatter.io/firebase_消息传递)E/flatter(7180):#0 convertPlatformE

  • 由于内容脚本在网页而不是扩展程序的上下文中运行,因此它们通常需要某种与扩展程序其余部分进行通信的方式。例如,RSS 阅读器扩展程序可以使用内容脚本来检测页面上 RSS 摘要的存在,然后通知后台页面以显示该页面的操作图标。 扩展及其内容脚本之间的通信使用消息传递来实现。任何一方都可以监听从另一端发送的消息,并在同一通道上进行响应。消息可以包含任何有效的 JSON 对象(空,布尔值,数字,字符串,数组

  • ms tcp nodelay 描述: 在信差的 TCP 会话上禁用 nagle 算法。 类型: Boolean 是否必需: No 默认值: true ms initial backoff 描述: 出错时重连的初始等待时间。 类型: Double 是否必需: No 默认值: .2 ms max backoff 描述: 出错重连时等待的最大时间。 类型: Double 是否必需: No 默认值: 15