当前位置: 首页 > 知识库问答 >
问题:

org.apache.kafka.connect.errors.DataException:记录默认值为null的JSON无效

童琪
2023-03-14

我有一个使用KafKaavroSerializer生成的Kafka Avro主题。
我的独立属性如下所示。
我正在使用Confluent 4.0.0运行Kafka Connect。

key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=<schema_registry_hostname>:8081
value.converter.schema.registry.url=<schema_registry_hostname>:8081
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

当我在独立模式下运行hdfs接收器的Kafka连接器时,收到以下错误消息:

[2018-06-27 17:47:41,746] ERROR WorkerSinkTask{id=camus-email-service-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.DataException: Invalid JSON for record default value: null
    at io.confluent.connect.avro.AvroData.defaultValueFromAvro(AvroData.java:1640)
    at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1527)
    at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1410)
    at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1290)
    at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1014)
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:88)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:454)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
[2018-06-27 17:47:41,748] ERROR WorkerSinkTask{id=camus-email-service-0} Task is being killed and will not recover until manually restarted (    org.apache.kafka.connect.runtime.WorkerTask)
[2018-06-27 17:52:19,554] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect).

当我使用kafka-avro-console-consumer传递模式注册表时,我会对Kafka消息进行反序列化。

即:

/usr/bin/kafka-avro-console-consumer --bootstrap-server <kafka-host>:9092 --topic <KafkaTopicName> --property schema.registry.url=<schema_registry_hostname>:8081

共有1个答案

宁鹏程
2023-03-14

将“subscription”列的数据类型更改为Union数据类型可以解决此问题。Avroconverters能够反序列化消息。

 类似资料:
  • 问题内容: 我有带json列的表。 我想获取所有用户记录,其中details [“ email”]为null或电子邮件密钥不存在。 这不起作用: 问题答案: 使用括号。看起来像编译器试图看到它。因此,您可以像这样修复它: 实际上,要获取其中 details [“ email”]为null或电子邮件密钥不存在的记录 ,可以使用以下查询:

  • 我尝试使用liquibase使用liquibase“addDefaultValue”语法将我的列的默认值设置为null: 但是向myTable插入新行显示默认值仍然设置为“false”,就像以前一样。所以liquibase更改集不起作用。 如何设置列默认值为null与liquibase?

  • 问题内容: 我遇到了一个无法解决的SQL愚蠢问题。 错误: 有人可以帮我吗? 问题答案: 仅在字段上可接受。字段必须保留为空的默认值,或者根本不保留任何默认值- 默认值必须为常量值,而不是表达式的结果。 相关文档:http : //dev.mysql.com/doc/refman/5.0/en/data-type- defaults.html 您可以通过在表上设置插入后触发器以在任何新记录上填充“

  • 问题内容: 我正在运行一个非常简单的查询,但是对于某些结果,一个字段中的值为空。如果该值为null,如何将其设置为“字符串”? 就像是 它将在sql server 2005上运行 谢谢 问题答案: 使用以下内容: 或@Lieven指出: COALESCE的动态之处在于您可以定义更多的参数,因此,如果第一个为null,则获取第二个参数,如果第二个为null,则获取第三个,依此类推…

  • spring-boot-starter-parent 2.4.1 spring-boot-starter-data-jpa postgres 12.5 一个实体的简短摘录: 该实体生成一个表,如下所示: 就是这样: 我们可以看到: null 问题 > 我想防止任何人设置Null,就像在branch1中那样。 如果我这样组织它: @columnDefinition=“varchar(255)defa

  • 我有一个记录,想添加默认构造函数。 但是它创建了带有参数的构造函数。 我们如何将默认构造函数添加到记录中?