当前位置: 首页 > 知识库问答 >
问题:

合流kafka-avro生产者模式错误

终逸仙
2023-03-14

我使用https://github . com/confluent Inc/confluent-Kafka-python/blob/master/examples/avro _ producer . py中的示例代码将数据加载到主题中。我只做了一个更改,那就是我添加了“default”:为了模式兼容性,每个字段都为null。它加载得很好,因为我可以在http://localhost:9021/中看到消息和模式。如果我通过cli运行kafka-avro-console-consumer命令,我还可以看到进入主题的数据。但是尝试用https://docs . confluent . io/current/connect/Kafka-connect-AWS-redshift/index . html中提供的配置使用红移sink,得到如下所示的错误。然而,如果我没有在字段中添加“default”:null,那么它会一直运行到最后。任何指导将不胜感激。

org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
    at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
    at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1812)
    at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1567)
    at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1687)
    at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1543)
    at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1226)
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:108)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:491)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:491)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:324)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:200)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "null", schema type: INT32
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:220)
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
    at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)

共有1个答案

督飞羽
2023-03-14

添加“default: null”是不够的,您需要将类型修改为如下内容:

type: ["null", "string"], default: null

注意在第一个位置的类型中添加“null”,即。没有

type: ["string", "null"], default: null

见讨论:http://apache-avro.679487.n3.nabble.com/How-to-declare-an-optional-field-tp4025089p4025094.html

 类似资料:
  • 我正在尝试使用kafka-avro-convore-生产者发布一条具有键(带有模式)和值(带有模式)的消息。kafka环境(kafka的conFluent 6.2.0版本、连接、zoomaster、模式注册表)都正确启动,我可以确认我的连接器已安装。问题是当我发送消息时,我的Sink连接器失败并出现我无法诊断的错误。 感谢您的帮助: 我生成一条AVRO消息,如下所示: 并在连接日志中接收以下错误:

  • 这是我的密码。

  • 设置: Java 8 创建了一个简单的示例程序,以使用来自一个Kafka主题的1M消息并生成到另一个主题-以本地执行模式运行。这两个主题都有32个分区。 当我让它从头到尾运行时,它会消耗并生成所有消息。如果在完成之前启动然后停止(SIGINT),然后再次重新启动,则生产者仅接收原始1M消息的子集。 我已经确认了我对消费者的补偿,它阅读了所有1M消息。 -- 在本地执行模式下,这是预期的吗?是否需要

  • 一、线程间通信的两种方式 1.wait()/notify() Object类中相关的方法有notify方法和wait方法。因为wait和notify方法定义在Object类中,因此会被所有的类所继承。这些方法都是final的,即它们都是不能被重写的,不能通过子类覆写去改变它们的行为。 ①wait()方法: 让当前线程进入等待,并释放锁。 ②wait(long)方法: 让当前线程进入等待,并释放锁,

  • 我使用schema registry为所有带有Kafka Streams的应用程序创建模式注册表。我们的一个流进行聚合,我想对聚合对象使用schema registry,如下所示: 但在向schema注册表添加schema时,我们需要按主题名定义schema。在流上聚合的情况下,这是不可能的,因为主题名称是由流拓扑生成的。 问题是是否有某种方法可以为任何主题创建模式,以便任何流或任何其他解决方案都

  • 我正试图了解更多关于我们在Kafka主题中使用的Avro模式的信息,我对这一点相对来说比较陌生。 我想知道是否有一种方法可以在特定情况下发展模式。我们用一个不能为null的新字段或任何默认值来更新模式,因为这些新字段是标识符。解决这个问题的方法是创建新主题,但是有没有更好的方法来改进现有模式?