当前位置: 首页 > 知识库问答 >
问题:

springboot-kafka java 8时间序列化

宗政权
2023-03-14
spring:
  jackson:
    serialization:
      write_dates_as_timestamps: false
  kafka:
    consumer:
      group-id: foo
      enable-auto-commit: true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: my.package
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        spring.json.trusted.packages: my.package
      retries: 3
      acks: all
[INFO] |  |  +- com.fasterxml.jackson.core:jackson-databind:jar:2.9.6:compile
[INFO] |  |  |  +- com.fasterxml.jackson.core:jackson-annotations:jar:2.9.0:compile
[INFO] |  |  |  \- com.fasterxml.jackson.core:jackson-core:jar:2.9.6:compile
[INFO] |  |  \- com.fasterxml.jackson.datatype:jackson-datatype-jsr310:jar:2.9.6:compile
[INFO] |  |  +- com.fasterxml.jackson.datatype:jackson-datatype-jdk8:jar:2.9.6:compile
[INFO] |  |  \- com.fasterxml.jackson.module:jackson-module-parameter-names:jar:2.9.6:compile
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition Foo-0 at offset 4. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[123, 34, 105, 100, 34, 58, 34, 97, 50, 99, 50, 56, 99, 99, 101, 97, 49, 98, 98, 52, 51, 97, 97, 56, 53, 50, 49, 53, 99, 101, 49, 54, 57, 48, 52, 51, 51, 98, 51, 45, 50, 34, 44, 34, 97, 117, 116, 104, 111, 114, 34, 58, 34, 97, 110, 116, 111, 110, 105, 111, 34, 44, 34, 99, 114, 101, 97, 116, 101, 100, 34, 58, 123, 34, 104, 111, 117, 114, 34, 58, 49, 56, 44, 34, 109, 105, 110, 117, 116, 101, 34, 58, 52, 48, 44, 34, 115, 101, 99, 111, 110, 100, 34, 58, 53, 49, 44, 34, 110, 97, 110, 111, 34, 58, 51, 50, 53, 48, 48, 48, 48, 48, 48, 44, 34, 100, 97, 121, 79, 102, 89, 101, 97, 114, 34, 58, 50, 52, 48, 44, 34, 100, 97, 121, 79, 102, 87, 101, 101, 107, 34, 58, 34, 84, 85, 69, 83, 68, 65, 89, 34, 44, 34, 109, 111, 110, 116, 104, 34, 58, 34, 65, 85, 71, 85, 83, 84, 34, 44, 34, 100, 97, 121, 79, 102, 77, 111, 110, 116, 104, 34, 58, 50, 56, 44, 34, 121, 101, 97, 114, 34, 58, 50, 48, 49, 56, 44, 34, 109, 111, 110, 116, 104, 86, 97, 108, 117, 101, 34, 58, 56, 44, 34, 99, 104, 114, 111, 110, 111, 108, 111, 103, 121, 34, 58, 123, 34, 99, 97, 108, 101, 110, 100, 97, 114, 84, 121, 112, 101, 34, 58, 34, 105, 115, 111, 56, 54, 48, 49, 34, 44, 34, 105, 100, 34, 58, 34, 73, 83, 79, 34, 125, 125, 44, 34, 97, 103, 103, 114, 101, 103, 97, 116, 101, 73, 100, 34, 58, 34, 97, 50, 99, 50, 56, 99, 99, 101, 97, 49, 98, 98, 52, 51, 97, 97, 56, 53, 50, 49, 53, 99, 101, 49, 54, 57, 48, 52, 51, 51, 98, 51, 34, 44, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 48, 44, 34, 112, 114, 105, 122, 101, 73, 110, 102, 111, 34, 58, 123, 34, 110, 117, 109, 98, 101, 114, 79, 102, 87, 105, 110, 110, 101, 114, 115, 34, 58, 49, 44, 34, 112, 114, 105, 122, 101, 80, 111, 111, 108, 34, 58, 49, 48, 44, 34, 112, 114, 105, 122, 101, 84, 97, 98, 108, 101, 34, 58, 91, 49, 48, 93, 125, 125]] from topic [Foo]
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Expected array or string.
 at [Source: (byte[])"{"id":"a2c28ccea1bb43aa85215ce1690433b3-2","author":"foo","created":{"hour":18,"minute":40,"second":51,"nano":325000000,"dayOfYear":240,"dayOfWeek":"TUESDAY","month":"AUGUST","dayOfMonth":28,"year":2018,"monthValue":8,"chronology":{"calendarType":"iso8601","id":"ISO"}},"aggregateId":"a2c28ccea1bb43aa85215ce1690433b3","version":0,"prizeInfo":{"numberOfWinners":1,"prizePool":10,"prizeTable":[10]}}"; line: 1, column: 73] (through reference chain: my.package.Foo["created"])
    at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:63) ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1342) ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1138) ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.datatype.jsr310.deser.JSR310DeserializerBase._handleUnexpectedToken(JSR310DeserializerBase.java:99) ~[jackson-datatype-jsr310-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer.deserialize(LocalDateTimeDeserializer.java:141) ~[jackson-datatype-jsr310-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer.deserialize(LocalDateTimeDeserializer.java:39) ~[jackson-datatype-jsr310-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:136) ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:369) ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:159) ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:1611) ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1234) ~[jackson-databind-2.9.6.jar:2.9.6]
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:228) ~[spring-kafka-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923) ~[kafka-clients-1.0.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93) ~[kafka-clients-1.0.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1100) ~[kafka-clients-1.0.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949) ~[kafka-clients-1.0.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570) ~[kafka-clients-1.0.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531) ~[kafka-clients-1.0.2.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1154) ~[kafka-clients-1.0.2.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111) ~[kafka-clients-1.0.2.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699) ~[spring-kafka-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_131]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_131]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
@Bean
public ObjectMapper objectMapper() {
    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.registerModule(new JavaTimeModule());
    objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
    return objectMapper;
}

2.LocalDateTime字段上的序列化程序注释

为了确保我有正确的对象映射器设置和必要的依赖关系,我创建了一个rest控制器,将响应模拟为json作为restendpoint返回一个带有日期时间字段的对象,这将正确返回;示例:

[
    {
        "playerId": "foo",
        "points": 10,
        "entryDateTime": "2018-08-19T09:30:20.051"
    },
    {
        "playerId": "bar",
        "points": 3,
        "entryDateTime": "2018-08-27T09:30:20.051"
    }
]

共有1个答案

阚英武
2023-03-14

当您使用属性设置序列化器/反序列化器时,Kafka将实例化它们,而不是Spring。Kafka对Spring或定制的ObjectMapper一无所知。

您需要重写Boot的默认生产者/消费者工厂,并使用备用构造函数(或setter)来添加序列化器/反序列化器。

请参阅文档。

@Bean
public ConsumerFactory<Foo, Bar> kafkaConsumerFactory(KafkaProperties properties,
    JsonDeserializer customDeserializer) {

    return new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties(),
        customDeserializer, customDeserializer);
}

@Bean
public ProducererFactory<Foo, Bar> kafkaProducerFactory(KafkaProperties properties,
    JsonSserializer customSerializer) {

    return new DefaultKafkaConsumerFactory<>(properties.buildProducerProperties(),
        customSerializer, customSerializer);
}
 类似资料:
  • 我将我的数据存储在卡珊德拉·NoSQL数据库中,模式如下: 然后我使用。我希望数据是按时间序列排列的,第一天确实如此,但今天情况发生了变化。 我认为数据库忽略了日期,而只关心时间。 知道怎么解决这个问题吗?

  • 主要内容:创建时间戳,创建时间范围,更改时间频率,转化为时间戳,频率和周期转换,时间周期计算,创建时间周期,时间序列转换,创建日期范围,更改日频率,工作日时间顾名思义,时间序列(time series),就是由时间构成的序列,它指的是在一定时间内按照时间顺序测量的某个变量的取值序列,比如一天内的温度会随时间而发生变化,或者股票的价格会随着时间不断的波动,这里用到的一系列时间,就可以看做时间序列。时间序列包含三种应用场景,分别是: 特定的时刻(timestamp),也就是时间戳; 固定的日期(pe

  • 主要内容:什么是JFreeChart 时间序列图,JFreeChart 时间序列图的示例什么是JFreeChart 时间序列图 时间序列图表表示以相等的时间间隔变化的数字数据序列。 下图显示了 JFreeChart 库中包含的时间序列图表的一些演示版本: JFreeChart 时间序列图的示例 让我们考虑以下时间序列图表的示例数据。 日期 列1 列2 2017-01-01 50 40 2017-01-02 40 35 2017-01-03 45 26 2017-01-04 30 45

  • 我的任务是使用MATLAB和任何神经网络框架对时间序列数据进行分类。 更具体地描述任务:是计算机视觉领域的一个问题。Is是一项场景边界检测任务。 源数据是来自视频流的4个相邻帧直方图相关阵列。基于此数据,我们必须将此时间序列分为两类: “场景中断” “没有场景中断” 因此,每个源数据输入的网络输入是4个双倍值,输出是一个二进制值。我将在下面展示src数据的示例: 问题是,来自Matlab神经工具箱

  • 我有25万个这样的事件: 一个时间段代表半个30分钟,“Anzahl”是一个时间段中的事件数,第一个时间段从2011-01-01 00:00:00开始,“WochenSlots”是时间段%%336,从周六00:00:00开始。所以我想在一周内看到分布情况。 我现在想做的是: 以x标度显示日期(星期一00:00-星期日24:00) 显示显示x%事件分布的行(信封)。 我不知道该怎么做。 dput(P

  • 我正在阅读Keras中关于使用LSTM进行多元时间序列预测的教程https://machinelearningmastery.com/multivariate-time-series-forecasting-lstms-keras/#comment-442845 我已经看完了整个教程,遇到了一个如下的问题- 在本教程中,在步骤“t-1”中,列车和测试拆分有8个功能,即“污染”、“露水”、“温度”、