当前位置: 首页 > 知识库问答 >
问题:

Kafka Conenct HDFS接收器以拼花格式保存数据

尉迟清野
2023-03-14

使用Kafka Connect HDFS Sink,我能够将avro数据写入Kafka主题并将数据保存在hive/hdfs中。
我正在尝试使用格式类以拼花文件格式保存数据

format.class=io.confluent.connect.hdfs.parquet.ParquetFormat

快速启动hdfs。属性如下

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test_hdfs_parquet
hdfs.url=hdfs://localhost:9000
flush.size=3
hive.metastore.uris=thrift://10.15.167.109:9083
hive.integration=true
schema.compatibility=BACKWARD
partitioner.class=io.confluent.connect.hdfs.partitioner.HourlyPartitioner
locale=en-us
timezone=UTC
key.converter.schemas.enable=false
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat

当我将数据发布到Kafka时,表在hive中创建,test\u hdfs\u parquet目录在hdfs中创建,但由于以下异常,Sink无法以parquet格式保存数据

java.lang.NoSuchMethodError: org.apache.avro.Schema.getLogicalType()Lorg/apache/avro/LogicalType;
        at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:178)
        at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:130)
        at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:227)
        at org.apache.parquet.avro.AvroSchemaConverter.convertFields(AvroSchemaConverter.java:124)
        at org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:115)
        at org.apache.parquet.avro.AvroParquetWriter.writeSupport(AvroParquetWriter.java:144)
        at org.apache.parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:106)
        at io.confluent.connect.hdfs.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:68)
        at io.confluent.connect.hdfs.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:635)
        at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:379)
        at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:374)
        at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:101)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:495)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:288)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2018-03-13 11:48:41,148] ERROR WorkerSinkTask{id=hdfs-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:517)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:288)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

共有1个答案

戚良弼
2023-03-14

你似乎遇到了这个问题

提到的解决方案是使用Avro 1.7.7库,这意味着不仅要使用Avro,还要使用其他Avro JAR。

或者您可以尝试从源代码编译hdfs connect,并更新所有JAR。

 类似资料:
  • 我正在寻找Kafka连接连接器,将写从Kafka到本地文件系统的拼花文件格式。我不想使用hdfs或s3接收器连接器进行同样的操作。

  • 我有使用Protobuf制作的主题事件。我可以使用Parquet格式的S3 sink连接器将主题事件成功地汇到S3存储桶中。现在我的S3存储桶中有和。使用以下配置,所有这些都按预期工作: 现在,我想使用Protobuf将< code > my-bucket-123 (< code > parquet 格式)的键和值放回到Kafka主题中。为此,我使用以下配置通过汇合设置了一个新的S3源连接器(<

  • 我是Spark的新手。我尝试在本地模式(windows)下使用spark java将csv文件保存为parquet。我得到了这个错误。 原因:org.apache.spark.Spark异常:写入行时任务失败 我引用了其他线程并禁用了spark推测 set("spark.speculation "," false ") 我还是会出错。我在csv中只使用了两个专栏进行测试。 输入: 我的代码: 请帮

  • 目前我们在生产中使用Avro数据格式。从使用Avro的几个优点中,我们知道它在模式演变方面是好的。 现在我们正在评估Parque格式,因为它在读取随机列时的效率。所以在前进之前,我们仍然关注模式演变。 有谁知道模式演变是否可能在镶木地板中实现,如果是,它怎么可能,如果不是,那么为什么不呢。 一些资源声称这是可能的,但它只能在末尾添加列。 这是什么意思?

  • 我在一个Spark项目上工作,这里我有一个文件是在parquet格式,当我试图用java加载这个文件时,它给了我下面的错误。但是,当我用相同的路径在hive中加载相同的文件并编写查询select*from table_name时,它工作得很好,数据也很正常。关于这个问题,请帮助我。 java.io.ioException:无法读取页脚:java.lang.runtimeException:损坏的文

  • 我正试图在模式下将写入文件格式(在最新的pandas版本0.21.0中引入)。但是,文件将被新数据覆盖,而不是附加到现有文件。我错过了什么? 写入语法是 读取语法是