当前位置: 首页 > 知识库问答 >
问题:

如何使用Spring Kafka通过融合模式注册表读取AVRO消息?

商宝
2023-03-14

如何使用Spring Kafka通过合流模式注册表读取AVRO消息?有样品吗?我在官方参考文件中找不到它。

共有2个答案

茹高义
2023-03-14

如果使用spring kafka或用于kafka的本机java客户端,则情况不变。假设您想使用某个主题中的消息,并且关键字和值是avro记录,则可以将以下属性添加到consumerproperties中。

consumerProperties.put("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
consumerProperties.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
consumerProperties.put("schema.registry.url", KAFKA_SCHEMA_REGISTRY_URL);

如果您正在使用主题中的特定记录,则还需要添加以下行

consumerProperties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
陆博易
2023-03-14

下面的代码可以读取来自client-avro主题的消息。这是我定义为的值的AVRO模式。

{
     "type": "record",
     "namespace": "com.example",
     "name": "Customer",
     "version": "1",
     "fields": [
       { "name": "first_name", "type": "string", "doc": "First Name of Customer" },
       { "name": "last_name", "type": "string", "doc": "Last Name of Customer" },
       { "name": "age", "type": "int", "doc": "Age at the time of registration" },
       { "name": "height", "type": "float", "doc": "Height at the time of registration in cm" },
       { "name": "weight", "type": "float", "doc": "Weight at the time of registration in kg" },
       { "name": "automated_email", "type": "boolean", "default": true, "doc": "Field indicating if the user is enrolled in marketing emails" }
     ]
}

下面是使用手动提交阅读此示例的完整代码段。

import com.example.Customer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Calendar;
import java.util.Collections;
import java.util.Properties;

public class KafkaAvroJavaConsumerV1Demo {

    public static void main(String[] args) {
        Properties properties = new Properties();
        // normal consumer
        properties.setProperty("bootstrap.servers","127.0.0.1:9092");
        properties.put("group.id", "customer-consumer-group-v1");
        properties.put("auto.commit.enable", "false");
        properties.put("auto.offset.reset", "earliest");

        // avro part (deserializer)
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer", KafkaAvroDeserializer.class.getName());
        properties.setProperty("schema.registry.url", "http://127.0.0.1:8081");
        properties.setProperty("specific.avro.reader", "true");

        KafkaConsumer<String, Customer> kafkaConsumer = new KafkaConsumer<>(properties);
        String topic = "customer-avro";
        kafkaConsumer.subscribe(Collections.singleton(topic));

        System.out.println("Waiting for data...");

        while (true){
            System.out.println("Polling at " + Calendar.getInstance().getTime().toString());
            ConsumerRecords<String, Customer> records = kafkaConsumer.poll(1000);

            for (ConsumerRecord<String, Customer> record : records){
                Customer customer = record.value();
                System.out.println(customer);
            }

            kafkaConsumer.commitSync();
        }
    }
}
 类似资料:
  • 我正在尝试使用Confluent schema registry,下面是我在Github中找到的一些示例(https://github.com/gAmUssA/springboot-kafka-avro). 当消费者和生产者与模型共享相同的命名空间而不是其工作时。 当使用者位于具有不同名称空间但具有相同类(名称和属性方面)的不同项目中时,它不工作。 合流Avro反序列化程序可以使用正确的值反序列化

  • 我现在一直在查看Spring Cloud模式注册表和汇合模式注册表。我可以看到一些区别,例如Spring Cloud模式注册表将模式保存在普通数据库中,默认情况下保存在h2中,而汇合模式注册表保存在kafka主题中。 spring云模式注册表的这种方法是否会对性能产生任何影响。据我所知,即使数据保留在主题上,以防汇合,查询它时仍然会有延迟。但会有重大影响吗? 我还可以看到,spring云模式注册表

  • 我使用来自Confluent的Kafka Connect来使用Kafka流并以拼花格式写入HDFS。我正在1个节点中使用架构注册表服务,它运行良好。现在我想将模式注册表分发到集群模式以处理故障转移。关于如何实现这一点的任何链接或片段都将非常有用。

  • 我正在与Avro/Kafka和Confluent的Avro模式注册中心合作。 我使用avsc文件和avdl制作了一些基本模式和具有基本类型的主题。 我正在查看Confluent制作的API文档,以尝试将模式发展到版本2。特别是本部分: https://docs.confluent.io/current/schema-registry/using.html#register-a-new-versio

  • 我正在尝试设置一个Beam管道,以便使用python API读取Kafka的内容。我能够设置消费者配置和主题。如何更新管道以使用合流模式注册表并定义Avro消息值反序列化器?