当前位置: 首页 > 知识库问答 >
问题:

Kafka将HDFS接收器与Azure Blob存储连接起来

燕刚捷
2023-03-14

我想用Kafka HDFS接收器连接到Azure Blob存储。到目前为止,我已经做了:

>

  • 设置kafka-connect属性:

    hdfs.url=wasbs://<my_url>
    hadoop.conf.dir={hadoop_3_home}/etc/hadoop/
    hadoop.home={hadoop_3_home}
    

    并在core-site.xml中添加了对WASB的支持:

    <property>
        <name>fs.wasbs.impl</name>
        <value>org.apache.hadoop.fs.azure.NativeAzureFileSystem</value>
    </property>
    
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
     at io.confluent.connect.hdfs.storage.StorageFactory.createStorage(StorageFactory.java:29)
     ... 11 more
    Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.azure.NativeAzureFileSystem not found
     at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2195)
     at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2654)
     at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
     at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
    

    你能帮我解决这个问题吗。有可能吗?

  • 共有1个答案

    柯唯
    2023-03-14

    我的目标是:备份任何数据格式的所有内容,从Kafka到Azure BLOB。

    HDFS和云连接器不能备份“任何格式”。Confluent的Avro是文件格式的第一类公民。JSON其次,但根据我的发现,没有“纯文本”格式。我认为HDFS连接器确实支持“字节数组”格式。

    正如我在评论中提到的,在我看来,一个Kafka的备份与无限期地将数据保留到一个文件系统是不同的。备份Kafka到Kafka包括使用MirrorMaker。

     类似资料:
    • 我正在尝试使用Kafka连接接收器将文件从Kafka写入HDFS。 我的属性看起来像: 有什么建议吗?

    • 我有一个kafka主题,有200万条消息,我的刷新大小是100000,默认分区为分布式模式,有4个工作者,我可以看到数据在几秒钟内立即写入HDFS(10到15秒)。 我看到创建了一个+tmp目录和文件夹,并且每次触发一个新连接器时都会创建主题。 kafka connect的行为是每次都写得这么快,还是已经将数据存储在HDFS中,并根据连接器属性将其移动到主题目录? 我需要清楚这是怎么发生的。如果我

    • 我有一个生产者,它正在为一个主题生成protobuf消息。我有一个消费者应用程序,它反序列化protobuf消息。但hdfs接收器连接器直接从Kafka主题接收消息。中的键和值转换器将设置为什么?做这件事最好的方法是什么?提前道谢!

    • 我正在尝试将来自主题的数据(json数据)写入MySql数据库。我想我需要一个JDBC接收器连接器。 我如何配置连接器以将主题中的json数据映射到如何将数据插入数据库。 我能找到的文件只有这个。 “接收器连接器需要了解架构,因此您应该使用合适的转换器,例如架构注册表附带的Avro转换器,或启用了架构的JSON转换器。如果存在Kafka记录键,则可以是基元类型或连接结构,记录值必须是连接结构。从连

    • 我有一个Kafka连接接收器记录从Kafka主题到S3。它在工作,但太慢了。Kafka主题每秒接收约30000条消息。连接接收器无法跟上。我已经尝试增加Kafka连接器的任务。最大值从1到3,这会创建更多任务,但这似乎无助于提高消息/秒的速度。我试着增加Kafka连接工人的CPU分配,这似乎也没有帮助。 我还能试什么?哪些指标有助于监控以进一步识别瓶颈? 更新:Kafka主题有5个分区。Kafka

    • 在JSON中从Kafka生产/消费。使用以下属性保存到JSON中的HDFS: 制作人: 谢谢