我试图使用来自kafka的消息,源消息以Avro格式序列化(我使用了AWS模式注册表)。
连接器配置:
{
"name": "s3-sink-db01",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"s3.bucket.name": "de-team",
"name": "s3-sink-db01",
"tasks.max": "3",
"s3.region": "ap-south-1",
"s3.part.size": "5242880",
"s3.compression.type": "gzip",
"timezone": "UTC",
"locale": "en",
"flush.size": "10",
"rotate.interval.ms": "10",
"topics.regex": "mysql-db01.(.*)",
"internal.key.converter.schemas.enable": "false",
"key.converter.schemas.enable": "false",
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"internal.value.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
"path.format": "YYYY/MM/dd/HH",
"partition.duration.ms": "3600000",
"key.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter",
"value.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter",
"key.converter.region": "ap-south-1",
"value.converter.region": "ap-south-1",
"key.converter.schemaAutoRegistrationEnabled": "true",
"value.converter.schemaAutoRegistrationEnabled": "true",
"key.converter.avroRecordType": "GENERIC_RECORD",
"value.converter.avroRecordType": "GENERIC_RECORD",
"internal.key.converter.schemas.enable": "false",
"key.converter.schemas.enable": "false",
"internal.value.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"rotate.schedule.interval.ms": "3600000"
}
}
但是当我尝试配置接收器连接器时,它会出现以下错误。
ERROR WorkerSinkTask{id=s3-sink-db01-2} Error converting message key in topic 'mysql-db01.devdb.table1' partition 0 at offset 0 and timestamp 1627302045505: Converting byte[] to Kafka Connect data failed due to serialization error: (org.apache.kafka.connect.runtime.WorkerSinkTask:532)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
...
...
...
ERROR WorkerSinkTask{id=s3-sink-db01-2} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:184)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
Caused by: com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Didn't find secondary deserializer.
这个问题是由于键/值转换器造成的。我已经通过纠正它解决了这个问题,一个依赖的问题在这里,解决方案在这里。
我们使用S3接收器连接器从MSK自动气象站的S3桶中接收数据。 我们已经在AWS EKS(Kubernetes)上部署了KafkaS3水槽连接器 当我们启动连接器时,当 S3 存储桶上发生分段上传时出现以下错误。 我们对S3存储桶有策略限制,因为启用了服务器端加密(AWS-KMS),即如果没有KMS密钥,我们无法上传。 下面是我们用于连接器的配置,下面是错误详细信息,供您参考。 好心帮忙 {"na
有没有办法通过Kafka Connect S3接收器连接器标记写入S3存储桶的对象。我正在读取来自Kafka的消息,并使用S3接收器连接器将avro文件写入S3存储桶。当文件写入S3存储桶时,我需要标记文件。
我尝试使用最新的kafka (confluent-platform-2.11)连接将Json放到s3上。我在quickstart-s3.properties文件中设置format . class = io . confluent . connect . S3 . format . JSON . JSON format 和负载连接器: 然后我给Kafka发了一行: ~$ Kafka-控制台-生产者
我有一个需求,即我们应用程序之外的源将在S3存储桶中放置一个文件,我们必须在kafka主题中加载该文件。我正在查看ConFluent的S3 Source连接器,目前正在努力定义在我们的环境中设置连接器的配置。但是有几篇文章指出,只有在您使用S3 Sink连接器将文件放在S3中时,才能使用S3 Source连接器。 以上是真的吗?在配置中,我在哪里/使用什么属性来定义输出主题?当阅读S3的文章并把它
使用此Kafka Connect连接器: https://www.confluent.io/hub/confluentinc/kafka-connect-s3 我手动将其安装到我的kafka Connect Docker映像的插件中。我的目的是使用Kafka Connect将来自Kafka主题的Avro记录写入S3。 在运行时,使用Kafka Connect,我会得到以下错误: 在ConFluen
我正在尝试将来自主题的数据(json数据)写入MySql数据库。我想我需要一个JDBC接收器连接器。 我如何配置连接器以将主题中的json数据映射到如何将数据插入数据库。 我能找到的文件只有这个。 “接收器连接器需要了解架构,因此您应该使用合适的转换器,例如架构注册表附带的Avro转换器,或启用了架构的JSON转换器。如果存在Kafka记录键,则可以是基元类型或连接结构,记录值必须是连接结构。从连