我正在尝试kafka connect hdfs接收器连接器,以便将json数据从kafka移动到hdfs。
即使在kafka中的json数据具有模式和有效负载时,kafka connect任务也会因错误而失败
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields.
Kafka的数据:
{"schema": {"type": "struct","fields": [{"type": "string","optional": false,"field": "Name"}, {"type": "string","optional": false,"field": "company"}],"optional": false,"name": "Person"},"payload": {"Name": "deepak","company": "BT"}}
{"schema": {"type": "struct","fields": [{"type": "string","optional": false,"field": "Name"}, {"type": "string","optional": false,"field": "company"}],"optional": false,"name": "Person"},"payload": {"Name": "sufi","company": "BT"}}
{"schema": {"type": "struct","fields": [{"type": "string","optional": false,"field": "Name"}, {"type": "string","optional": false,"field": "company"}],"optional": false,"name": "Person"},"payload": {"Name": "vikas","company": "BT"}}
curl -X POST -H "Content-Type: application/json" --data '{"name": "connect-cluster-15may-308pm", "config": {"connector.class":"io.confluent.connect.hdfs.HdfsSinkConnector", "tasks.max":"2", "hdfs.url": "hdfs://localhost:9000","flush.size": "1","topics":"test_hdfs_json_schema_payload_1","topics.dir":"/deepak/kafka-connect/trial1"}}' http://localhost:8083/connectors
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
错误消息:
http://localhost:8083/connectors/connect-cluster-15may-308pm/tasks/0/status
{
"state": "FAILED",
"trace": "org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.\n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:309)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:400)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n",
"id": 0,
"worker_id": "127.0.0.1:8083"
}
你用的是什么版本的Kafka连接?当从stacktrace中确定错误的来源时,了解这一点很有帮助。
我认为发生的情况是,你在值中有数据,但在键中没有。由于您将key.converter
和value.converter
都设置为JSONConverter
并且使用schemas.enable=true
,因此希望看到包含schema
和payload
的信封格式。但是,我猜您的密钥都是null
。
这与https://issues.apache.org/jira/browse/kafka-3832类似,其中jsonConverter
从不生成真null
值。相反,它总是生成包含预期的可选模式+null
负载的信封。在这种情况下,从Kafka转换到Connect的数据API是不起作用的,因为它在密钥中期望相同的信封格式。
您可以通过向控制台使用者命令添加--property print.key=true
来验证是否存在问题。如果它打印出null
键,问题是JsonConverter无法解码它们。
一个简单的解决方法是,只对不关心空
值的键使用其他转换器
--键中没有任何数据。Kafka Connect附带的一个工具是org.apache.Kafka.Connect.storage.StringConverter
。
我设置了一个Kafka JDBC接收器以将事件发送到PostgreSQL。我编写了这个简单的生产者,它将带有模式(avro)数据的JSON发送到一个主题,如下所示: producer.py(kafka-python) 价值架构: 连接器配置(无主机、密码等) 但我的连接器出现严重故障,有三个错误,我无法找出其中任何一个错误的原因: TL;博士;日志版本 完整日志 有人能帮我理解这些错误和潜在的原因
水槽新手。 假设我有一个代理,它有一个 avero 源、一个 hdfs 接收器和一个文件通道。 假设在某个时候接收器无法写入hdfs。源是否会继续接受事件,直到通道填满? 或者即使文件通道未满,源也会停止接受事件吗?
我正在尝试使用Kafka连接接收器将文件从Kafka写入HDFS。 我的属性看起来像: 有什么建议吗?
我正在使用Kafka源和接收器连接器创建一个数据管道。源连接器从SQL数据库消费并发布到主题,而接收器连接器订阅主题并放入其他SQL数据库。表有16 GB的数据。现在的问题是,数据不能从一个数据库传输到另一个数据库。但是,如果表的大小很小,比如1000行,那么数据可以成功传输。 源连接器配置: 源连接器日志: 有人能指导我如何调整Kafka源连接器以传输大数据吗?
当我运行kafka JDBC连接器到PSQL时,我收到以下错误: JSON converter with schemas . enable需要“schema”和“payload”字段,不能包含其他字段。如果试图反序列化普通JSON数据,请在转换器配置中设置schemas.enable=false。 但是,我的主题包含以下消息结构,并添加了一个架构,就像在线显示的那样: Rowtime: 2022/
我正在收听一个Activemq队列。 我的配置如下: 从控制台日志中,我可以看到尝试重新连接到该服务器。 [2014-08-20 08:57:43,303]INFO 236[ActiveMQ Task-1]-org.apache.activemq.transport.failover.FailoverTransport.doReconnect(FailoverTransport.java:1030