我有一个生产者,它正在为一个主题生成protobuf消息。我有一个消费者应用程序,它反序列化protobuf消息。但hdfs接收器连接器直接从Kafka主题接收消息。etc/schema-registry/connect-avro-standalone.properties
中的键和值转换器将设置为什么?做这件事最好的方法是什么?提前道谢!
Kafka Connect的设计目的是将Kafka中串行化格式的关注与具有转换器概念的单个连接器分开。您似乎已经发现,需要将key.converter
和value.converter
类调整为支持protobuf的实现。这些类通常作为一个普通的Kafka反序列化程序实现,然后执行一个步骤,该步骤执行从特定于序列化的运行时格式(例如,protobufs中的消息)到Kafka Connect的运行时API(它没有任何关联的序列化格式--它只是一组Java类型和一个定义模式的类)的转换。
我不知道现有的实现。实现这一点的主要挑战是,protobufs是自描述的(即,您可以在不访问原始模式的情况下反序列化它),但是由于它的字段只是整数ID,如果不a)要求特定的模式对转换器可用,例如通过config(这使得迁移模式更加复杂),或者b)数据的模式注册服务+包装器格式,允许您动态查找模式,您可能就无法获得有用的模式信息。
我需要关于Kafka主题的帮助,我想将其放入拼花格式的HDFS中(与daily partitionner)。 我在Kafka主题中有很多数据,基本上都是json数据,如下所示: 本主题的名称为:测试 我想将这些数据以拼花格式放入我的HDFS集群中。但是我在接收器连接器配置方面遇到了困难。为此,我使用了融合的hdfs-shin-连接器。 以下是我迄今为止所做的工作: 关于为什么我这样配置连接器的一些
我参考了以下链接来了解Kafka的HDFS连接https://docs.confluent.io/2.0.0/Connect/connect-hdfs/docs/index.html 我能够通过配置单元集成将数据从Kafka导出到HDFS。 现在我正尝试在Java程序的帮助下将avro记录写入Kafka 当我把Avro记录写到Kafka主题时,我在Connect中出现以下错误
我正在尝试使用Kafka连接接收器将文件从Kafka写入HDFS。 我的属性看起来像: 有什么建议吗?
我有一个kafka主题,有200万条消息,我的刷新大小是100000,默认分区为分布式模式,有4个工作者,我可以看到数据在几秒钟内立即写入HDFS(10到15秒)。 我看到创建了一个+tmp目录和文件夹,并且每次触发一个新连接器时都会创建主题。 kafka connect的行为是每次都写得这么快,还是已经将数据存储在HDFS中,并根据连接器属性将其移动到主题目录? 我需要清楚这是怎么发生的。如果我
在JSON中从Kafka生产/消费。使用以下属性保存到JSON中的HDFS: 制作人: 谢谢
我想用Kafka HDFS接收器连接到Azure Blob存储。到目前为止,我已经做了: > 设置属性: 并在中添加了对WASB的支持: 你能帮我解决这个问题吗。有可能吗?