当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud Stream Kafka>使用来自汇流REST代理的Avro消息

孟雪风
2023-03-14

我有以下场景:

  • 生产者通过Confluent的REST代理(在Confluent的模式注册表上注册模式)向Kafka主题发送Avro编码的消息,如http://docs.confluent.io/3.0.0/kafka-rest/docs/intro.html#produce-and-consument-avro-messages
  • 所述
  • Spring Cloud Stream enabled message侦听主题中的新消息

我的应用程序看起来是这样的:

@SpringBootApplication
@EnableBinding(Sink.class)
public class MyApplication {
  private static Logger log = LoggerFactory.getLogger(MyApplication.class);

  public static void main(String[] args) {
    SpringApplication.run(MyApplication.class, args);
  }

  @StreamListener(Sink.INPUT)
  public void myMessageSink(MyMessage message) {
    log.info("Received new message: {}", message);
  }
}

而MyMessage是Avro从Avro模式创建的类。

我的Application.Properties如下所示:

spring.cloud.stream.bindings.input.destination=myTopic
spring.cloud.stream.bindings.input.group=${spring.application.name}
spring.cloud.stream.bindings.input.contentType=application/*+avro
org.springframework.messaging.MessagingException: Exception thrown while invoking MyApplication#myMessageSink[1 args]; nested exception is org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -27
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor$StreamListenerMessageHandler.handleRequestMessage(StreamListenerAnnotationBeanPostProcessor.java:316) ~[spring-cloud-stream-1.1.0.RELEASE.jar:1.1.0.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) ~[spring-integration-core-4.3.2.RELEASE.jar:4.3.2.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.2.RELEASE.jar:4.3.2.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.3.2.RELEASE.jar:4.3.2.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) ~[spring-integration-core-4.3.2.RELEASE.jar:4.3.2.RELEASE]
    ...
Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -27
    at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:430) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.generic.GenericDatumReader.readMapKey(GenericDatumReader.java:335) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.generic.GenericDatumReader.readMap(GenericDatumReader.java:321) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:177) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144) ~[avro-1.8.1.jar:1.8.1]
    at org.springframework.cloud.stream.schema.avro.AbstractAvroMessageConverter.convertFromInternal(AbstractAvroMessageConverter.java:91) ~[spring-cloud-stream-schema-1.1.0.RELEASE.jar:1.1.0.RELEASE]
    at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:175) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE]
    at org.springframework.messaging.converter.CompositeMessageConverter.fromMessage(CompositeMessageConverter.java:67) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE]
    at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:117) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE]
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:112) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:138) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:107) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE]
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor$StreamListenerMessageHandler.handleRequestMessage(StreamListenerAnnotationBeanPostProcessor.java:307) ~[spring-cloud-stream-1.1.0.RELEASE.jar:1.1.0.RELEASE]
    ... 35 common frames omitted

共有1个答案

凌昕
2023-03-14

当将per-binding属性spring.cloud.stream.kafka.bindings.input.consumer.configuration.value.deserializer设置为具有Confluent的KafkaAvroDeserializer类名时,它是否起作用?

 类似资料:
  • 我试图使用Confluent Kafka REST Proxy从我的一个主题中检索Avro格式的数据,但不幸的是,我得到了一个反序列化错误。我使用以下命令查询Kafka REST代理 我得到的回应是 Kafka Rest Proxy服务器上的日志如下: 数据是使用KafkaAvroSerializer生成的,模式在模式注册表中。还请注意,在CLI上使用avro console consumer可以

  • 我试图使用来自kafka主题的合流avro消息作为spring Boot2.0的Kstream。 我能够以的形式使用消息,但不能以的形式使用消息。 例外情况: 线程“pcs-7bb7b444-044d-41bb-945d-450c902337ff-StreamThread-3”org.apache.kafka.streams.errors.StreamsException:流线程[pcs-7bb7

  • 我用Flink的table API创建了一个表。 当运行SQL以查看记录时,我得到: 我知道有一些坏的avro记录被推送到Kafka主题中。在JSON格式中,有一个选项可以通过设置来跳过/过滤这些记录。当从合流avro格式读取时,我们可以跳过这些记录吗? 这并不理想,但不幸的是,尽管有一个模式注册表,但我无法控制要推送到Kafka的内容。

  • 我第一次使用pyspark。Spark版本:2.3.0Kafka版本:2.2.0 我有一个Kafka制作人,它以avro格式发送嵌套数据,我正试图在pyspark中编写spark流/结构化流的代码,它将来自Kafka的avro反序列化为数据帧,并进行转换,将其以拼花格式写入s3。我在spark/scala中找到了avro转换器,但pyspark中的支持尚未添加。如何在pyspark中转换相同的值。

  • 我开发了storm拓扑结构,以从hortonworks上的kafka代理接收JSONArray数据, 我不知道为什么我的kafkaSpout不使用HDP中来自Kafka Brokers的消息,但是Storm拓扑已成功提交,但当我可视化拓扑时:0%的数据已被消耗!! 拓扑可视化 这是我的计划类: 和属性文件: 当我使用另一个消费者时,存储在Kafka Brokers中的数据如下: 所以我的问题是为什

  • 线程名称:线程组1-1示例开始时间:2019-09-11 18:52:42英国夏令时加载时间:0连接时间:0延迟时间:0大小以字节为单位:0发送字节:0头大小以字节为单位:0主体大小以字节为单位:0示例计数:1错误计数:1数据类型(“text”“bin”“”):文本响应代码:000响应消息:javax.naming.nameNotFoundException:DynamicQueue/MyQueu