我有一个建立在Kafka之上的事件源应用程序。目前,我有一个主题中有多个消息类型。所有序列化/反序列化的JSON。
那么这种方法如何与Kafka流媒体应用程序一起工作呢?在该应用程序中,您需要指定一个键和值serde?
我是不是应该忘了Avro而改用protobuff呢?
这是从发布不同类型事件的主题获取数据的consumer示例:
package com.kafka.schema;
import com.phonebook.Employee;
import com.phonebook.Milestone;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.Properties;
import java.util.stream.IntStream;
public class AvroConsumer {
private static Consumer<Long, GenericRecord> createConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Const.BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleAvroConsumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
// Use Kafka Avro Deserializer.
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Use Specific Record or else you get Avro GenericRecord.
// props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
// Schema registry location.
// Run Schema Registry on 8081
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, Const.SCHEMA_REGISTRY);
props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());
return new KafkaConsumer<>(props);
}
public static void main(String... args) {
final Consumer<Long, GenericRecord> consumer = createConsumer();
consumer.subscribe(Collections.singletonList(Const.TOPIC));
IntStream.range(1, 100).forEach(index -> {
final ConsumerRecords<Long, GenericRecord> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
if (records.count() == 0) {
System.out.println("None found");
} else {
records.forEach(record -> {
GenericRecord recValue = record.value();
System.out.printf("%s %d %d %s \n", record.topic(), record.partition(), record.offset(), recValue);
});
}
});
}
}
这里重要的部分是:
props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());
假设我有一个名为的Kafka主题,它有几个消息类型(每个消息类型都有不同的Avro模式),如、等等。我想了解一下用Spring Cloud Stream发布/接收相同主题的不同类型是否可行(而且有意义)。特别是,拥有几个将非常有用,每个专用于特定类型。根据这篇博文,当需要订购消息时,这是非常有用的,因为它们与同一个实体相关。这种情况下的配置示例是什么?
我有多个制作人,可以向一个Kafka主题发送多种类型的事件。 我有一个消费者,它必须消费所有类型的消息。每种类型的消息都有不同的逻辑。 但在这种情况下,所有消息都指向此方法,不仅是EventOne 如果我实现了两种方法(对于每种类型的消息),那么所有消息都只能使用一种方法。 如果我像这样实现监听器: 然后我得到一个例外:org。springframework。Kafka。KafkaListener
我正在尝试以avro格式对kafka消息进行解密我使用了以下代码:https://github.com/ivangfr/springboot-kafka-debezium-ksql/blob/master/kafka-research-consumer/src/main/java/com/mycompany/kafkaResearchconsumer/kafka/reviewsconsumerco
我们有一个传入的kafka主题,多个基于Avro模式的消息序列化到其中。 我们需要将Avro格式的消息拆分为多个其他kafka主题,基于某个公共模式属性的值。 想了解如何实现它,同时避免在汇流平台上构建中间客户端来进行这种拆分/路由。
Debezium连接器的Kafka connect事件是Avro编码的。 在传递给Kafka connect standalone服务的connect-standalone.properties中提到了以下内容。 使用这些属性配置Kafka使用者代码: 在消费者实现中,下面是读取键和值组件的代码。我使用REST从模式注册表中获取键和值的模式。 解析密钥工作正常。在解析消息的值部分时,我得到了Arr
我想使用spring-Kafka库使用spring boot配置的消费者来使用来自Kafka代理的消息,源是一个JDBC连接器,它负责从MySQL数据库提取消息,这些消息需要被使用 下面是我的application.yml文件