当前位置: 首页 > 知识库问答 >
问题:

Kafka- S3接收器连接器-没有找到辅助反序列化器

阳宾实
2023-03-14

我试图使用来自kafka的消息,源消息以Avro格式序列化(我使用了AWS模式注册表)。

连接器配置:

{
        "name": "s3-sink-db01",
        "config": {
                "connector.class": "io.confluent.connect.s3.S3SinkConnector",
                "storage.class": "io.confluent.connect.s3.storage.S3Storage",
                "s3.bucket.name": "de-team",
                "name": "s3-sink-db01",
                "tasks.max": "3",
                "s3.region": "ap-south-1",
                "s3.part.size": "5242880",
                "s3.compression.type": "gzip",
                "timezone": "UTC",
                "locale": "en",
                "flush.size": "10",
                "rotate.interval.ms": "10",
                "topics.regex": "mysql-db01.(.*)",
                "internal.key.converter.schemas.enable": "false",
                "key.converter.schemas.enable": "false",
                "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
                "internal.value.converter.schemas.enable": "false",
                "value.converter.schemas.enable": "false",
                "partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
                "path.format": "YYYY/MM/dd/HH",
                "partition.duration.ms": "3600000",
 "key.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter",
"value.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter",
"key.converter.region": "ap-south-1",
"value.converter.region": "ap-south-1",
"key.converter.schemaAutoRegistrationEnabled": "true",
"value.converter.schemaAutoRegistrationEnabled": "true",
"key.converter.avroRecordType": "GENERIC_RECORD",
"value.converter.avroRecordType": "GENERIC_RECORD",
"internal.key.converter.schemas.enable": "false",
"key.converter.schemas.enable": "false",
"internal.value.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
                "rotate.schedule.interval.ms": "3600000"
        }
}

但是当我尝试配置接收器连接器时,它会出现以下错误。

ERROR WorkerSinkTask{id=s3-sink-db01-2} Error converting message key in topic 'mysql-db01.devdb.table1' partition 0 at offset 0 and timestamp 1627302045505: Converting byte[] to Kafka Connect data failed due to serialization error:  (org.apache.kafka.connect.runtime.WorkerSinkTask:532)

 org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
...
...
...
 ERROR WorkerSinkTask{id=s3-sink-db01-2} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:184)

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler

at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)

Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:

Caused by: com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Didn't find secondary deserializer.

共有1个答案

曹焱
2023-03-14

这个问题是由于键/值转换器造成的。我已经通过纠正它解决了这个问题,一个依赖的问题在这里,解决方案在这里。

 类似资料:
  • 我们使用S3接收器连接器从MSK自动气象站的S3桶中接收数据。 我们已经在AWS EKS(Kubernetes)上部署了KafkaS3水槽连接器 当我们启动连接器时,当 S3 存储桶上发生分段上传时出现以下错误。 我们对S3存储桶有策略限制,因为启用了服务器端加密(AWS-KMS),即如果没有KMS密钥,我们无法上传。 下面是我们用于连接器的配置,下面是错误详细信息,供您参考。 好心帮忙 {"na

  • 有没有办法通过Kafka Connect S3接收器连接器标记写入S3存储桶的对象。我正在读取来自Kafka的消息,并使用S3接收器连接器将avro文件写入S3存储桶。当文件写入S3存储桶时,我需要标记文件。

  • 我尝试使用最新的kafka (confluent-platform-2.11)连接将Json放到s3上。我在quickstart-s3.properties文件中设置format . class = io . confluent . connect . S3 . format . JSON . JSON format 和负载连接器: 然后我给Kafka发了一行: ~$ Kafka-控制台-生产者

  • 我有一个需求,即我们应用程序之外的源将在S3存储桶中放置一个文件,我们必须在kafka主题中加载该文件。我正在查看ConFluent的S3 Source连接器,目前正在努力定义在我们的环境中设置连接器的配置。但是有几篇文章指出,只有在您使用S3 Sink连接器将文件放在S3中时,才能使用S3 Source连接器。 以上是真的吗?在配置中,我在哪里/使用什么属性来定义输出主题?当阅读S3的文章并把它

  • 使用此Kafka Connect连接器: https://www.confluent.io/hub/confluentinc/kafka-connect-s3 我手动将其安装到我的kafka Connect Docker映像的插件中。我的目的是使用Kafka Connect将来自Kafka主题的Avro记录写入S3。 在运行时,使用Kafka Connect,我会得到以下错误: 在ConFluen

  • 我正在尝试将来自主题的数据(json数据)写入MySql数据库。我想我需要一个JDBC接收器连接器。 我如何配置连接器以将主题中的json数据映射到如何将数据插入数据库。 我能找到的文件只有这个。 “接收器连接器需要了解架构,因此您应该使用合适的转换器,例如架构注册表附带的Avro转换器,或启用了架构的JSON转换器。如果存在Kafka记录键,则可以是基元类型或连接结构,记录值必须是连接结构。从连