我正在实现一个生成Kafka消息的过程,每个消息都应该有由模式注册表验证的模式。为了开发,我使用docker运行kafka和模式注册表,我的模式由模式注册表ui进行注册。
看起来我的模式没有被验证或者我缺少一些配置。我的生产者类有以下代码:
package br.com.xx.realtime_transformation.producers;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.UUID;
public class KafkaEventProducer {
private final String bootstrapServers;
private final String schemaRegistryUrl;
private final KafkaProducer<String, GenericRecord> kafkaProducer;
public KafkaEventProducer(String bootstrapServers, String schemaRegistryUrl) {
this.bootstrapServers = bootstrapServers;
this.schemaRegistryUrl = schemaRegistryUrl;
this.kafkaProducer = getProducer();
}
private KafkaProducer<String, GenericRecord> getProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put(AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.schemaRegistryUrl);
return new KafkaProducer<>(props);
}
public void send(String topic, GenericRecord event) {
try {
String key = UUID.randomUUID().toString();
final ProducerRecord producerRecord = new ProducerRecord<>(topic, key, event);
this.kafkaProducer.send(producerRecord);
} catch (final SerializationException e) {
e.printStackTrace();
}
}
}
大多数时候,我会收到一个错误,如“Schema not found”,当没有引发此异常时,我的消息不会被验证,它只是将消息发送到另一个主题。
是否缺少任何类型的配置?
我不确定我是否理解这个问题,但您似乎在询问模式验证。
这将在5.4版本中处理
Kafka Producer使用模式注册表,而不使用模式注册表。
没有模式注册表-以下是没有模式注册表的Kafka生产者示例
https://www.devglan.com/apache-kafka/apache-kafka-java-example
使用架构注册表
schema.registry.url
参数只是指向我们存储模式的位置。我们需要提供模式定义,如下所示。您没有提供模式定义,而是使用模式注册表,因此您遇到了问题。
{
"namespace": "com.example",
"type": "record",
"name": "Employee",
"doc" : "Represents an Employee at a company",
"fields": [
{"name": "firstName", "type": "string", "doc": "The persons given name"},
{"name": "lastName", "type": "string"},
{"name": "age", "type": "int", "default": -1},
{"name": "emails", "default":[], "type":{"type": "array", "items": "string"}},
{"name": "phoneNumber", "type": "string"}
]
}
Schema Registry的工作流程如下所示。
您可以在以下URL上找到详细的解释。https://mapr.com/docs/61/Kafka/KafkaSchemaRegistry/KafkaSchemaRegistryDemo.html https://aseigneurin.github.io/2018/08/02/kafka-tutorial-4-avro-and-schema-registry.html https://www.confluent.jp/blog/kafka-connect-tutorial-transfer-avro-schemas-across-schema-registry-clusters/
我对confluent Kafka不熟悉,能够与Avro一起经营confluent Kafka及其制作人和消费者。在本指南的帮助下,我已使用这些命令注册了我的新架构: 以下是我的注册表python代码: 一切正常,我的制作人发送avro数据,我正在使用C#在我的客户机上接收数据。但现在我想用基本身份验证凭据实现模式注册表。出于这个原因,我在google上找到了这个链接,这让我很困惑。例如,它声明使
我有一个Kafka主题,我想用AVRO数据(目前是JSON)来填充它。我知道“正确”的方法是使用模式注册表,但出于测试目的,我想让它在没有它的情况下工作。 因此,我将AVRO数据作为数组[字节]发送,而不是常规的Json对象: 模式是在每个数据中启动的;我如何使它与kafka-connect一起工作?kafka-connect配置目前表现出以下属性(数据作为json.gz文件写入s3),我想编写P
我试图用函数编程(和spring cloud stream)转换来自输入主题的输入AVRO消息,并在输出主题上发布新消息。下面是我的转换函数: 我的spring boot应用程序是以这种方式声明的,并激活了模式注册表客户机: 谢谢你能给我带来的任何帮助。 视CG
我正试图使用SpringKafka为我的生产者应用程序及其嵌入式Kafka服务器编写测试。 然而,我的应用程序也使用合流模式注册表,我想知道SpringKafka是否为模式注册表提供了一些嵌入式服务器? 或者有没有更好的方法来使用模式注册表进行Spring Kafka测试?
我试图从REST代理发布json模式,但遇到异常 curl-k-x post-h“content-type:application/vnd.schemaregistry.v1+json”--数据“{”schema“:”{“type”:“object”,“properties”:{“firstname”:{“type”:“string”},“lastname”:{“type”:“string”},“
我有麻烦处理错误在我的注册表格特别是与密码1 以下是我的看法。py: 最后是模板文件: 一切正常,我可以在用户注册时对他们进行身份验证,登录并重定向他们,但如果注册表单中存在错误,那么我无法让用户了解哪个字段负责并在提交之前进行更正。 你知道如何确保显示密码吗