当前位置: 首页 > 知识库问答 >
问题:

Kafka Connect:读取JSON序列化的Kafka消息,转换为Parket格式并保留在S3中

狄新立
2023-03-14

我需要从Kafka主题中读取JSON序列化消息,将其转换为拼花,并在S3中持久化。

出身背景

官方S3接收器连接器支持拼花输出格式,但:

您必须为此连接器使用AvroConverter、Pro bufConverter或带有ParquetFormat的JsonSchemaConverter。尝试使用JsonConverter(带或不带模式)会导致NullPointerException和StackOverflow Exception。

如果消息不是使用JSON模式序列化编写的,则JsonSchemaConverter会抛出一个错误。

问题陈述

因此,我正在寻找一种方法来读取Kafka主题中最初以JSON格式编写的消息,以某种方式将其转换为JSON模式格式,然后将其插入S3连接器,该连接器将以拼花格式写入S3。

或者,考虑到主要需求,我也愿意使用其他解决方案(不涉及编写JAVA代码)(以Kafka消息为例,将其作为拼花文件放在S3中)。谢谢

PS:不幸的是,现在我无法选择更改这些Kafka消息最初的编写方式(例如使用JSON模式序列化和模式发现)。

共有1个答案

杜苏燕
2023-03-14

一般来说,您的数据需要有一个模式,因为Parket需要它(S3 parket编写器翻译为Avro作为中间步骤)

您可以研究使用这种连接转换,该转换接受模式,并尝试应用JSON模式-请参阅测试。由于这将返回一个Struct对象,因此您可以尝试使用JsonSchemaConverter作为接收器的一部分。

但是,如果您只是将随机JSON数据抛入没有任何一致字段或值的单个主题,那么您将很难应用任何模式

 类似资料:
  • 当使用Kafka Connect IBM MQ Source Connector使用5个任务的并行级别从IMB MQ读取时,是否可以保留消息顺序(将具有相同键的消息分配给相同的分区)?

  • 我有一个对象,它有2个XMLGregorianCalendar对象--一个用于日期,另一个用于时间。我使用Jackson对象映射器将日期转换为JSON格式。转换前日期为2014-02-10&时间为11:15:00。转换为JSON后,它变为{“Date”:1392008400000,“Time”:58500000}。在用JSON打印后,如何保留相同的日期和时间格式({“日期”:2014-02-10,

  • 我目前正在尝试将一些包含时间戳的数据库对象存储到图形数据库(dgraph)中。我想使用JSON轻松获取日期和时间信息,并将其存储在图表中的datetime节点中,但图表将只接受格式为RFC 3339格式且带有可选时区的数据(例如)。 我一直在使用Gson序列化和反序列化以及其他数据类型工作正常,但是尝试查询日期返回。我尝试使用,但这似乎没有改变任何事情。我目前将时间戳存储在类中,但如果需要,可以更

  • 我将键值对的哈希映射声明为, 我正在使用一种方法序列化此hashmap,并使用以下代码将其转换为字符串: 当我调用序列化元数据方法并将我的hashmap作为输入传递时,我希望映射状态由ObjectOutputStream流式传输,并在ByteArrayOutputStream(baos)中存储字节数组 {key1=value1,key2=value2,key3=value3} 当我们将hashma

  • 我正在尝试使用阿帕奇火花读取表。 以下是我的实现: 所以在火花壳里 我们在“订单”变量中得到结果。 如何将此结果转换为拼花文件或格式? 更新:我找到这段访问和转换dynamodb数据的代码https://github . com/onzocom/spark-dynamo db/blob/master/src/main/Scala/com/onzo/spark/dynamo db/dynamo db

  • 我从Spring Boot应用程序向Kafka发送消息 application.properties 配置 在日志中,我可以看到如下消息: SUCCESS: SendResult[producerRecords=产品记录(主题=uniqTopic123,分区=null,标头=RecordHeaders(标头 = [], isReadOnly=true),key=testKey,value=Test