我使用的是一个带有查询模式的jdbc源连接器,似乎没有指定的表名,在schema-registry中注册的记录键和记录值的模式名称为空,并被分配默认名称“ConnectDefault”,如Confluent的AvroData类https://github.com/confluentinc/schema-registry/blob/master/avro-converter/src/main/Java/io/confluent/connect/avro/AvroData.Java中定义的
当使用生成的avro源和SpecificAvroSerde运行Kafka Streams应用程序时,我得到错误:
Exception in thread "streams-app-6e39ebfd-db14-49bc-834f-afaf108a6d25-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to deserialize value for record. topic=topic-name, partition=0, offset=0
at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:46)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:474)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:642)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 2
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class io.confluent.connect.avro.ConnectDefault specified in writer's schema whilst finding reader's schema for a SpecificRecord.
我试图发布主题中的键模式和值模式的新版本,以表名作为模式名,并删除具有\“name\”:\“connectDefault\”、\“namespace\”:\“io.confluent.connect.avro\”
属性的原始版本,但没有成功。我是缺少了一个名为ConnectDefault的类,还是可以在源连接器的某个地方指定一个没有命名空间的架构名称?
我的Kafka流配置:
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-app");
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
我的Kafka Connect配置:
name=source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:oracle:thin:
mode=incrementing
incrementing.column.name=id
query=QUERY
topic.prefix=topic-name
transforms=InsertKey, ExtractId
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=id
transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractId.field=id
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schemas.enable=true
value.converter.schema.registry.url=http://localhost:8081
问题是,当jdbc源连接器处于查询模式时,模式名默认为null。https://github.com/confluentinc/kafka-connect-jdbc/issues/90
看起来这可以通过使用SetSchemaMetadata转换在源连接器中添加带有SMT(Single Message Transforms)的模式名称来解决。https://cwiki.apache.org/confluence/display/kafka/kip-66%3a+single+message+transforms+for+kafka+connect
transforms=setValueSchema
transforms.setValueSchema.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Value
transforms.setValueSchema.schema.name=io.confluent.connect.avro.ConnectDefault
我对Kafka connect很陌生。我想把我的信息从Kafka主题推到弹性搜索。在阅读了可用的文档之后,我从发行版tar下载并编译了弹性搜索接收器。拉链(https://github.com/confluentinc/kafka-connect-elasticsearch/releases) 我添加了弹性搜索属性文件,并将上述jar包含在类路径中。当我在独立模式下运行kafka connect时
使用此Kafka Connect连接器: https://www.confluent.io/hub/confluentinc/kafka-connect-s3 我手动将其安装到我的kafka Connect Docker映像的插件中。我的目的是使用Kafka Connect将来自Kafka主题的Avro记录写入S3。 在运行时,使用Kafka Connect,我会得到以下错误: 在ConFluen
我正在使用confluent JDBC连接器连接到postgres数据库,以检索更改并将其放在Kafka主题中。现在,我想使用spring boot消费者来使用这些消息。这些消息采用AVRO格式。我从连接器中获得了模式,并使用avro-maven插件为其生成了一个POJO类。 但是当侦听器启动时,只有以下错误 当我不使用avro对数据进行反序列化时,我会收到数据但不可读。 在pom中。xml我有以
我很难理解为什么会发生这种错误。我正在将教程移植到最新版本的Spring、Hibernate和WildFly。我从命令行运行,使用Maven构建和测试应用程序。我得到以下错误: 2015年7月10日下午2:18:03 org.springframework.test.context。TestContextManager prepareTestInstance SEVERE:允许TestExecut
我想连接两个主题流(左连接),并在连接的流上进行基于窗口的聚合。然而,聚合将某些消息计数两倍,因为在连接期间,根据正确主题中的延迟,某些消息将发出两倍。以下是POC的代码。 它是否可以修复以避免因连接而重复?
现在我正在尝试用kafka创建消息服务功能以使用< code > spring-cloud-stream-bind-Kafka ,但效果不太好。 Spring罩1.4.2 当我使用此错误日志启动项目时失败 我在怀疑我的春靴版本。这么低配的版本。< br >我认为< code > spring-cloud-stream-binder-Kafka 在spring boot 2.0版本下无法使用或者其他