当前位置: 首页 > 知识库问答 >
问题:

配置以在KafkaMessageListenerContainer不在侦听器上启动新跨度(Kafka Sleuth)

杜经艺
2023-03-14

我是Kafka侦探的新手。目前我遇到了一个问题,span在MessageListenerMethodInterceptor中创建了一个在KafkaListener上触发的拦截器,但如果出现异常,我们将在ErrorHandler中丢失spanId和traceId。是否可以将sleuth配置为在KafkaMessageListenerContainer中启动span?

共有2个答案

段干玺
2023-03-14

我已经处理好了。在我的本地pc中,它工作正常,但我有一些疑问。。。

>

  • 我通过在application.properties中设置spring.sleuth.messaging.kafka.enabled=false来禁用MessageListenermetodInterceptor

    我创建了一个记录拦截器,实现了org。springframework。Kafka。侦听器。RecordInterceptor如下所示:

     private final Tracer tracer;
     private final KafkaTracing kafkaTracing;
    
     private static final ThreadLocal<Span> CURRENT_SPAN = new ThreadLocal<>();
     private static final ThreadLocal<Tracer.SpanInScope> CURRENT_SPAN_IN_SCOPE = new ThreadLocal<>();
    
     @Override
     public ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record, Consumer<K, V> consumer) {
         openSpan(record);
         return RecordInterceptor.super.intercept(record, consumer);
     }
    
     @Override
     public void failure(ConsumerRecord<K, V> record, Exception exception, Consumer<K, V> consumer) {
         Span span = CURRENT_SPAN.get();
         if (span != null) {
             String message = Optional.ofNullable(exception.getMessage()).orElseGet(() -> exception.getClass().getSimpleName());
             span.tag("error", message);
         }
     }
    
     @Override
     public void clearThreadState(Consumer<?, ?> consumer) {
         closeSpan();
     }
    
     private void openSpan(ConsumerRecord<K, V> record) {
         Span span = kafkaTracing.nextSpan(record).name("kafka-tracing").start();
         Tracer.SpanInScope spanInScope = tracer.withSpanInScope(span);
         CURRENT_SPAN.set(span);
         CURRENT_SPAN_IN_SCOPE.set(spanInScope);
         if (LOG.isDebugEnabled()) {
             LOG.debug("created span {}", span);
         }
     }
    
     private void closeSpan() {
         Span span = CURRENT_SPAN.get();
         if (span != null) {
             try {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("finishing span {}", span);
                 }
                 span.finish();
             } catch (Exception ex) {
                 LOG.warn("on finishing Span {} got error {}", span, ex.getMessage());
             } finally {
                 CURRENT_SPAN.set(null);
             }
         }
    
         Tracer.SpanInScope spanInScope = CURRENT_SPAN_IN_SCOPE.get();
         if (spanInScope != null) {
             try {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("closing SpanInScope {}", spanInScope);
                 }
                 spanInScope.close();
             } catch (Exception ex) {
                 LOG.warn("on closing SpanInScope {} got error {}", spanInScope, ex.getMessage());
             } finally {
                 CURRENT_SPAN_IN_SCOPE.set(null);
             }
         }
     }
    

    其他方法为空。

    此拦截器的bean创建如下:

        @Bean
        public KafkaTracingRecordInterceptor<Object, Object> kafkaRecordInterceptor(Tracer tracer, Tracing tracing) {
            return new KafkaTracingRecordInterceptor<>(tracer, KafkaTracing.create(tracing));
        }
    

    附言:如果我遗漏了什么或以错误的方式实现了它,请添加评论。

  • 司业
    2023-03-14

    记录拦截器是仪器的最佳位置;最近的改进扩展了它的范围,包括错误处理程序。

    https://github.com/spring-projects/spring-kafka/pull/1946

     类似资料:
    • 我有一个关于正确配置kafka侦听器属性的问题-侦听器和advertised.listers。 在我的配置中,我设置了以下道具: 客户端使用 进行连接。我是否需要在侦听器和广告侦听器中具有相同的值。这里 是指向运行 kafka 代理的主机的 dns 记录。 在什么情况下,我希望它们保持不变和不同? 谢谢!

    • 我正在Wildfly 9.0.1的KeyClope中配置一个事件侦听器。我创造了一个。jar有两个类,实现了一个提供者,如KeyClope在他的github示例中解释的那样。 在本例中,keydape人员解释说,有必要注册提供者编辑“standalone/configuration/standalone.xml”并将模块添加到providers元素中。我在标签“subsystem”中对这个定义进行

    • 在Spring应用程序启动时,我想在Redis中查找一个值,并根据该值关闭或保留消息侦听器。 完全不初始化这些bean也是可以的,但是我也找不到方法。 目前,我正在尝试使用Spring的关闭容器: 容器 消息侦听器: 问题是,如果我用已经在队列中的消息启动应用程序,消息侦听器将在执行之前拾取消息。 有办法实现我的目标吗?即使采用不同的方法

    • 要运行Kafka,需要在文件。有两种设置我不理解。 有人可以解释侦听器和广告侦听器属性之间的区别吗? 留档说: 侦听器:套接字服务器侦听的地址。 和 advertised.listeners:主机名和端口代理将向生产者和消费者做广告。 我什么时候必须使用哪个设置?

    • 我正在使用Spring靴和活动MQ设置一个持久的JMS主题使用者。我能够使用Spring靴@JmsListener注释来使一切正常工作(作为耐用消费者成功运行)。但是,因为我想动态创建侦听器,所以我尝试使用 JmsListener 配置接口来创建它们。 使用主题“消费者”下面的代码可以成功地创建和使用消息。但是,问题是它创造的消费者并不持久。我在工厂中将clientId、setSubscripti

    • 我刚下载了这个项目: 但当我尝试使用推荐的命令运行服务器时: 我得到以下输出: 如你所见,有一行是这样写的: