我正在使用Apache Flink中的Kafka连接器来访问合流Kafka提供的流。
除了模式注册表urlConfluentRegistryAvroDeserializationSchema之外。伪造的(…)
应为“reader”架构。我不想提供读取模式,而是想使用同一个编写器的模式(在注册表中查找)来读取消息,因为消费者不会有最新的模式。
FlinkKafkaConsumer010<GenericRecord> myConsumer =
new FlinkKafkaConsumer010<>("topic-name", ConfluentRegistryAvroDeserializationSchema.forGeneric(<reader schema goes here>, "http://host:port"), properties);
myConsumer.setStartFromLatest();
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html"使用这些反序列化模式记录将使用从模式注册表检索并转换为静态提供的模式读取"
既然我不想在消费者端保留模式定义,我该如何使用writer的模式反序列化来自Kafka的Avro消息?
感谢您的帮助!
我认为不可能直接使用ConFluentCORstryAvroDeseriazationSchema.forGeneric
。它旨在与读取器模式一起使用,并且它们对此有先决条件检查。
你必须实现你自己的。两个进口的东西:
specific。阿夫罗。reader
为false(否则你会得到具体的记录)import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
public class KafkaGenericAvroDeserializationSchema
implements KeyedDeserializationSchema<GenericRecord> {
private final String registryUrl;
private transient KafkaAvroDeserializer inner;
public KafkaGenericAvroDeserializationSchema(String registryUrl) {
this.registryUrl = registryUrl;
}
@Override
public GenericRecord deserialize(
byte[] messageKey, byte[] message, String topic, int partition, long offset) {
checkInitialized();
return (GenericRecord) inner.deserialize(topic, message);
}
@Override
public boolean isEndOfStream(GenericRecord nextElement) {
return false;
}
@Override
public TypeInformation<GenericRecord> getProducedType() {
return TypeExtractor.getForClass(GenericRecord.class);
}
private void checkInitialized() {
if (inner == null) {
Map<String, Object> props = new HashMap<>();
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
SchemaRegistryClient client =
new CachedSchemaRegistryClient(
registryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
inner = new KafkaAvroDeserializer(client, props);
}
}
}
env.addSource(
new FlinkKafkaConsumer<>(
topic,
new KafkaGenericAvroDeserializationSchema(schemaReigstryUrl),
kafkaProperties));
我试图遵循示例:https://blog.knoldus.com/a-quick-demo-kafka-to-flink-to-cassandra/我试图从kafka解析我的Shippingorder JSON消息并将其解析为对象。然后按一些属性对其进行分组,但在平面图步骤时出现错误。 我的sbt文件: 我的主文件。 我的订单对象 运行此作业时出错 我不知道这个错误。请解释并帮助我解决这个问题。
主要目标是聚合两个Kafka主题,一个压缩慢速移动数据,另一个每秒接收一次的快速移动数据。 我已经能够在简单的场景中使用消息,例如KV(Long, String),使用如下内容: 但是,当您需要从 AVRO 反序列化时,这似乎不是方法。我有一个KV(字符串,AVRO),我需要消费。 我尝试从AVRO模式生成Java类,然后将它们包含在“应用”中,例如: 但这似乎不是正确的方法。 是否有任何文档/示
null null
我正在使用kafka从源接收数据,我正在使用用< code>Node.js编写的消费者应用程序,并使用< code>kafka-node连接到kafka服务器。另一方面,生产者是用< code>Java编写的,他们使用一些kafka流库来产生带有模式的avro消息。我可以接收消息,但它们是avro序列化的,下面是我接收的序列化消息格式- 我正在尝试反序列化它,但无法使用 npm模块,因为avsc只
我试图阅读和打印从Kafka使用Apache Flink的原型消息。 我遵循官方文件,但没有成功:https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/third_party_serializers/ Flink消费者代码是: 反序列化器代码是:
我们使用的是Spring kafka 2.7非阻塞重试机制。在Spring Kafka重试机制中,Kafka listenser使用来自main topic、Retry topic和DLT topic的消息,我们希望侦听器仅使用来自main和Retry topic的消息。 有没有简单的方法来进行设置? 因为我们不希望同一个消费者处理DLT消息。DLT还将被另一个进程使用,以发送请求通知。