当前位置: 首页 > 知识库问答 >
问题:

从Kafka消息在spring sleuth中注入TraceId

梁华清
2023-03-14

我正在研究多微服务体系结构,其切入点来自Kafka。例如,假设micro1中有三个microservice micro1、micro2和micro3请求来自kafka消息队列,它通过Rest客户端进一步与micro2和micro3通信。

micro1接收到的消息包含requestId,我需要在spring sleuth中用它代替TraceId,它应该在所有microservice中传播。

我已经通过MDC尝试过了,但在这种情况下,traceId并没有传播到其他微服务。

有没有其他方法可以在sleuth中实现自定义TraceId而不是自动生成的?

谢谢

共有1个答案

东门文斌
2023-03-14

我不认为这是个好主意。保留生成的跟踪标识,并创建另一个将作为行李传播的生成字段。

如果您真的需要更改ID的生成方式,那么您必须更改这个bean

https://github.com/spring-cloud/spring-cloud-sleuth/blob/v2.0.2.RELEASE/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/autoconfig/TraceAutoConfiguration.java#L113-L133

@Bean
    @ConditionalOnMissingBean
    Propagation.Factory sleuthPropagation(SleuthProperties sleuthProperties) {
        if (sleuthProperties.getBaggageKeys().isEmpty() && sleuthProperties.getPropagationKeys().isEmpty()) {
            return B3Propagation.FACTORY;
        }
        ExtraFieldPropagation.FactoryBuilder factoryBuilder = ExtraFieldPropagation
                .newFactoryBuilder(B3Propagation.FACTORY);
        if (!sleuthProperties.getBaggageKeys().isEmpty()) {
            factoryBuilder = factoryBuilder
                    // for HTTP
                    .addPrefixedFields("baggage-", sleuthProperties.getBaggageKeys())
                    // for messaging
                    .addPrefixedFields("baggage_", sleuthProperties.getBaggageKeys());
        }
        if (!sleuthProperties.getPropagationKeys().isEmpty()) {
            for (String key : sleuthProperties.getPropagationKeys()) {
                factoryBuilder = factoryBuilder.addField(key);
            }
        }
        return factoryBuilder.build();
    }
 类似资料:
  • 我正在运行一个简单的Kafka streams应用程序,它将使用Node JS记录的信息带到一个Kafka主题。 还需要注意的是,时间戳只是一个数字,表示自1970年6月以来的秒数。 我使用scala中的Kafka流来使用这些数据。 例如。 然而,我不确定如何将时间戳(我从nodeJS发送的)提取到这个流中。 例如,如果我尝试做这样的事情 这会导致错误“无法解析符号流”。我在想我该怎么解决这个问题

  • 我第一次试着让它工作,所以请容忍我。我正在尝试学习Kafka的检查点设置和处理“错误”消息,在不丢失状态的情况下重新启动。 用例:使用检查点。从Kafka那里读取一个整数流,保持一个连续的和。如果读到“坏”Kafka消息,请重新启动应用程序,跳过“坏”消息,保持状态。我的流看起来像这样: set1,5 set1,7 set1,foobar set1,6 我希望我的应用程序保留它看到的整数的运行总和

  • 在企业应用程序中,我试图使用netBeans 8.1将MessageDriven beans注入Web应用程序(到REST服务)。我在IDE中没有得到任何警告,但是,在部署时,我得到以下错误: 严重:加载应用程序时出现异常:CDI部署失败:Weld-001408:未满足类型StatisticsBean的依赖关系,其限定符@Default在注入点[BackedAnnotatedField]@Inje

  • 向源生成特殊的clear-message,这将导致聚合的消息变为空 将消息直接写入具有空数据的中间主题 另一种方式,也许kafka-streams已经有一个API调用了? 加分问题:如果我知道我不想让消息坐在中间话题中的时间超过6个月,我可以指示kafka-streams创建6M留存的中间话题,还是在我运行App之前我自己手动创建话题?

  • Kafka过去在我自己的电脑上工作得很好。我正在另一台电脑上工作,上面写着 为目录C:\tmp\kafka logs(kafka.server.LogDirFailureChannel)java中的\uu consumer\u offset-41创建日志时出错。木卫一。IOException:映射在sun失败。尼奥。总经理。Kafka地图(FileChannelImpl.java:940)。日志抽

  • 我的用例是,从生产者端,它将一行数据(大约100字节)作为一条消息发布到kafka topic,从消费者端,我希望一次消费5条消息,并将其提供给我的消费者逻辑。 我做了一个简单的例子,它总是得到一个消息并打印在控制台上。请建议我任何需要的配置更改,以实现这一点。 请在下面找到源代码。 使用以下命令启动生产者 /kafka生产者性能测试——num记录500——主题测试——吞吐量10——有效负载文件测