Question:

Schema error when sending from the Kafka Avro producer (kafka-avro-console-producer) to Kafka Connect

康恩
2023-03-14

I am trying to publish a message with a key (with schema) and a value (with schema) using kafka-avro-console-producer. The Kafka environment (Confluent 6.2.0: Kafka, Connect, ZooKeeper, Schema Registry) starts up correctly, and I can confirm that my connector is installed. The problem is that when I send a message, my sink connector fails with an error I cannot diagnose.

Any help is appreciated.

I produce an Avro message as follows:

docker exec -it schema-registry \
  /usr/bin/kafka-avro-console-producer \
  --broker-list http://kafka:9092 \
  --topic source-1 \
  --property value.schema='{"type":"record","name":"somerecord","fields":[{"name":"timestamp","type":"string"}, {"name":"data","type":"string"}]}' \
  --property parse.key=true \
  --property key.schema='{"type":"int"}' \
  --property key.separator=" "

1 {"timestamp":"some-timestamp", "data":"somedata"}

When I send the message, the following error appears in the Connect logs:

connect            | [2021-10-04 18:22:51,792] ERROR WorkerSinkTask{id=brodagroupsoftware-http-sink-connector-0} Error converting message key in topic 'source-1' partition 0 at offset 0 and timestamp 1633371770674: Converting byte[] to Kafka Connect data failed due to serialization error of topic source-1:  (org.apache.kafka.connect.runtime.WorkerSinkTask)
connect            | org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error of topic source-1:
connect            |    at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:118)
connect            |    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:530)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:493)
connect            |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
connect            |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
connect            |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:493)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:473)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
connect            |    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
connect            |    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect            |    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
connect            |    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
connect            |    at java.base/java.lang.Thread.run(Thread.java:829)
connect            | Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id 1
connect            | Caused by: java.io.IOException: Invalid schema "long" with refs [] of type AVRO
connect            |    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.lambda$getSchemaByIdFromRegistry$6(CachedSchemaRegistryClient.java:229)
connect            |    at java.base/java.util.Optional.orElseThrow(Optional.java:408)
connect            |    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:227)
connect            |    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:298)
connect            |    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaById(CachedSchemaRegistryClient.java:283)
connect            |    at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:107)
connect            |    at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaJsonSchemaDeserializer.java:208)
connect            |    at io.confluent.connect.json.JsonSchemaConverter$Deserializer.deserialize(JsonSchemaConverter.java:163)
connect            |    at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:107)
connect            |    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:530)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:493)
connect            |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
connect            |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
connect            |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:493)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:473)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
connect            |    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
connect            |    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect            |    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
connect            |    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
connect            |    at java.base/java.lang.Thread.run(Thread.java:829)
connect            | [2021-10-04 18:22:51,796] INFO HttpSinkTask:flush (com.brodagroup.datamesh.connect.httpsinkconnector.HttpSinkTask)

The docker-compose service that starts "connect" looks like this:

     connect:
        image: confluentinc/cp-kafka-connect:6.2.0
        hostname: connect
        container_name: connect
        depends_on:
          - zookeeper
          - kafka
        ports:
          - 8083:8083
        environment:
          CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
          CONNECT_REST_PORT: 8083
          CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
          CONNECT_GROUP_ID: compose-connect-group
          CONNECT_CONFIG_STORAGE_TOPIC: connect-configs
          CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets
          CONNECT_STATUS_STORAGE_TOPIC: connect-status

          CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "true"
          CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "true"
          CONNECT_KEY_CONVERTER: "io.confluent.connect.json.JsonSchemaConverter"
          CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
          CONNECT_VALUE_CONVERTER: "io.confluent.connect.json.JsonSchemaConverter"
          CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"

          CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
          CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
          CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
          CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
          CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
          CONNECT_PLUGIN_PATH: '/usr/share/java'

1 Answer

秦博延
2023-03-14

According to the stack trace, you are using the JsonSchemaConverter, but you have produced Avro data: the converter fetched schema id 1 from the registry and failed with "Invalid schema \"long\" with refs [] of type AVRO", i.e. it received an Avro schema that it cannot parse as a JSON Schema.

Therefore, you should use the AvroConverter in your connector configuration.
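
In the docker-compose service above, that means replacing the JsonSchemaConverter entries in the environment block. A minimal sketch, assuming the Avro converter bundled with the cp-kafka-connect image:

          CONNECT_KEY_CONVERTER: "io.confluent.connect.avro.AvroConverter"
          CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
          CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
          CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"

Alternatively, you can set key.converter and value.converter (plus key.converter.schema.registry.url and value.converter.schema.registry.url) in the individual connector's configuration; per-connector settings override the worker defaults.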
