当前位置: 首页 > 知识库问答 >
问题:

Kafka使用JsonConverter连接用于JSON格式的HDFS接收器

公良育
2023-03-14

在JSON中从Kafka生产/消费。使用以下属性保存到JSON中的HDFS:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

制作人:

curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \
      --data '{"schema": {"type": "boolean", "optional": false, "name": "bool", "version": 2, "doc": "the documentation", "parameters": {"foo": "bar" }}, "payload": true }' "http://localhost:8082/topics/test_hdfs_json"
./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-hdfs/quickstart-hdfs.properties
key.converter.schemas.enable=true

value.converter.schemas.enable=true
org.apache.kafka.connect.errors.DataException: JsonDeserializer with schemas.enable requires "schema" and "payload" fields and may not contain additional fields
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:332)

谢谢

共有1个答案

公良英资
2023-03-14

转换器指的是数据将如何从Kafka主题翻译出来,由连接器解释并写入HDFS。HDFS连接器仅支持在avro或parquet中直接写入HDFS。您可以在这里找到如何将格式扩展到JSON的信息。如果您做了这样的扩展,我鼓励您将其贡献给连接器的开源项目。

 类似资料:
  • 我有一个kafka主题,有200万条消息,我的刷新大小是100000,默认分区为分布式模式,有4个工作者,我可以看到数据在几秒钟内立即写入HDFS(10到15秒)。 我看到创建了一个+tmp目录和文件夹,并且每次触发一个新连接器时都会创建主题。 kafka connect的行为是每次都写得这么快,还是已经将数据存储在HDFS中,并根据连接器属性将其移动到主题目录? 我需要清楚这是怎么发生的。如果我

  • 我正在尝试使用Kafka连接接收器将文件从Kafka写入HDFS。 我的属性看起来像: 有什么建议吗?

  • 我有一个生产者,它正在为一个主题生成protobuf消息。我有一个消费者应用程序,它反序列化protobuf消息。但hdfs接收器连接器直接从Kafka主题接收消息。中的键和值转换器将设置为什么?做这件事最好的方法是什么?提前道谢!

  • 我设置了一个Kafka JDBC接收器以将事件发送到PostgreSQL。我编写了这个简单的生产者,它将带有模式(avro)数据的JSON发送到一个主题,如下所示: producer.py(kafka-python) 价值架构: 连接器配置(无主机、密码等) 但我的连接器出现严重故障,有三个错误,我无法找出其中任何一个错误的原因: TL;博士;日志版本 完整日志 有人能帮我理解这些错误和潜在的原因

  • 我对 Confluent 社区许可证和 Confluent 连接器有点困惑。 根据https://www.confluent.io/confluent-community-license-faq/它只适用于一些Confluent连接器,在下图中列出了Apache 2.0许可证下的社区连接器。 然后,如果您搜索社区连接器,您可以看到支持融合的Kafka Connect HDFS连接器(在撰写此问题时

  • 我需要关于Kafka主题的帮助,我想将其放入拼花格式的HDFS中(与daily partitionner)。 我在Kafka主题中有很多数据,基本上都是json数据,如下所示: 本主题的名称为:测试 我想将这些数据以拼花格式放入我的HDFS集群中。但是我在接收器连接器配置方面遇到了困难。为此,我使用了融合的hdfs-shin-连接器。 以下是我迄今为止所做的工作: 关于为什么我这样配置连接器的一些