Question:

Custom spring-kafka deserializer for LocalDateTime

阮鸿煊
2023-03-14

I'm building a simple project with Spring Boot and spring-kafka, and I can't get it configured so that it works. It's a simple application that creates notes (author, content, createdDateTime, lastModifiedDateTime) and sends note-based events when they are created.

I've been fiddling with this for two days now, but I don't think I've figured it out yet.

Here is my configuration. I'm pretty sure there's a lot of boilerplate in it, since I pieced it together from several examples to get things working.

I have two producer and consumer factories because I apparently need a default one. Is that true? Do I need to create a custom factory for every type of message I want to send?

My application.yml

spring.datasource.url: jdbc:mysql://localhost:3306/notes
spring.datasource.username: root
spring.datasource.password:

logging.level.org.hibernate.SQL: debug

spring.jpa.database:  MYSQL
spring.jpa.open-in-view: true
spring.jpa.show-sql:  true
spring.data.jpa.repositories.bootstrap-mode: default

spring.jpa.database-platform: org.hibernate.dialect.MySQL5Dialect
spring.jpa.hibernate.ddl-auto: update

logging.level.org.springframework: DEBUG
spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver

spring.kafka.bootstrap-servers: 192.168.169.22:9092
spring.kafka.consumer.group-id: noteGroup
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.properties.spring.json.trusted.packages: com.remusrd.notesample.domain.event
spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.add.type.headers: true
spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
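
One caveat with this purely property-driven setup: when the JsonDeserializer is configured only through properties like this, it builds its own internal ObjectMapper, which does not handle java.time types such as LocalDateTime unless a JavaTimeModule-aware mapper is supplied to it programmatically. A minimal sketch of doing that (the configuration class and bean names here are illustrative assumptions, not part of the project above; it assumes jackson-datatype-jsr310 is on the classpath):

```kotlin
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.support.serializer.JsonDeserializer

@Configuration
class KafkaJacksonConfig {

    // A mapper that knows how to read/write java.time types.
    @Bean
    fun kafkaObjectMapper(): ObjectMapper =
        ObjectMapper().registerModule(JavaTimeModule())

    // Pass the customized mapper explicitly instead of relying on the
    // deserializer's internally created default mapper.
    @Bean
    fun noteEventDeserializer(kafkaObjectMapper: ObjectMapper): JsonDeserializer<Any> =
        JsonDeserializer<Any>(kafkaObjectMapper)
}
```

A deserializer bean like this would then have to be wired into the consumer factory rather than named in the properties file; the two approaches should not be mixed for the same consumer.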

My producer

package com.remusrd.notesample.service

import arrow.core.Option
import arrow.core.getOrElse
import arrow.data.NonEmptyList
import com.remusrd.notesample.data.NoteRepository
import com.remusrd.notesample.domain.Note
import com.remusrd.notesample.domain.event.NoteEvent
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional


@Service
@Transactional
class JpaNoteService : NoteService {
    val TOPIC_NAME = "notes"

    @Autowired
    private lateinit var noteRepository: NoteRepository
    @Autowired
    private lateinit var kafkaTemplate: KafkaTemplate<String, NoteEvent>

    override fun getAllNotes(): Option<NonEmptyList<Note>> =
        NonEmptyList.fromList(noteRepository.findAll())

    override fun createNote(note: Option<Note>) : Note {
        note.map {
            kafkaTemplate.send(TOPIC_NAME,  NoteEvent.Created(it))
        }
        return note.getOrElse { Note(id=0) }
    }

    @Transactional(readOnly = true)
    override fun getNotesByAuthor(author: String): Option<NonEmptyList<Note>> {
        val noteList = noteRepository.findByAuthor(author)
        return NonEmptyList.fromList(noteList)
    }
}

My consumer

package com.remusrd.notesample.service

import org.springframework.kafka.annotation.KafkaListener
import org.springframework.messaging.Message
import org.springframework.stereotype.Component

@Component
class CreatedNotesConsumer {

    @KafkaListener(topics = ["notes"], groupId = "noteGroup")
    fun receive(noteEvent: Message<Any>) {
        println("received $noteEvent ${noteEvent.javaClass}")
    }
}
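
Once value deserialization works, the listener can also receive the typed event directly instead of a generic Message<Any>. A sketch (the class name is illustrative; it relies on the type headers the producer adds and on spring.json.trusted.packages covering the event package, which the application.yml above already does):

```kotlin
import com.remusrd.notesample.domain.event.NoteEvent
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

@Component
class TypedNotesConsumer {

    // The JsonDeserializer resolves the concrete NoteEvent subtype from the
    // __TypeId__ header written by the JsonSerializer on the producer side.
    @KafkaListener(topics = ["notes"], groupId = "noteGroup")
    fun receive(noteEvent: NoteEvent) {
        println("received $noteEvent")
    }
}
```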

The entity

package com.remusrd.notesample.domain

import java.time.LocalDateTime
import javax.persistence.*

@Entity
@Table(name = "note")
data class Note(
    @Id @GeneratedValue(strategy = GenerationType.IDENTITY)
    val id: Long,
    val content: String = "",
    val creationDate: LocalDateTime = LocalDateTime.now(),
    val lastModified: LocalDateTime = LocalDateTime.now(),
    val author: String = ""
)
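
An alternative to customizing the ObjectMapper globally is to pin the JSON representation on the entity's date fields themselves, so serializer and deserializer agree on the format regardless of mapper configuration. A sketch (the pattern matches the "2018-12-05 16:45:59" strings visible in the failing payload; it still requires jackson-datatype-jsr310 on the classpath):

```kotlin
import com.fasterxml.jackson.annotation.JsonFormat
import java.time.LocalDateTime
import javax.persistence.*

@Entity
@Table(name = "note")
data class Note(
    @Id @GeneratedValue(strategy = GenerationType.IDENTITY)
    val id: Long,
    val content: String = "",
    // Explicit pattern: both writing and reading use this exact shape.
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    val creationDate: LocalDateTime = LocalDateTime.now(),
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    val lastModified: LocalDateTime = LocalDateTime.now(),
    val author: String = ""
)
```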

And my build file

buildscript {
    ext {
        kotlinVersion = "1.3.10"
        springBootVersion = "2.1.1.RELEASE"
        springCloudVersion = "Greenwich.M3"
        arrow_version = "0.8.1"
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
        classpath("org.jetbrains.kotlin:kotlin-gradle-plugin:${kotlinVersion}")
        classpath("org.jetbrains.kotlin:kotlin-allopen:${kotlinVersion}")
        classpath("org.jetbrains.kotlin:kotlin-noarg:${kotlinVersion}")
        classpath("io.spring.gradle:dependency-management-plugin:1.0.2.RELEASE")
    }
}

apply plugin: "kotlin"
apply plugin: "kotlin-spring"
apply plugin: "org.springframework.boot"
apply plugin: "io.spring.dependency-management"
apply plugin: "kotlin-allopen"
apply plugin: "kotlin-noarg"
apply plugin: "kotlin-jpa"


group "com.remusrd"
version "0.0.1-SNAPSHOT"

sourceCompatibility = 1.8

repositories {
    mavenCentral()
    maven { url 'http://repo.spring.io/milestone' }
}
noArg{
    annotation("com.remusrd.notesample.domain.annotation.NoArg")
}
allOpen{
    annotation("com.remusrd.notesample.domain.annotation.Open")
}

dependencies {
    // Kotlin
    implementation "org.jetbrains.kotlin:kotlin-stdlib"
    implementation "org.jetbrains.kotlin:kotlin-reflect"
    implementation "com.fasterxml.jackson.module:jackson-module-kotlin"
    implementation "io.arrow-kt:arrow-core:$arrow_version"
    implementation "io.arrow-kt:arrow-data:$arrow_version"

    // Spring Boot
    implementation "org.springframework.cloud:spring-cloud-starter-netflix-eureka-client"
    implementation "org.springframework.boot:spring-boot-starter-web:$springBootVersion"
    implementation "org.springframework.boot:spring-boot-starter-data-jpa:$springBootVersion"
    implementation "org.springframework.kafka:spring-kafka"
    implementation "com.fasterxml.jackson.module:jackson-module-kotlin:2.9.7"



    // BBDD
    implementation "mysql:mysql-connector-java:8.0.13"
    implementation "com.h2database:h2:1.4.197"


    // Test
    testImplementation "junit:junit:4.12"
    testImplementation("org.springframework.boot:spring-boot-starter-test")
}
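
Depending on the starter set, jackson-datatype-jsr310 may already be on the classpath transitively via spring-boot-starter-web; if it is not, it would need to be added explicitly for any of the LocalDateTime fixes to work. A sketch of the extra dependency (version managed by the Spring Boot BOM already imported above):

```groovy
dependencies {
    // Jackson's java.time (JSR-310) module; required for LocalDateTime (de)serialization
    implementation "com.fasterxml.jackson.datatype:jackson-datatype-jsr310"
}
```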

dependencyManagement {
    imports {
        mavenBom "org.springframework.boot:spring-boot-dependencies:${springBootVersion}"
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}

compileKotlin {
    kotlinOptions {
        freeCompilerArgs = ["-Xjsr305=strict"]
        jvmTarget = "1.8"
    }
}
compileTestKotlin {
    kotlinOptions {
        freeCompilerArgs = ["-Xjsr305=strict"]
        jvmTarget = "1.8"
    }
}

This is the stack trace I get:

2018-12-05 16:48:56.884 ERROR 8331 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: null

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition notes-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[123, 34, 110, 111, 116, 101, 34, 58, 123, 34, 105, 100, 34, 58, 48, 44, 34, 99, 111, 110, 116, 101, 110, 116, 34, 58, 34, 72, 111, 108, 97, 32, 113, 117, -61, -87, 32, 116, 97, 108, 34, 44, 34, 99, 114, 101, 97, 116, 105, 111, 110, 68, 97, 116, 101, 34, 58, 34, 50, 48, 49, 56, 45, 49, 50, 45, 48, 53, 32, 49, 54, 58, 52, 53, 58, 53, 57, 34, 44, 34, 108, 97, 115, 116, 77, 111, 100, 105, 102, 105, 101, 100, 34, 58, 34, 50, 48, 49, 56, 45, 49, 50, 45, 48, 53, 32, 49, 54, 58, 52, 53, 58, 53, 57, 34, 44, 34, 97, 117, 116, 104, 111, 114, 34, 58, 34, 82, 105, 99, 104, 97, 114, 100, 34, 125, 125]] from topic [notes]
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `java.time.LocalDateTime` (no Creators, like default construct, exist): no String-argument constructor/factory method to deserialize from String value ('2018-12-05 16:45:59')
 at [Source: (byte[])"{"note":{"id":0,"content":"yo","creationDate":"2018-12-05 16:45:59","lastModified":"2018-12-05 16:45:59","author":"Richard"}}"; line: 1, column: 58] (through reference chain: com.remusrd.notesample.domain.event.NoteEvent$Modified["note"]->com.remusrd.notesample.domain.Note["creationDate"])
    at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:67) ~[jackson-databind-2.9.7.jar:2.9.7]
    at com.fasterxml.jackson.databind.DeserializationContext.reportBadDefinition(DeserializationContext.java:1452) ~[jackson-databind-2.9.7.jar:2.9.7]
    at com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1028) ~[jackson-databind-2.9.7.jar:2.9.7]
    at com.fasterxml.jackson.databind.deser.ValueInstantiator._createFromStringFallbacks(ValueInstantiator.java:371) ~[jackson-databind-2.9.7.jar:2.9.7]
    at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.createFromString(StdValueInstantiator.java:323) ~[jackson-databind-2.9.7.jar:2.9.7]
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromString(BeanDeserializerBase.java:1373) ~[jackson-databind-2.9.7.jar:2.9.7]
    at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:171) ~[jackson-databind-2.9.7.jar:2.9.7]
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:161) ~[jackson-databind-2.9.7.jar:2.9.7]
    at com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:136) ~[jackson-databind-2.9.7.jar:2.9.7]
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:369) ~[jackson-databind-2.9.7.jar:2.9.7]
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:159) ~[jackson-databind-2.9.7.jar:2.9.7]
    at com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:136) ~[jackson-databind-2.9.7.jar:2.9.7]
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:369) ~[jackson-databind-2.9.7.jar:2.9.7]
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:159) ~[jackson-databind-2.9.7.jar:2.9.7]
    at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:1611) ~[jackson-databind-2.9.7.jar:2.9.7]
    at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1234) ~[jackson-databind-2.9.7.jar:2.9.7]
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:328) ~[spring-kafka-2.2.2.RELEASE.jar:2.2.2.RELEASE]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1041) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3300(Fetcher.java:110) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1223) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:1072) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:562) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:523) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1230) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154) ~[kafka-clients-2.0.1.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:719) ~[spring-kafka-2.2.2.RELEASE.jar:2.2.2.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:676) ~[spring-kafka-2.2.2.RELEASE.jar:2.2.2.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_171]
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [na:1.8.0_171]
    at java.util.concurrent.FutureTask.run(FutureTask.java) [na:1.8.0_171]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]
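
Reading the trace: the consumer-side ObjectMapper has no way to construct a LocalDateTime from a string at all, and note that even with the JavaTimeModule registered, the module's default ISO format ("2018-12-05T16:45:59", with a T) would still not match the space-separated strings in this payload. A minimal sketch of a mapper that accepts exactly this payload shape (the formatter pattern is inferred from the trace above; this is one possible fix, not the project's actual code):

```kotlin
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// A JavaTimeModule whose LocalDateTime (de)serializers use the
// "yyyy-MM-dd HH:mm:ss" pattern seen in the failing record.
fun timeAwareMapper(): ObjectMapper {
    val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
    val module = JavaTimeModule()
    module.addSerializer(LocalDateTime::class.java, LocalDateTimeSerializer(formatter))
    module.addDeserializer(LocalDateTime::class.java, LocalDateTimeDeserializer(formatter))
    return ObjectMapper().registerModule(module)
}
```

Such a mapper would have to be used on both the producer's JsonSerializer and the consumer's JsonDeserializer so the two sides agree.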

Edit: https://github.com/RemusRD/notesample is the repository; if you have any suggestions to improve the code, please let me know.

Edit 2: this is the new KafkaConfig

package com.remusrd.notesample.configuration

import com.fasterxml.jackson.databind.ObjectMapper
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.autoconfigure.jackson.Jackson2ObjectMapperBuilderCustomizer
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder
import org.springframework.kafka.core.*
import org.springframework.kafka.support.serializer.JsonDeserializer
import org.springframework.kafka.support.serializer.JsonSerializer


@Configuration
class KafkaConfig {
    @Autowired
    lateinit var jackson2ObjectMapperBuilder: Jackson2ObjectMapperBuilder
    @Autowired
    lateinit var kafkaProperties: KafkaProperties


    @Bean
    fun kafkaTemplate(): KafkaTemplate<Any, Any> {
        return KafkaTemplate<Any, Any>(defaultKafkaProducerFactory())
    }


    @Bean
    fun defaultKafkaConsumerFactory(): ConsumerFactory<Any, Any> {
        val objectMapper = jackson2ObjectMapperBuilder.build() as ObjectMapper
        val jsonDeserializer = JsonDeserializer<Any>(objectMapper)
        jsonDeserializer.configure(kafkaProperties.buildConsumerProperties(), false)
        val kafkaConsumerFactory = DefaultKafkaConsumerFactory<Any, Any>(
            kafkaProperties.buildConsumerProperties(),
            jsonDeserializer,
            jsonDeserializer
        )
        kafkaConsumerFactory.setValueDeserializer(jsonDeserializer)
        return kafkaConsumerFactory
    }

    @Bean
    fun defaultKafkaProducerFactory(): ProducerFactory<Any, Any> {
        val jsonSerializer = JsonSerializer<Any>(jackson2ObjectMapperBuilder.build())
        jsonSerializer.configure(kafkaProperties.buildProducerProperties(), false)
        val factory = DefaultKafkaProducerFactory<Any, Any>(
            kafkaProperties.buildProducerProperties(),
            jsonSerializer,
            jsonSerializer
        )
        val transactionIdPrefix = kafkaProperties.producer
            .transactionIdPrefix
        if (transactionIdPrefix != null) {
            factory.setTransactionIdPrefix(transactionIdPrefix)
        }
        return factory
    }
}

1 answer

乜昆
2023-03-14

The problem is that you are plugging your custom JsonDeserializer into the keyDeserializer on the ConsumerFactory:

@Bean
fun defaultKafkaConsumerFactory(): ConsumerFactory<Any, Any> {
    val objectMapper = jackson2ObjectMapperBuilder.build() as ObjectMapper
    objectMapper.registerModule(JavaTimeModule())
    val jsonDeserializer = JsonDeserializer<Any>(objectMapper)
    jsonDeserializer.configure(kafkaProperties.buildConsumerProperties(), false)
    val kafkaConsumerFactory = DefaultKafkaConsumerFactory<Any, Any>(
        kafkaProperties.buildConsumerProperties()
    )
    kafkaConsumerFactory.setKeyDeserializer(jsonDeserializer)
    return kafkaConsumerFactory
}

Use setValueDeserializer() instead, or just drop the setter altogether in favor of passing the deserializers into that Kafka consumer factory's constructor.
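
Put together, a corrected factory might look like the sketch below. It assumes the surrounding KafkaConfig class from Edit 2 (with its jackson2ObjectMapperBuilder and kafkaProperties fields), and it assumes String keys, which is a guess about the intended key type:

```kotlin
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.context.annotation.Bean
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.support.serializer.JsonDeserializer

@Bean
fun defaultKafkaConsumerFactory(): ConsumerFactory<String, Any> {
    val objectMapper = jackson2ObjectMapperBuilder.build() as ObjectMapper
    objectMapper.registerModule(JavaTimeModule())
    val jsonDeserializer = JsonDeserializer<Any>(objectMapper)
    jsonDeserializer.configure(kafkaProperties.buildConsumerProperties(), false)
    // String keys, JSON values: each deserializer goes in its proper slot,
    // so no setter calls are needed afterwards.
    return DefaultKafkaConsumerFactory(
        kafkaProperties.buildConsumerProperties(),
        StringDeserializer(),
        jsonDeserializer
    )
}
```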

I have no idea, though, why you have a pair of consumer factory / producer factory beans at all.
