当前位置: 首页 > 知识库问答 >
问题:

Spring Kafka,Spring Cloud Stream,和Avro兼容性未知魔术字节

荆利
2023-03-14

我有一个问题反序列化来自Kafka主题的消息。这些消息已经使用spring-cloud-stream和Apache Avro序列化。我正在用斯普林斯·Kafka阅读它们,并试图反序列化它们。如果我使用spring-cloud来生成和使用消息,那么我就可以很好地反序列化消息。问题是当我用Spring Kafka消费它们,然后试图反序列化。

我正在使用一个模式注册表(用于开发的spring-boot模式注册表和用于生产的Confluent模式),但是反序列化问题似乎发生在调用模式注册表的事件之前。

很难发布关于这个问题的所有相关代码,所以我将其发布在git Hub的一个repo中:https://github.com/robjwilkins/avro-example

@Data
public class Request {
  private String message;
}

在Kafka上生成消息的代码如下所示:

@EnableBinding(MessageChannels.class)
@Slf4j
@RequiredArgsConstructor
@RestController
public class ProducerController {

  private final MessageChannels messageChannels;

  @GetMapping("/produce")
  public void produceMessage() {
    Request request = new Request();
    request.setMessage("hello world");
    Message<Request> requestMessage = MessageBuilder.withPayload(request).build();
    log.debug("sending message");
    messageChannels.testRequest().send(requestMessage);
  }
}

和application.yaml:

spring:
  application.name: avro-producer
  kafka:
    bootstrap-servers: localhost:9092
    consumer.group-id: avro-producer
  cloud:
    stream:
      schema-registry-client.endpoint: http://localhost:8071
      schema.avro.dynamic-schema-generation-enabled: true
      kafka:
        binder:
          brokers: ${spring.kafka.bootstrap-servers}
      bindings:
        test-request:
          destination: test-request
          contentType: application/*+avro

然后我有一个消费者:

@Slf4j
@Component
public class TopicListener {

    @KafkaListener(topics = {"test-request"})
    public void listenForMessage(ConsumerRecord<String, Request> consumerRecord) {
        log.info("listenForMessage. got a message: {}", consumerRecord);
        consumerRecord.headers().forEach(header -> log.info("header. key: {}, value: {}", header.key(), asString(header.value())));
    }

    private String asString(byte[] byteArray) {
        return new String(byteArray, Charset.defaultCharset());
    }
}
spring:
  application.name: avro-consumer
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: avro-consumer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
#      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        schema.registry.url: http://localhost:8071
2019-01-30 20:01:39.900 ERROR 30876 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: null

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition test-request-0 at offset 43. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
public abstract class AbstractKafkaAvroDeserializer extends AbstractKafkaAvroSerDe {
....
private ByteBuffer getByteBuffer(byte[] payload) {
  ByteBuffer buffer = ByteBuffer.wrap(payload);
  if (buffer.get() != 0) {
    throw new SerializationException("Unknown magic byte!");
  } else {
    return buffer;
  }
}

共有1个答案

云洋
2023-03-14

这个问题的症结在于生产者使用spring-cloud-stream向Kafka发布消息,而消费者使用Spring-Kaka。这样做的原因是:

  • 现有系统已经建立良好,并使用Spring-Cloud-Stream
  • 新使用者需要使用相同的方法监听多个主题,并且只绑定在主题名称的csv列表上
  • 需要一次使用一组消息,而不是单独使用,这样可以将其内容批量写入数据库。

Spring-cloud-stream目前不允许使用者将侦听器绑定到多个主题,并且没有办法同时使用消息集合(除非我弄错了)。

我找到了一个解决方案,它不需要对生产者代码进行任何更改,它使用spring-cloud-stream向Kafka发布消息。Spring-cloud-stream使用MessageConverter来管理序列化和反序列化。在AbstractAvroMessageConverter中,有一些方法:ConvertFromInternalConvertToInternal,它们处理到字节数组的转换。我的解决方案是扩展这段代码(创建一个扩展AvroschemaRegistryClientMessageConverter)的类,这样我就可以重用spring-cloud-stream的大部分功能,但是使用一个可以从spring-kafkaKafkalistener访问的接口。然后我修改了我的TopicListener,使用这个类进行转换:

转换器:

@Component
@Slf4j
public class AvroKafkaMessageConverter extends AvroSchemaRegistryClientMessageConverter {

  public AvroKafkaMessageConverter(SchemaRegistryClient schemaRegistryClient) {
    super(schemaRegistryClient, new NoOpCacheManager());
  }

  public <T> T convertFromInternal(ConsumerRecord<?, ?> consumerRecord, Class<T> targetClass,
      Object conversionHint) {
    T result;
    try {
      byte[] payload = (byte[]) consumerRecord.value();

      Map<String, String> headers = new HashMap<>();
      consumerRecord.headers().forEach(header -> headers.put(header.key(), asString(header.value())));

      MimeType mimeType = messageMimeType(conversionHint, headers);
      if (mimeType == null) {
        return null;
      }

      Schema writerSchema = resolveWriterSchemaForDeserialization(mimeType);
      Schema readerSchema = resolveReaderSchemaForDeserialization(targetClass);

      @SuppressWarnings("unchecked")
      DatumReader<Object> reader = getDatumReader((Class<Object>) targetClass, readerSchema, writerSchema);
      Decoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
      result = (T) reader.read(null, decoder);
    }
    catch (IOException e) {
      throw new RuntimeException("Failed to read payload", e);
    }
    return result;
  }

  private MimeType messageMimeType(Object conversionHint, Map<String, String> headers) {
    MimeType mimeType;
    try {
      String contentType = headers.get(MessageHeaders.CONTENT_TYPE);
      log.debug("contentType: {}", contentType);
      mimeType = MimeType.valueOf(contentType);
    } catch (InvalidMimeTypeException e) {
      log.error("Exception getting object MimeType from contentType header", e);
      if (conversionHint instanceof MimeType) {
        mimeType = (MimeType) conversionHint;
      }
      else {
        return null;
      }
    }
    return mimeType;
  }

  private String asString(byte[] byteArray) {
    String theString = new String(byteArray, Charset.defaultCharset());
    return theString.replace("\"", "");
  }
}

修改后的TopicListener:

@Slf4j
@Component
@RequiredArgsConstructor
public class TopicListener {

  private final AvroKafkaMessageConverter messageConverter;

  @KafkaListener(topics = {"test-request"})
  public void listenForMessage(ConsumerRecord<?, ?> consumerRecord) {
    log.info("listenForMessage. got a message: {}", consumerRecord);
    Request request = messageConverter.convertFromInternal(
        consumerRecord, Request.class, MimeType.valueOf("application/vnd.*+avr"));
    log.info("request message: {}", request.getMessage());
  }
}
 类似资料:
  • 我一直在尝试将kafka-avro-console-consumer从Confluent连接到我们的遗留Kafka集群,该集群是在没有Confluent模式注册表的情况下部署的。我使用以下属性显式提供了模式: 但我得到的是‘未知的魔法字节!’出错 是否可以使用Confluent kafka-avro-console-consumer(未使用Confluent的AvroSerializer序列化)和

  • 我正在使用带有Spring启动 2.5.8 的 Spring Cloud 2020.0.2 和 spring-kafka:2.6.6。这工作正常。但是当我将Spring云升级到 2020.0.4 时,Spring kafka 会抛出以下错误。 邮件处理异常,缺少标头kafka_acknowledgement。 有谁能帮我一下spring cloud 2020.0.2和spring-kafka:2.

  • 问题内容: 我在服务器上编译我的代码并下载并尝试在我的计算机上运行它时遇到了一个奇怪的错误。 我基本上是在EC2实例上编译一些Java文件,然后将它们加载到存储中供以后使用。 当我将文件下载到计算机上并尝试运行它们时,出现以下错误: 我正在使用以下方法编译文件: 并使用此方法上传文件: 有人知道我在做什么错吗?当我在计算机上编译这些类文件时,这些类文件是可运行的,但是当我将它们上传到云中并下载它们

  • 我正在尝试使用镜头从MQTT到Kafka的消息。ioReactor。流式反应器的最新版本 Kafka/融合版 预期行为:avro主题应打印在控制台上 org.apache.kafka.common.errors。序列化异常:未知的魔法字节! 附加细节 连接器属性配置(我的连接器属性) avro json文件 MQTT代理消息 完整日志 还在模式注册表上注册了模式 我会做错什么?

  • 我们的系统由多个微服务组成,这些微服务发出并使用以avro格式编码的事件(参见底部的模式)。一个特定的用例如下:服务A在主题T1上发出一个事件(类型为InvoiceEvents),服务B和C(不同的开发团队)在T1上消费。例如,服务B是税务团队的一部分,而服务C是产品履行团队的一部分。 我本以为以下是真的(但似乎不是真的): 通过添加新的联合类型(即为字段“payload”创建的InvoiceCr

  • BigQuery通常在加载Avro数据方面做得很好,但“bq load”在时间戳和其他使用Avro logicalType属性的日期/时间字段方面有很多问题。 当BigQuery时间戳将Avro类型时间戳millis的数据解释为微秒时间戳(关闭1000)时,我的数据被破坏 我想我可以通过始终将数据加载到临时字段并使用CAST查询或将它们转换为其他字段来解决这些问题,但这并不能很好地扩展或支持模式演