我需要关于Kafka主题的帮助,我想将其放入拼花格式的HDFS中(与daily partitionner)。
我在Kafka主题中有很多数据,基本上都是json数据,如下所示:
{"title":"Die Hard","year":1988,"cast":["Bruce Willis","Alan Rickman","Bonnie Bedelia","William Atherton","Paul Gleason","Reginald VelJohnson","Alexander Godunov"],"genres":["Action"]}
{"title":"Toy Story","year":1995,"cast":["Tim Allen","Tom Hanks","(voices)"],"genres":["Animated"]}
{"title":"Jurassic Park","year":1993,"cast":["Sam Neill","Laura Dern","Jeff Goldblum","Richard Attenborough"],"genres":["Adventure"]}
{"title":"The Lord of the Rings: The Fellowship of the Ring","year":2001,"cast":["Elijah Wood","Ian McKellen","Liv Tyler","Sean Astin","Viggo Mortensen","Orlando Bloom","Sean Bean","Hugo Weaving","Ian Holm"],"genres":["Fantasy »]}
{"title":"The Matrix","year":1999,"cast":["Keanu Reeves","Laurence Fishburne","Carrie-Anne Moss","Hugo Weaving","Joe Pantoliano"],"genres":["Science Fiction"]}
本主题的名称为:测试
我想将这些数据以拼花格式放入我的HDFS集群中。但是我在接收器连接器配置方面遇到了困难。为此,我使用了融合的hdfs-shin-连接器。
以下是我迄今为止所做的工作:
{
"name": "hdfs-sink",
"config": {
"name": "hdfs-sink",
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"tasks.max": "1",
"topics": "test",
"hdfs.url": "hdfs://hdfs-IP:8020",
"hadoop.home": "/user/test-user/TEST",
"flush.size": "3",
"locale": "fr-fr",
"timezone": "UTC",
"format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
"partitioner.class": "io.confluent.connect.hdfs.partitioner.DailyPartitioner",
"consumer.auto.offset.reset": "earliest",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "true"
}
}
关于为什么我这样配置连接器的一些解释:
我知道可能我必须使用模式注册表将数据格式化为拼花,但我不知道如何做到这一点。有必要吗?
你能帮我一下吗?
非常感谢。
我个人没有使用ParquetFormat,但您的数据必须有一个模式,这意味着以下内容之一
基本上,它不能是“纯JSON”。也就是说,您目前有"value.converter.schemas.enable":"true"
,我猜您的连接器不工作,因为您的记录不是上述格式。
基本上,如果没有模式,JSON解析器不可能知道Parquet需要写什么“列”。
每日分区每天不创建一个文件,只创建一个目录。每个flush.size
您将获得一个文件,并且还有一个刷新文件的预定旋转间隔配置。此外,每个Kafka分区将有一个文件。
此外,“consumer.auto.offset.reset”:“earliest”,
仅在连接分发中有效。属性文件,不基于每个连接器,AFAIK。
由于我个人没有使用过ParquetFormat,这就是我能给出的所有建议,但我使用了其他工具,如NiFi,来实现类似的目标,这将允许您不更改现有的Kafka生产者代码。
或者,使用JSONFormat,然而,配置单元集成将无法自动工作,并且必须预定义表(这将要求您为主题提供架构)。
另一个选项是将Hive配置为直接从Kafka读取
我有一个生产者,它正在为一个主题生成protobuf消息。我有一个消费者应用程序,它反序列化protobuf消息。但hdfs接收器连接器直接从Kafka主题接收消息。中的键和值转换器将设置为什么?做这件事最好的方法是什么?提前道谢!
使用Kafka Connect HDFS Sink,我能够将avro数据写入Kafka主题并将数据保存在hive/hdfs中。 我正在尝试使用格式类以拼花文件格式保存数据 快速启动hdfs。属性如下 当我将数据发布到Kafka时,表在hive中创建,test\u hdfs\u parquet目录在hdfs中创建,但由于以下异常,Sink无法以parquet格式保存数据
我正在尝试使用Kafka连接接收器将文件从Kafka写入HDFS。 我的属性看起来像: 有什么建议吗?
我有一个kafka主题,有200万条消息,我的刷新大小是100000,默认分区为分布式模式,有4个工作者,我可以看到数据在几秒钟内立即写入HDFS(10到15秒)。 我看到创建了一个+tmp目录和文件夹,并且每次触发一个新连接器时都会创建主题。 kafka connect的行为是每次都写得这么快,还是已经将数据存储在HDFS中,并根据连接器属性将其移动到主题目录? 我需要清楚这是怎么发生的。如果我
在JSON中从Kafka生产/消费。使用以下属性保存到JSON中的HDFS: 制作人: 谢谢
我正在寻找Kafka连接连接器,将写从Kafka到本地文件系统的拼花文件格式。我不想使用hdfs或s3接收器连接器进行同样的操作。