当前位置: 首页 > 知识库问答 >
问题:

当Spring Sleuth位于类路径中时,为什么跟踪信息不在kafka消息上传播?

弓俊晖
2023-03-14

由于未触发SleuthKafkaAspect.WrapProducerFactory()方法,跟踪信息不会通过kafka消息传播。在生产者端,消息被正确发送,跟踪信息被正确记录。在消费者端,将创建一个新的traceId和spanId。

以下两个日志记录行显示了traceId、spanId(和parentId)的不同值:

2021-03-23 11:42:30.158 [http-nio-9185-exec-2] INFO  my.company.Producer - /4afe07273872918b/4afe07273872918b// - Sending event='MyEvent'
2021-03-23 11:42:54.374 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO my.company.Consumer /1fec3bf6a3c91773/ff4bd26b2e509ed8/1fec3bf6a3c91773/ - Received new event='MyEvent'

首先,使用Krafdrop和调试,我验证了消息头不包含任何跟踪信息。

之后,我发现sleuthKafkaAspect.WrapProducerFactory()方法从未被触发,而在消费者端则是sleuthKafkaAspect.AnyConsumerFactory()方法。

使用的库版本如下:

  • Spring引导:2.3.7.发布
  • Spring云BOM:hoxton.sr10
  • Spring Cloud:2.2.7.发行版(和2.2.5.发行版)
  • Spring Kafka:2.5.10.发布
  • Kakfa客户端:2.4.1
  • spring-cloud-starter-sleuth:2.2.7.发布
  • spring-cloud-sleuth-zipkin:2.2.7.发布

kakfa client library版本为2.4.1是由于与2.5.1版本的kafka client的生产错误相关的版本降级,该错误增加了cpu的使用。我也尝试使用以下库版本组合,但没有成功:

  • Spring引导:2.3.7.发布
  • Spring云BOM:hoxton.sr10(和hoxton.sr8)
  • Spring Cloud:2.2.7.发行版(和2.2.5.发行版)
  • Spring Kafka:2.5.10.发布
  • Kakfa客户端:2.5.1
  • spring-cloud-starter-sleuth:2.2.7.发行版(和2.2.5.发行版)
  • spring-cloud-sleuth-zipkin:2.2.7.发行版(和2.2.5.发行版)
  • Spring引导:2.3.7.发布
  • Spring云BOM:hoxton.sr10(和hoxton.sr8)
  • Spring Cloud:2.2.7.发行版(和2.2.5.发行版)
  • Spring Kafka:2.5.10.发布
  • Kakfa客户端:2.6.0
  • spring-cloud-starter-sleuth:2.2.7.发行版(和2.2.5.发行版)
  • spring-cloud-sleuth-zipkin:2.2.7.发行版(和2.2.5.发行版)
    null
  • Spring-boot:2.3.0.release
  • Spring-Kafka:2.5.0.发布
  • Kafka-客户端:2.4.1
  • Spring-Cloud:2.2.5.发布
  • spring-cloud-starter-sleuth:2.2.5.发布
  • spring-cloud-sleuth-zipkin:2.2.5.发布

我们还引入了一个log42/log4j(在它是带有logback的slf4j之前)。

以下是相关库:

- org.springframework.boot:spring-boot-starter-log4j2:jar:2.3.7.RELEASE:compile
- org.slf4j:jul-to-slf4j:jar:1.7.30:compile
- io.projectreactor:reactor-test:jar:3.3.12.RELEASE:test
- io.projectreactor:reactor-core:jar:3.3.12.RELEASE:test
- org.reactivestreams:reactive-streams:jar:1.0.3:test

配置的属性如下:

spring.sleuth.messaging.enabled=true
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.client-id=myClientIdentifier
spring.kafka.consumer.group-id=MyConsumerGroup
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

@Configuration
@EnableTransactionManagement
public class KafkaProducerConfig {

    KafkaProperties kafkaProperties;

    @Autowired
    public KafkaProducerConfig(
            KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        return kafkaTemplate;
    }


    private ProducerFactory<String, Object> producerFactory() {
        DefaultKafkaProducerFactory<String, Object> defaultKafkaProducerFactory =
                new DefaultKafkaProducerFactory<>(producerConfigs());
        //defaultKafkaProducerFactory.transactionCapable();
        //defaultKafkaProducerFactory.setTransactionIdPrefix("tx-");
        return defaultKafkaProducerFactory;
    }

    private Map<String, Object> producerConfigs() {

        Map<String, Object> configs = kafkaProperties.buildProducerProperties();
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return configs;
    }

}

我的spring boot应用程序类:


@Profile("DEV")
@SpringBootApplication(
        scanBasePackages = {"my.company"},
        exclude = {
                DataSourceAutoConfiguration.class,
                DataSourceTransactionManagerAutoConfiguration.class,
                HibernateJpaAutoConfiguration.class
        }
)
@EnableSwagger2
@EnableFeignClients(basePackages = {"my.company.common", "my.company.integration"})
@EnableTransactionManagement
@EnableMongoRepositories(basePackages = {
        "my.company.repository"})
@EnableMBeanExport(registration = RegistrationPolicy.IGNORE_EXISTING)
@ServletComponentScan
public class DevAppStartup extends SpringBootServletInitializer {

    public static void main(String[] args) {
        SpringApplication.run(DevAppStartup.class, args);
    }

}

在这里可以找到命令“mvn dependency:tree”mvn_dependency_tree.txt的输出

共有1个答案

慕容光启
2023-03-14

正如文档所建议的,如果要使用自己的Kafkatemplate,则需要创建ProducerFactorybean:

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Object>producerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object>producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}
 类似资料:
  • 问题内容: 我正在使用Java 1.6,Eclipse和Ant。 以下是我创建和运行jar文件的目标: 问题是,当我(通过Ant或命令行)运行此jar时,收到错误消息: 一些可能有用的知识: 当我打印我的类路径时,它表明所有必需的JAR都在其中。它还显示了类路径的Eclipse GUI版本。 我尝试清理项目(通过Eclipse和Ant)都无济于事。 似乎缺少的库.jar 不是 .jar中的.jar

  • 我使用github的时间相对较短,并且一直使用客户端执行提交和拉取。我决定从昨天的git bash开始尝试它,并且我成功地创建了一个新的repo和提交的文件。 今天,我从另一台计算机上对存储库进行了更改,我提交了更改,现在我回到家里,执行了来更新我的本地版本,我得到了以下信息: 这次回购的唯一贡献者是我,没有分支(只有一个主人)。我在windows上执行了git Bash中的pull: 我做错了什

  • 我使用和创建了一个应用程序。在我的应用程序中,每个用户都有多个页面。 我想向用户显示他们页面的访问者的统计数据,比如访问者的数量,设备,国家等等。 这个服务完全符合我想要实现的目标https://11uptime.com/&https://11uptime.com/assets/images/screenshots/webp/status-page-a.webp 正如您所看到的,每个页面都有自己的

  • 当我创建spring.Zipkin.sender.type=web时,Zipkin dashboard中显示了跟踪,这里有没有我缺少的东西。

  • 我们正在开发一个用Java编写的邮件客户端。它具有相同的功能,如Outlook或雷鸟等。它直接与邮件服务器通信。此外,我们的业务规则要求我们将所有消息存储在数据库中,并且消息应该始终保持同步。我知道那不太适合IMAP,但是我们必须把所有的东西都保存在我们的数据库中。问题出现了,如何跟踪从文件夹A移动到文件夹B的IMAP消息?我们怎样才能知道这件事?如果您从A中删除一条消息,它将从A中删除,并在B中