当前位置: 首页 > 知识库问答 >
问题:

汇流HDFS接收器连接器:Kafka主题与普通字符串格式的HDFS在拼花格式失败与Avro模式必须是一个记录错误

闻人昕
2023-03-14

我已经在我的虚拟机中安装了docker confluentinc/cp-kafka-connect:4.0.0映像。我有兴趣获取 kafka 主题,它是镶木地板格式的 hdfs 纯文本数据(字符串格式)。

sudo docker run -d \
    --name=kafka-connect \
    --net=host \
    -e CONNECT_BOOTSTRAP_SERVERS=<kafka-server-host>:9092 \
    -e CONNECT_REST_PORT=8082 \
    -e CONNECT_GROUP_ID="connect-kafkac1" \
    -e CONNECT_CONFIG_STORAGE_TOPIC="connect-kafkac1-config" \
    -e CONNECT_OFFSET_STORAGE_TOPIC="connect-kafkac1-offsets" \
    -e CONNECT_STATUS_STORAGE_TOPIC="connect-kafkac1-status" \
    -e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_REST_ADVERTISED_HOST_NAME="localhost" \
    -e CONNECT_LOG4J_LOGGERS=org.reflections=ERROR \
    -e CONNECT_PLUGIN_PATH=/usr/share/java \
    confluentinc/cp-kafka-connect:4.0.0

我完成了以下配置。

/etc/kafka/connect-standard.properties

bootstrap.servers=kafkaclusteraddress:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

/etc/Kafka-connect-HDFS/快速启动-hdfs.properties

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=<topicname>
topics.dir=<hdfs-topic-dir>
logs.dir=<hdfs-logs-dir>
hdfs.url=<hdfs-url>:8020
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
flush.size=3
hadoop.conf.dir=/etc/hadoop/conf
hadoop.home=/usr/bin/hadoop

在独立模式下运行配置后收到的错误

# connect-standalone /etc/kafka/connect-standalone.properties /etc/kafka-connect-hdfs/quickstart-hdfs.properties

----------------------------------------

[2018-02-13 19:11:09,542] ERROR WorkerSinkTask{id=hdfs-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask)
java.lang.IllegalArgumentException: Avro schema must be a record.
        at org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:113)
        at org.apache.parquet.avro.AvroParquetWriter.writeSupport(AvroParquetWriter.java:144)
        at org.apache.parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:106)
        at io.confluent.connect.hdfs.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:68)
        at io.confluent.connect.hdfs.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:635)
        at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:379)
        at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:374)
        at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:101)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:495)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:288)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
[2018-02-13 19:11:09,545] ERROR WorkerSinkTask{id=hdfs-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:517)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:288)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: Avro schema must be a record.
        at org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:113)
        at org.apache.parquet.avro.AvroParquetWriter.writeSupport(AvroParquetWriter.java:144)
        at org.apache.parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:106)
        at io.confluent.connect.hdfs.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:68)
        at io.confluent.connect.hdfs.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:635)
        at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:379)
        at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:374)
        at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:101)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:495)
        ... 10 more
[2018-02-13 19:11:09,545] ERROR WorkerSinkTask{id=hdfs-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)

共有1个答案

尉迟安民
2023-03-14

如果您查看堆栈跟踪,它期望Avro数据,而不是字符串。

IllegalArgumentException: Avro schema must be a record.
        at org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:113)
        at org.apache.parquet.avro.AvroParquetWriter.writeSupport(AvroParquetWriter.java:144)
        at org.apache.parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:106)

我还没有亲自测试过JSON到Parket,但是作为柱状数据类型,ParquetFormat要求您有一个架构。由于您有schemas.enable=false,这不会发生,因此它不能使用纯字符串、整数、布尔值等和其他“非结构化”连接API类型。

这些是需要改变的设置。

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

为了将JSONConverter设置为< code>schemas.enable=true,您必须生成类似于< code>{"schema": {...},“有效载荷”:{...}},其中< code>schema字段包含< code>payload字段中对象的类型定义

否则,如果您可以控制生产者代码,则应改为发送Avro数据,这将需要使用Schema Registry。

我想提一下,由于您只有一个字符串记录,那么 Parquet 不会比 HDFS 上的原始文本文件有太多好处,因为无论如何都只有一列。

 类似资料:
  • 使用Kafka Connect HDFS Sink,我能够将avro数据写入Kafka主题并将数据保存在hive/hdfs中。 我正在尝试使用格式类以拼花文件格式保存数据 快速启动hdfs。属性如下 当我将数据发布到Kafka时,表在hive中创建,test\u hdfs\u parquet目录在hdfs中创建,但由于以下异常,Sink无法以parquet格式保存数据

  • 我正在尝试使用Kafka连接接收器将文件从Kafka写入HDFS。 我的属性看起来像: 有什么建议吗?

  • 我有一个kafka主题,有200万条消息,我的刷新大小是100000,默认分区为分布式模式,有4个工作者,我可以看到数据在几秒钟内立即写入HDFS(10到15秒)。 我看到创建了一个+tmp目录和文件夹,并且每次触发一个新连接器时都会创建主题。 kafka connect的行为是每次都写得这么快,还是已经将数据存储在HDFS中,并根据连接器属性将其移动到主题目录? 我需要清楚这是怎么发生的。如果我

  • 在JSON中从Kafka生产/消费。使用以下属性保存到JSON中的HDFS: 制作人: 谢谢

  • 我需要关于Kafka主题的帮助,我想将其放入拼花格式的HDFS中(与daily partitionner)。 我在Kafka主题中有很多数据,基本上都是json数据,如下所示: 本主题的名称为:测试 我想将这些数据以拼花格式放入我的HDFS集群中。但是我在接收器连接器配置方面遇到了困难。为此,我使用了融合的hdfs-shin-连接器。 以下是我迄今为止所做的工作: 关于为什么我这样配置连接器的一些

  • 我正在寻找Kafka连接连接器,将写从Kafka到本地文件系统的拼花文件格式。我不想使用hdfs或s3接收器连接器进行同样的操作。