当前位置: 首页 > 知识库问答 >
问题:

Kafka主题数据到HDFS拼花文件使用HDFS接收器连接器配置问题

陈琪
2023-03-14

我需要关于Kafka主题的帮助,我想将其放入拼花格式的HDFS中(与daily partitionner)。

我在Kafka主题中有很多数据,基本上都是json数据,如下所示:

{"title":"Die Hard","year":1988,"cast":["Bruce Willis","Alan Rickman","Bonnie Bedelia","William Atherton","Paul Gleason","Reginald VelJohnson","Alexander Godunov"],"genres":["Action"]}
{"title":"Toy Story","year":1995,"cast":["Tim Allen","Tom Hanks","(voices)"],"genres":["Animated"]}
{"title":"Jurassic Park","year":1993,"cast":["Sam Neill","Laura Dern","Jeff Goldblum","Richard Attenborough"],"genres":["Adventure"]}
{"title":"The Lord of the Rings: The Fellowship of the Ring","year":2001,"cast":["Elijah Wood","Ian McKellen","Liv Tyler","Sean Astin","Viggo Mortensen","Orlando Bloom","Sean Bean","Hugo Weaving","Ian Holm"],"genres":["Fantasy »]}
{"title":"The Matrix","year":1999,"cast":["Keanu Reeves","Laurence Fishburne","Carrie-Anne Moss","Hugo Weaving","Joe Pantoliano"],"genres":["Science Fiction"]}

本主题的名称为:测试

我想将这些数据以拼花格式放入我的HDFS集群中。但是我在接收器连接器配置方面遇到了困难。为此,我使用了融合的hdfs-shin-连接器。

以下是我迄今为止所做的工作:

{
  "name": "hdfs-sink",
  "config": {
    "name": "hdfs-sink",
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "1",
    "topics": "test",
    "hdfs.url": "hdfs://hdfs-IP:8020",
    "hadoop.home": "/user/test-user/TEST",
    "flush.size": "3",
    "locale": "fr-fr",
    "timezone": "UTC",
    "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
    "partitioner.class": "io.confluent.connect.hdfs.partitioner.DailyPartitioner",
    "consumer.auto.offset.reset": "earliest",
    "value.converter":  "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "true"

  }
}

关于为什么我这样配置连接器的一些解释:

  • 我每天都有很多这样的数据填充我的话题

我知道可能我必须使用模式注册表将数据格式化为拼花,但我不知道如何做到这一点。有必要吗?

你能帮我一下吗?

非常感谢。

共有1个答案

柯默
2023-03-14

我个人没有使用ParquetFormat,但您的数据必须有一个模式,这意味着以下内容之一

  1. 您的数据是使用ConFluent Avro序列化器生成的
  2. 您的数据将作为PROTOBUF生成,并且您会将PROTOBUF转换器添加到您的Connect工作人员中
  3. 您使用Kafka Connect的特殊JSON格式,该格式在您的记录中包含一个模式。

基本上,它不能是“纯JSON”。也就是说,您目前有"value.converter.schemas.enable":"true",我猜您的连接器不工作,因为您的记录不是上述格式。

基本上,如果没有模式,JSON解析器不可能知道Parquet需要写什么“列”。

每日分区每天不创建一个文件,只创建一个目录。每个flush.size您将获得一个文件,并且还有一个刷新文件的预定旋转间隔配置。此外,每个Kafka分区将有一个文件。

此外,“consumer.auto.offset.reset”:“earliest”,仅在连接分发中有效。属性文件,不基于每个连接器,AFAIK。

由于我个人没有使用过ParquetFormat,这就是我能给出的所有建议,但我使用了其他工具,如NiFi,来实现类似的目标,这将允许您不更改现有的Kafka生产者代码

或者,使用JSONFormat,然而,配置单元集成将无法自动工作,并且必须预定义表(这将要求您为主题提供架构)。

另一个选项是将Hive配置为直接从Kafka读取

 类似资料:
  • 我有一个生产者,它正在为一个主题生成protobuf消息。我有一个消费者应用程序,它反序列化protobuf消息。但hdfs接收器连接器直接从Kafka主题接收消息。中的键和值转换器将设置为什么?做这件事最好的方法是什么?提前道谢!

  • 使用Kafka Connect HDFS Sink,我能够将avro数据写入Kafka主题并将数据保存在hive/hdfs中。 我正在尝试使用格式类以拼花文件格式保存数据 快速启动hdfs。属性如下 当我将数据发布到Kafka时,表在hive中创建,test\u hdfs\u parquet目录在hdfs中创建,但由于以下异常,Sink无法以parquet格式保存数据

  • 我正在尝试使用Kafka连接接收器将文件从Kafka写入HDFS。 我的属性看起来像: 有什么建议吗?

  • 我有一个kafka主题,有200万条消息,我的刷新大小是100000,默认分区为分布式模式,有4个工作者,我可以看到数据在几秒钟内立即写入HDFS(10到15秒)。 我看到创建了一个+tmp目录和文件夹,并且每次触发一个新连接器时都会创建主题。 kafka connect的行为是每次都写得这么快,还是已经将数据存储在HDFS中,并根据连接器属性将其移动到主题目录? 我需要清楚这是怎么发生的。如果我

  • 在JSON中从Kafka生产/消费。使用以下属性保存到JSON中的HDFS: 制作人: 谢谢

  • 我正在寻找Kafka连接连接器,将写从Kafka到本地文件系统的拼花文件格式。我不想使用hdfs或s3接收器连接器进行同样的操作。