我使用https://github . com/confluent Inc/confluent-Kafka-python/blob/master/examples/avro _ producer . py中的示例代码将数据加载到主题中。我只做了一个更改,那就是我添加了“default”:为了模式兼容性,每个字段都为null。它加载得很好,因为我可以在http://localhost:9021/中看到消息和模式。如果我通过cli运行kafka-avro-console-consumer命令,我还可以看到进入主题的数据。但是尝试用https://docs . confluent . io/current/connect/Kafka-connect-AWS-redshift/index . html中提供的配置使用红移sink,得到如下所示的错误。然而,如果我没有在字段中添加“default”:null,那么它会一直运行到最后。任何指导将不胜感激。
org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1812)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1567)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1687)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1543)
at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1226)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:108)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:491)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:491)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:324)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:200)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "null", schema type: INT32
at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:220)
at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
添加“default: null”是不够的,您需要将类型修改为如下内容:
type: ["null", "string"], default: null
注意在第一个位置的类型中添加“null”,即。没有
type: ["string", "null"], default: null
见讨论:http://apache-avro.679487.n3.nabble.com/How-to-declare-an-optional-field-tp4025089p4025094.html
我正在尝试使用kafka-avro-convore-生产者发布一条具有键(带有模式)和值(带有模式)的消息。kafka环境(kafka的conFluent 6.2.0版本、连接、zoomaster、模式注册表)都正确启动,我可以确认我的连接器已安装。问题是当我发送消息时,我的Sink连接器失败并出现我无法诊断的错误。 感谢您的帮助: 我生成一条AVRO消息,如下所示: 并在连接日志中接收以下错误:
这是我的密码。
设置: Java 8 创建了一个简单的示例程序,以使用来自一个Kafka主题的1M消息并生成到另一个主题-以本地执行模式运行。这两个主题都有32个分区。 当我让它从头到尾运行时,它会消耗并生成所有消息。如果在完成之前启动然后停止(SIGINT),然后再次重新启动,则生产者仅接收原始1M消息的子集。 我已经确认了我对消费者的补偿,它阅读了所有1M消息。 -- 在本地执行模式下,这是预期的吗?是否需要
一、线程间通信的两种方式 1.wait()/notify() Object类中相关的方法有notify方法和wait方法。因为wait和notify方法定义在Object类中,因此会被所有的类所继承。这些方法都是final的,即它们都是不能被重写的,不能通过子类覆写去改变它们的行为。 ①wait()方法: 让当前线程进入等待,并释放锁。 ②wait(long)方法: 让当前线程进入等待,并释放锁,
我使用schema registry为所有带有Kafka Streams的应用程序创建模式注册表。我们的一个流进行聚合,我想对聚合对象使用schema registry,如下所示: 但在向schema注册表添加schema时,我们需要按主题名定义schema。在流上聚合的情况下,这是不可能的,因为主题名称是由流拓扑生成的。 问题是是否有某种方法可以为任何主题创建模式,以便任何流或任何其他解决方案都
我有一个生产者/消费者模式,如下所示 固定数量的生成器线程,每个线程写入它们自己的BlockingQueue,通过执行器调用 单个使用者线程,读取生产者线程 每个生产者都在运行一个数据库查询,并将结果写入其队列。消费者轮询所有生产者队列。目前,如果出现数据库错误,生产者线程就会死掉,然后消费者就会永远停留在产品队列中等待更多的结果。 我应该如何构造它来正确处理catch错误?