当前位置: 首页 > 知识库问答 >
问题:

使用模式注册表API验证Kafka消息

桓信鸥
2023-03-14

我正在实现一个生成Kafka消息的过程,每个消息都应该有由模式注册表验证的模式。为了开发,我使用docker运行kafka和模式注册表,我的模式由模式注册表ui进行注册。

看起来我的模式没有被验证或者我缺少一些配置。我的生产者类有以下代码:

package br.com.xx.realtime_transformation.producers;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.UUID;

public class KafkaEventProducer {

    private final String bootstrapServers;
    private final String schemaRegistryUrl;
    private final KafkaProducer<String, GenericRecord> kafkaProducer;

    public KafkaEventProducer(String bootstrapServers, String schemaRegistryUrl) {
        this.bootstrapServers = bootstrapServers;
        this.schemaRegistryUrl = schemaRegistryUrl;
        this.kafkaProducer = getProducer();
    }

    private KafkaProducer<String, GenericRecord> getProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        props.put(AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.schemaRegistryUrl);

        return new KafkaProducer<>(props);
    }

    public void send(String topic, GenericRecord event) {
        try {
            String key = UUID.randomUUID().toString();
            final ProducerRecord producerRecord = new ProducerRecord<>(topic, key, event);

            this.kafkaProducer.send(producerRecord);
        } catch (final SerializationException e) {
            e.printStackTrace();
        }

    }

}

大多数时候,我会收到一个错误,如“Schema not found”,当没有引发此异常时,我的消息不会被验证,它只是将消息发送到另一个主题。

是否缺少任何类型的配置?

共有2个答案

谭飞掣
2023-03-14

我不确定我是否理解这个问题,但您似乎在询问模式验证。

这将在5.4版本中处理

诸葛卜霸
2023-03-14

Kafka Producer使用模式注册表,而不使用模式注册表。

没有模式注册表-以下是没有模式注册表的Kafka生产者示例

https://www.devglan.com/apache-kafka/apache-kafka-java-example

使用架构注册表

schema.registry.url参数只是指向我们存储模式的位置。我们需要提供模式定义,如下所示。您没有提供模式定义,而是使用模式注册表,因此您遇到了问题。

{
  "namespace": "com.example",
  "type": "record",
  "name": "Employee",
  "doc" : "Represents an Employee at a company",
  "fields": [
      {"name": "firstName", "type": "string", "doc": "The persons given name"},
      {"name": "lastName", "type": "string"},
      {"name": "age",  "type": "int", "default": -1},
      {"name": "emails", "default":[], "type":{"type": "array", "items": "string"}},
      {"name": "phoneNumber",  "type": "string"}
   ]
} 

Schema Registry的工作流程如下所示。

您可以在以下URL上找到详细的解释。https://mapr.com/docs/61/Kafka/KafkaSchemaRegistry/KafkaSchemaRegistryDemo.html https://aseigneurin.github.io/2018/08/02/kafka-tutorial-4-avro-and-schema-registry.html https://www.confluent.jp/blog/kafka-connect-tutorial-transfer-avro-schemas-across-schema-registry-clusters/

 类似资料:
  • 我对confluent Kafka不熟悉,能够与Avro一起经营confluent Kafka及其制作人和消费者。在本指南的帮助下,我已使用这些命令注册了我的新架构: 以下是我的注册表python代码: 一切正常,我的制作人发送avro数据,我正在使用C#在我的客户机上接收数据。但现在我想用基本身份验证凭据实现模式注册表。出于这个原因,我在google上找到了这个链接,这让我很困惑。例如,它声明使

  • 我有一个Kafka主题,我想用AVRO数据(目前是JSON)来填充它。我知道“正确”的方法是使用模式注册表,但出于测试目的,我想让它在没有它的情况下工作。 因此,我将AVRO数据作为数组[字节]发送,而不是常规的Json对象: 模式是在每个数据中启动的;我如何使它与kafka-connect一起工作?kafka-connect配置目前表现出以下属性(数据作为json.gz文件写入s3),我想编写P

  • 我试图用函数编程(和spring cloud stream)转换来自输入主题的输入AVRO消息,并在输出主题上发布新消息。下面是我的转换函数: 我的spring boot应用程序是以这种方式声明的,并激活了模式注册表客户机: 谢谢你能给我带来的任何帮助。 视CG

  • 我正试图使用SpringKafka为我的生产者应用程序及其嵌入式Kafka服务器编写测试。 然而,我的应用程序也使用合流模式注册表,我想知道SpringKafka是否为模式注册表提供了一些嵌入式服务器? 或者有没有更好的方法来使用模式注册表进行Spring Kafka测试?

  • 我试图从REST代理发布json模式,但遇到异常 curl-k-x post-h“content-type:application/vnd.schemaregistry.v1+json”--数据“{”schema“:”{“type”:“object”,“properties”:{“firstname”:{“type”:“string”},“lastname”:{“type”:“string”},“

  • 我有麻烦处理错误在我的注册表格特别是与密码1 以下是我的看法。py: 最后是模板文件: 一切正常,我可以在用户注册时对他们进行身份验证,登录并重定向他们,但如果注册表单中存在错误,那么我无法让用户了解哪个字段负责并在提交之前进行更正。 你知道如何确保显示密码吗