当前位置: 首页 > 知识库问答 >
问题:

无法使用kafka avro控制台使用者读取avro消息。SerializationException:未知的魔法字节

程举
2023-03-14

我正在编写一个REST代理,就像合流REST代理一样。它接受JSON负载、模式主题和id,然后将JSON负载作为Avro对象写入流中。当我使用kafka avro控制台消费者阅读消息时,我收到了“未知魔法字节”错误。

这是我的Kafka制作人配置:

        properties.put("client.id", LocalHostUtils.getLocalHostName(null));

        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

        properties.put(AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
        properties.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, RecordNameStrategy.class);

        properties.put("schema.registry.url", configValuesManager.getString("dsp_kafka.schema_registry"));

        if (KafkaUtils.isKafkaEnabled()) {
            this.kafkaProducer = new KafkaProducer<String, Object>(properties);
        }

这就是REST控制器如何将传入的JSON转换为Avro

        Schema schema = null;
        try {
            schema = schemaRegistryClient.getBySubjectAndID(schemaSubject, schemaId);
        } catch (RestClientException e) {
            throw new IOExceptionWithCause(e);
        }

        log.debug(postContent);
        log.info("Subject/Version {}/{} -> {}", schemaSubject, schemaId, schema);
        Object data = toAvro(schema, postContent);

这是toAvro方法的实现:

    Object toAvro(Schema schema, String jsonBody) throws IOException
    {
        DatumReader<Object> reader = new GenericDatumReader<Object>(schema);
        Object object = reader.read(
                null, decoderFactory.jsonDecoder(schema, jsonBody));

        return object;

    }

然后将此对象传递给我使用上面给出的属性配置的SchemaValidationProducer。。。。

        this.kafkaSchemaValidatingProducer.publish(topic, 0, UUID.randomUUID().toString(), data);

这是kafkaSchemaValidatingProducer上的发布方法

    public void publish(String topic, Integer partition, String key, Object data)
    {
        log.debug("publish topic={} key={} value={}", topic, key, data);

        if (!KafkaUtils.isKafkaEnabled()) {
            log.warn("Kafka is not enabled....");
            return;
        }

        ProducerRecord<String, Object> record = new ProducerRecord<String, Object>(topic, key, data);


        Future<RecordMetadata> metadataFuture = kafkaProducer.send(record, new Callback()
        {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception)
            {
                if (exception == null) {
                    log.info(metadata.toString());
                    return;
                }

                log.error("exception", exception);

            }
        });

        kafkaProducer.flush();

    }

我就是这样读这个话题的

./bin/kafka-avro-console-consumer --bootstrap-server kafka-broker1:9021 --consumer.config client-ssl.properties --topic schema-validated-topic --property print.key=true --property print.value=true --value-deserializer io.confluent.kafka.serializers.KafkaAvroDeserializer --offset earliest --skip-message-on-error --partition 0 --property schema.registry.url http://schema-regisry

这将导致。。。。

[2019-08-26 16:30:36,351] ERROR Error processing message, skipping this message:  (kafka.tools.ConsoleConsumer$:76)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

知道为什么我得到了“坏魔术数字错误”吗?

共有1个答案

红砚文
2023-03-14

我发现了问题所在。那就是我没有在命令中指定密钥反序列化器。

这是有效的命令。

./bin/kafka-avro-console-consumer \
--bootstrap-server <bootstrap-server> \
--consumer.config client-ssl.properties \
--property schema.registry.url=<schema-registry-url> \
--topic <name-of-topic> \
--property print.key=true \
--property print.value=true \
--value-deserializer io.confluent.kafka.serializers.KafkaAvroDeserializer \
--key-deserializer org.apache.kafka.common.serialization.StringDeserializer
 类似资料:
  • (最终目标)在尝试是否最终可以从Confluent平台读取avro数据usng spark stream之前,如这里所述:将spark结构化流与Confluent Schema Registry集成 我要验证是否可以使用以下命令来读取它们: 我收到这个错误消息,未知的魔法字节 注意,可以这样读取消息(使用console consumer而不是avro console consumer): 该消息是

  • 我试图从__consumer_offsets主题中使用,因为这似乎是检索关于消费者的kafka度量(如消息滞后等)的最简单的方法。理想的方法是从jmx访问它,但希望先尝试一下,返回的消息似乎是加密的或不可读的。尝试添加stringDeserializer属性。有没有人对如何纠正这一点有什么建议?这里的提法也是重复的 重复的consumer_offset 没有帮助,因为它没有引用我的问题,即在Jav

  • 有没有解决这个问题的方法???我无法读取KAFKA-AVRO架构消息。我正在尝试将消息从logstash发送到KAFKA到hdfs。 以下是技术堆栈: LogStash 2.3-当前生产版本 汇流3.0。 插件:A。Logstash-kafka-Output插件B。logstash-codec-avro。 动物园管理员:3.4.6 Kafka:0.10.0.0 Logstash配置文件如下所示:

  • 我们正在使用与Kafka消费者和生产者Spring。我们正在生成大小为905字节的消息。我们正在序列化消息,并试图为下一个使用者反序列化它。 消息有效负载类示例: Application.Properties 当我们接受字符串格式的消息负载时,Consumer工作得很好,但当我们将Consumer中的负载反序列化为对象时,我们面临着问题。下面的错误被抛出相同

  • 我试图使用apache kafka二进制文件中的kafka控制台生成器生成消息,并在spring boot中使用消费者设置。消费者使用avro模式。 当消息以json格式生成时,我的消费者抛出异常-“无法序列化”。 我找到了一个解决方案,可以使用“ConFluent Platform 7.1”,它有kafka-avro-console-生产者。它支持avro,但它是企业版。 有没有一种方法可以使用

  • java 我正在使用控制中心来检查这个主题的消费者,并跟踪正在消费的数据。在运行这个应用程序时,它与Kafka和所有分区都连接得很好,我可以在控制中心看到所有的数据都被提取了,但在我的java控制台中没有打印任何数据。但是我注意到,在向Kafka发送一些新数据时,它会在java控制台中打印出来(即,在运行我的消费者后将新数据发送给Kafka)。它应该是这样的吗?还是我做错了什么?根据我的理解,Ka