当前位置: 首页 > 知识库问答 >
问题:

添加跟踪和span id到Flink作业

贺博厚
2023-03-14

我需要向集群中运行的Flink作业添加track和span id,请求流如下所示

使用者--

我使用Spring Boot来创建我的rest API,并使用Spring Sleuth来添加跟踪和span id到生成的日志中,当调用rest API时添加跟踪和span id,当消息被放在Kakfa-toption-1上时也添加跟踪和span id,但我不能弄清楚如何添加跟踪和跨度ID,同时在Flink作业-1和FLink作业-2上消费消息,因为它们不在Spring上下文中。

一种方法是将track和span Id设置为kafka消息头,并让kafka消费者/生产者拦截器提取和记录track和span Id,我尝试了这个方法,但我的拦截器没有被调用,因为Flink API使用Flink版本的kafka客户机。

无法调用我的自定义KafkaDeserializationSchema

public class MyDeserializationSchema implements KafkaDeserializationSchema<String> {

private static final Logger LOGGER = LoggerFactory.getLogger(MyDeserializationSchema.class);

@Override
public TypeInformation<String> getProducedType() {
    System.out.println("************** Invoked 1");
    LOGGER.debug("************** Invoked 1");
    return null;
}

@Override
public boolean isEndOfStream(String nextElement) {
    System.out.println("************** Invoked 2");
    LOGGER.debug("************** Invoked 2");
    return true;
}

@Override
public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
    System.out.println("************** Invoked 3");
    LOGGER.debug("************** Invoked 3");
    return record.toString();
}

 }

有人能建议我如何实现同样的目标吗。

共有2个答案

丌官嘉福
2023-03-14

您在这里使用的是一个简单的字符串,在序列化字节到字符串时,可以执行如下代码所示的操作。

public class MyDeserializationSchema  implements KafkaDeserializationSchema<String> {
    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        return new String(record.value(), "UTF-8");
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
  }
关冠宇
2023-03-14

您也可以使用KafkaDeserializationSchema来获取标题

为了访问Kafka消息的键、值和元数据,KafkaDeserializationSchema具有以下反序列化方法T deserialize(ConsumerRecord记录)。

public class Bla implements KafkaDeserializationSchema {
    @Override
    public boolean isEndOfStream(Object dcEvents) {
        return false;
    }

    @Override
    public Object deserialize(ConsumerRecord consumerRecord) throws Exception {
        return null;
    }



    @Override
    public TypeInformation<DCEvents> getProducedType() {
        return null;
    }
 类似资料:
  • 我已经开始学习Sleuth,但是我还停留在日志配置上。 我有这样的配置: 但是当我检查文件时,我缺少跟踪 Id 和 spanId。下面是示例: 这是pom.xml的片段 不知道如何在这里继续,我一直在阅读文档和堆栈溢出帖子,但仍然找不到任何答案。 提前致谢。

  • 我有一个Spring Boot应用程序,它依赖于spring-cloud-starter-sleuth-3.0.3和spring-cloud-sleuth-zipkin-3.0.3。 我需要在从webclient调用API时将跟踪id传递给请求头。 示踪剂。currentSpan()为null,因此引发NPE。 根据文档,给出了将跟踪id添加到响应头的方法https://docs.spring.i

  • 我正在努力创建篮球投篮图表在R使用库绘图,虽然我目前卡住了。作为参考,我正在创建的图形类型希望在完成时看起来有点像这样: 从绘图的角度来看,我需要使用一种轨迹类型,它允许我在整个图形上放置六边形(或其他形状)。我相信我将能够适当地调整六边形的颜色和大小参数,以说明哪些六边形应该是红色、橙色和黄色,哪些六边形应该是全尺寸、更小和根本不存在。我只需要知道从哪里开始跟踪/模式。 这张图的基础是我拥有的数

  • 如何在log4j2中获取跟踪id和span id,而不是在[traceId,spanId]中获取? 预期输出:[APPNAME,5A59B2372D9A3814,5A59B2372D9A3814] 实际输出:[loglevel=error]--2021-01-21 11:30:32,489+0530--http-nio-8080-exec-1 com.springboot.test.aspect.

  • 本章介绍如何使用Zipkin或Jaeger收集启用了Istio的应用程序的调用链信息。 完成本章后,你可以理解有关应用程序的所有假设以及如何使其参与跟踪,无论您使用何种语言/框架/平台构建应用程序。 BookInfo示例用来作为此任务的示例应用程序。 环境准备 参照安装指南的说明安装Istio。 如果您在安装过程中未启动Zipkin或Jaeger插件,则可以运行以下命令启动: 启动Zipkin: