我正在使用Apache Beam的kafkaIO阅读一个主题,该主题在Confluent schema Registry中有一个avro模式。我可以反序列化消息并写入文件。但最终我想写给BigQuery。我的管道无法推断架构。我如何提取/推断模式并将其附加到管道中的数据,以便我的下游进程(写入BigQuery)能够推断模式?
下面是我使用模式注册表url设置反序列化器的代码,以及我从Kafka读到的代码:
consumerConfig.put(
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
options.getSchemaRegistryUrl());
String schemaUrl = options.getSchemaRegistryUrl().get();
String subj = options.getSubject().get();
ConfluentSchemaRegistryDeserializerProvider<GenericRecord> valDeserializerProvider =
ConfluentSchemaRegistryDeserializerProvider.of(schemaUrl, subj);
pipeline
.apply("Read from Kafka",
KafkaIO
.<byte[], GenericRecord>read()
.withBootstrapServers(options.getKafkaBrokers().get())
.withTopics(Utils.getListFromString(options.getKafkaTopics()))
.withConsumerConfigUpdates(consumerConfig)
.withValueDeserializer(valDeserializerProvider)
.withKeyDeserializer(ByteArrayDeserializer.class)
.commitOffsetsInFinalize()
.withoutMetadata()
);
我最初认为这足以让beam推断模式,但它并不是,因为hasSchema()返回false。
如有任何帮助,我们将不胜感激。
这段代码可能会工作,但我还没有测试。
// Fetch Avro schema from CSR
SchemaRegistryClient registryClient = new CachedSchemaRegistryClient("schema_registry_url", 10);
SchemaMetadata latestSchemaMetadata = registryClient.getLatestSchemaMetadata("schema_name");
Schema avroSchema = new Schema.Parser().parse(latestSchemaMetadata.getSchema());
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
// Create KafkaIO.Read with Avro schema deserializer
KafkaIO.Read<String, GenericRecord> read = KafkaIO.<String, GenericRecord>read()
.withBootstrapServers("host:port")
.withTopic("topic_name")
.withConsumerConfigUpdates(ImmutableMap.of("schema.registry.url", schemaRegistryUrl))
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class, AvroCoder.of(avroSchema));
// Apply Kafka.Read and set Beam schema based on Avro Schema
p.apply(read)
.apply(Values.<GenericRecord>create()).setSchema(schema,
AvroUtils.getToRowFunction(GenericRecord.class, avroSchema),
AvroUtils.getFromRowFunction(GenericRecord.class))
那么我认为您可以将bigQueryIO.write
与useBeamSchema()
一起使用。
目前正在开展工作,以支持在kafkaio
中推断存储在汇流模式注册表中的Avro模式。不过,现在也可以在用户管道代码中这样做。
在Flink中有没有任何方法可以自动推断出Kafka主题DDL,而不需要手动查询,就像Spark中的情况一样。
如果Kafka主题的Avro模式被用作另一个模式的参考,那么更新该模式的正确方法是什么? 例如,假设我们有两个Kafka主题:一个使用Avro模式用户,另一个使用UserAction。 然后我想给用户添加一个额外的字段——一个“姓氏”,所以它看起来像这样:,空以使此更改兼容。要做到这一点,我可以更改Avro模式文件,使用Maven模式插件重新生成POJO,然后如果我将使用KafkaTemplate
我们需要从Kafka主题导出生产数据以用于测试目的:数据用Avro编写,模式放在模式注册表中。 我们尝试了以下策略: 使用和或。我们无法获得可以用Java解析的文件:解析时总是出现异常,这表明文件格式错误。 使用:它生成一个还包括一些字节的json,例如在反序列化BigDecimal时。我们甚至不知道要选择哪个解析选项(不是avro,也不是json) null 使用Kafka连接接收器。我们没有找
距今已过去数小时,话题仍未删除。 我看到了一些建议,建议我将放在我的中,然后重新启动Kafka。我试过这个。没奏效。 (为什么默认不设置这个?) 我可以关闭kafka和zookeeper,运行,然后再次启动zookeeper和kafka。但这是相当激烈的。确实应该有一些方法来说服实际上删除一个主题?
我们有一个传入的kafka主题,多个基于Avro模式的消息序列化到其中。 我们需要将Avro格式的消息拆分为多个其他kafka主题,基于某个公共模式属性的值。 想了解如何实现它,同时避免在汇流平台上构建中间客户端来进行这种拆分/路由。
我们执行以下步骤以删除主题-hgpo.llo.prmt.processed 但即使在12小时后,主题文件夹仍未从/var/kafka/kafka-logs中删除 注意-我们set-delete.topic.enable=true 在/var/kafka/kafka-logs下,我们有许多主题文件夹,如: ..