当前位置: 首页 > 知识库问答 >
问题:

无法解码Spring云流中的json类型DefaultKafkaHeaderMapper

雍志文
2023-03-14

我们正在使用spring cloud stream,并计划升级我们的Kafka版本
我们的应用程序使用apache kafka服务器1.0.1spring cloud stream:2.0.0(spring kafka 2.1.7),还使用spring cloud sleuth:2.0.0进行跟踪
我们将把Kafka服务器升级到2.3.0版,因此需要升级到spring boot 2.2版。x(霍克斯顿)春云侦探:2.2.0春云流:3.0.3(Horsham.SR3)
我们有约200个使用Kafka的应用程序,因此升级将逐步进行,因此作为中间状态,我们将有更新版本的生产者和使用旧版本的消费者
我们的消费者正在使用@StreamListener

在我们的测试中,我们遇到了一个问题:解析大多数类型为String的标题,并获得以下信息:

ERROR 27448 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$4  : Could not decode json type: ecb89ccb3e79418b for key: X-B3-TraceId
com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ecb89ccb3e79418b': was expecting ('true', 'false' or 'null')
 at [Source: (byte[])"ecb89ccb3e79418b"; line: 1, column: 33]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:679) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3526) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2621) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:826) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:723) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4141) ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4000) ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3091) ~[jackson-databind-2.9.6.jar:2.9.6]
    at org.springframework.kafka.support.DefaultKafkaHeaderMapper.lambda$toHeaders$1(DefaultKafkaHeaderMapper.java:233) ~[spring-kafka-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at java.lang.Iterable.forEach(Iterable.java:75) ~[na:1.8.0_221]
    at org.springframework.kafka.support.DefaultKafkaHeaderMapper.toHeaders(DefaultKafkaHeaderMapper.java:216) ~[spring-kafka-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$4.toHeaders(KafkaMessageChannelBinder.java:554) ~[spring-cloud-stream-binder-kafka-2.0.0.RELEASE.jar:2.0.0.RELEASE]
    at org.springframework.kafka.support.converter.MessagingMessageConverter.toMessage(MessagingMessageConverter.java:106) ~[spring-kafka-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:229) ~[spring-kafka-2.1.7.RELEASE.jar:2.1.7.RELEASE]
...

而类型标题是:

{spanTraceId=java.lang.String, spanId=java.lang.String, spanParentSpanId=java.lang.String, nativeHeaders=org.springframework.util.LinkedMultiValueMap, X-B3-SpanId=java.lang.String, X-B3-ParentSpanId=java.lang.String, scst_partition=java.lang.Integer, X-B3-Sampled=java.lang.String, X-B3-TraceId=java.lang.String, spanSampled=java.lang.String, contentType=java.lang.String}

例如,Sleuth添加的X-B3-SpanId类型为String,值为:ecb89ccb3e79418b,不是JSON字符串,因此ObjectMapper在转换为String对象时失败:

headers.put(h.key(), getObjectMapper().readValue(h.value(), type))

看起来当我们有字符串类型时,它不应该使用ObjectMapper,因此我们的旧消费者失败了。

在使用新生产者和旧消费者时,有没有办法防止这个问题?

共有1个答案

孙玮
2023-03-14

您可以将DefaultKafkaHeaderMapper配置为与旧版本兼容:

    /**
     * Set to true to encode String-valued headers as JSON ("..."), by default just the
     * raw String value is converted to a byte array using the configured charset. Set to
     * true if a consumer of the outbound record is using Spring for Apache Kafka version
     * less than 2.3
     * @param encodeStrings true to encode (default false).
     * @since 2.3
     */
    public void setEncodeStrings(boolean encodeStrings) {
        this.encodeStrings = encodeStrings;
    }

也看到https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.0.10.RELEASE/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_binder_properties

spring。云流动Kafka。粘合剂headerMapperBeanName

 类似资料:
  • 我使用Spring Cloud Data Flow设置一个读取CSV文件的流,使用自定义处理器对其进行转换并记录: 文件和csvToMap应用程序工作正常,但在日志应用程序中,我看到这种异常,对于每条记录: 对于file\u relativePath标头,也会引发此异常。我不明白为什么斯普林·Kafka试图将它们解读为JSON。 此外,日志接收器以正确的方式记录我的记录: 出于调试目的,我将kaf

  • 我已经部署了一个CloudFirebase函数来更新一些聚合数据,但是我得到了 聚合收据评级:错误:无法从文件快照中的消防恢复值解码类型:{“integerValue”:“3”}。_decodeValue(/user_code/node_modules/Firebase-admin/node_modules/@google-Cloud/消防恢复/src/document.js:464: 15)在D

  • 在React应用程序中使用Firestore,我正在测试使用Google云函数获取一个文档字段,然后使用该字段在Firestore中创建一个新字段。前几天我让它工作了,现在不行了。 这是我在谷歌云功能日志中得到的错误: 错误:无法解码文件快照中FiresteValue:{"stringValue":"sox"}的类型。_decodeValue 下面是我使用的代码。它与此处的代码非常相似:https

  • 问题内容: 嘿,我不确定为什么每次选择图库中的图像时都会出现这种情况吗? 这是代码: 错误: 问题答案: 不要假设有文件路径。Android 4.4及更高版本即将删除它们。而且您获得的uri已经没有路径。 您仍然可以通过()或文件描述符访问文件内容。 在这里进行了解释:ContentProviders:打开一个文档(向下滚动,指向该节的链接似乎已损坏) 而且确实适用于较旧的android版本。

  • 问题内容: 我想创建一个结构,其中一个字段可以举行一些特殊类型的,比如说的数据,和。我想将此结构从JSON解码/编码。我们如何在go / golang中实现这一目标? 例如,我具有以下定义的结构: 哪里是 假设我需要将以下JSON解组到上述struct中: 还有这个: 而且这个还: 我该如何实现? 我可以自己定义并继续实现吗? 还是有一种定义自定义类型的方法,例如说并定义为 然后定义并继续? 我也