我正在使用ConFluent的Kafka s3连接将数据从apache Kafka复制到AWS S3。
问题是,我有AVRO格式的Kafka数据,它没有使用Confluent Schema Registry的AVRO序列化程序,并且我无法更改Kafka生产者。因此,我需要反序列化来自Kafka的现有Avro数据,然后在AWS S3中以拼花格式保存相同的数据。我尝试使用confluent的AvroConverter作为值转换器,如下所示-
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost/api/v1/avro
我得到了这个错误-
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic dcp-all to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:86)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:488)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
据我所知,“io.confluent.connect.avro.AvroConverter”只有在数据是用Kafka使用融合模式注册表的Avro序列化器写入的情况下才有效,因此我收到了这个错误。所以我的问题是在这种情况下我需要实现一个通用的AvroConverter吗?如果是,我如何扩展现有的源代码-https://github.com/confluentinc/kafka-connect-storage-cloud?
任何帮助都将不胜感激。
您不需要扩展该存储库。您只需要实现一个Converter
(Apache Kafka的一部分)将其遮挡到JAR中,然后将其放置在Connect工作人员的CLASSPATH
上,就像BlueApron为原型所做的那样
或者看看这是否有效-https://github.com/farmdawgnation/registryless-avro-converter
不使用融合架构注册表
那么您使用的注册表是什么?我所知道的每一个都有与汇合一个接口的配置
我正在尝试将MySQL与Kafka Connect连接,并且出现了许多错误。我正在共享我的connect-standalone.properties和mysql-jdbc-connector.properties,并显示错误。我的 Kafka 和 MySQL 在不同的集群中,我使用的是融合连接器,但不是在融合接口中。我下载了4.1.0 JDBC MySQL融合连接器。 MySQL-JDBC-con
我使用confluent的kafka connect将数据传输到s3桶中。基于键进行理想的分区。因为现有的FieldPartitioner只适用于Avro模式记录,而不适用于一般的字符串化JSON文本。我想我应该写我自己的连接器。 课堂是这样的: 当我构建它并尝试运行kafka connect时,我得到了一个错误 从查看打包一个自定义Java'partitioner.class'插件为Kafka连
我正在使用Spring Boot 2.3.0和Spring Kafka 2.5.0,在我的KafkaListener中,我试图将MessageHeaders映射到一个自定义类。下面的代码可以工作,但给出了byte[]格式的头,然后我必须将其转换为侦听器内部的类(并对每个侦听器重复此操作),这是我希望避免的。 当我将代码更改为: 我查看了https://docs.spring.io/spring-k
我试图实现一个准备在Kafka 2.0.0中发布的AuthenticateCallbackHandler,但没有成功-这是一个应该工作的设置吗? 在https://cwiki.apache.org/confluence/display/KAFKA/KIP-86:可配置的SASL回调处理器上我读到: 使用外部身份验证服务器进行SASL/PLAIN身份验证,使用Kafka中包含的PLAIN的SaslS
在开发过程中,常常需要为一些repository方法添加自定义的实现。Spring Data repository允许开发者自定义repository方法。
看起来Spring总是使用< code > inmemorrelyingpartyregistrationrepository 来返回一个< code > RelyingPartyRegistrationRepository 类型的bean,请参考https://github . com/Spring-projects/Spring-boot/blob/master/Spring-boot-pro