当前位置: 首页 > 知识库问答 >
问题:

使用spring-kakfa-stream读取KTable时发生LongDeserializer异常

司马狐若
2023-03-14

我在读取KTABLE时得到了LongDeserializer异常

我正在使用springcloudversion='finchley.rc1'springBootVersion='2.0.1.release'

该项目的github链接可在https://github.com/jaysara/kstreamanalytics获得

Exception in thread "panalytics-ac0fa75f-2ae4-4b26-9a04-1f80d1479112-StreamThread-2" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:74)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:91)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:549)
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:920)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:821)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8

共有1个答案

司徒元明
2023-03-14

您需要取消对这一行的注释:https://github.com/jaysara/kstreamanalytics/blob/master/src/main/resources/application.properties#l19

spring.cloud.stream.bindings.policyPaidAnalytic.producer.useNativeEncoding=true

默认情况下,绑定器尝试在出站时序列化,并使用application/json作为内容类型。因此,在您的例子中,它是以json(字符串)的形式出现的,这就是为什么您会得到长序列化异常的原因。通过将上面的标志设置为true,您要求绑定器后退,并通过使用longserde在本地序列化Kafka流。

重新运行时,可能需要清除主题policyanalytic或使用新主题。

 类似资料:
  • 我正在迁移一个Kafka Streams实现,它使用纯Kafka apis来使用sping-kafka,因为它被合并在sping-引导应用程序中。 一切都很好Stream,GlobalKtable,分支,我所有的工作都非常好,但我很难合并ReadOnlyKeyValueStore。基于这里的sping-kafka留档:https://docs.spring.io/spring-kafka/docs

  • 根据要求,我的应用程序首先需要读取一个大约75k-100k行和90列的Excel文件。但在XSSFWorkbook加载pkg/file时,我在第2行遇到了以下异常 线程“main”Java.lang.OutOfMemoryError中出现异常:com.sun.org.apache.xerces.internal.dom.DeferredDocumentMPL.CreateChunk中的Java堆空

  • 我正在尝试使用KStream-KTable leftJoin来丰富主题A中的条目和主题B。主题A是我的KStream,主题B是我的KTtable,它有大约2300万条记录。这两个主题中的键都没有计算,所以我必须使用reducer将KStream(主题B)转换为KTable。 下面是我的代码: 1)KTable初始化速度慢。(2000 msg/s左右),这正常吗?我的主题是只有1个分区。有什么方法可

  • 问题内容: 在 Android平台上从InputStream读取时,我遇到一个奇怪的问题。我不确定这是否是Android特有的问题,或者 总体上我做错了什么。 唯一特定于Android的是此调用: 这会从Android资产返回文件的InputStream。无论如何, 这是我遇到的问题: 当read()执行时,它抛出IOException。奇怪的是, 如果我进行了两个连续的单字节读取(或任意数量的单

  • 我通过Spring Boot中编写的侦听器使用JMS对象消息。我正在通过我的Camel应用程序将ObjectMessage发送到ActiveMQ队列,我正在Spring Boot应用程序中的侦听器类中侦听队列。 代码: 这是我的POJO类: 在ActiveMQ队列中,我收到的消息为: 我在Spring Boot中的JMS侦听器: 我得到一个Exception@line: 通过堆栈溢出和网络上的其他

  • 我开发了spring批处理应用程序,该应用程序生成由json对象列表组成的amqp(rabbitmq)消息。消息具有包含一些元数据的标头。Spring cloud stream应用程序正在消费消息,我使用了功能性方法。如何访问标题<将消息头用于除路由之外的任何内容,这是一种糟糕的方法吗?