当前位置: 首页 > 知识库问答 >
问题:

jdbc kafka连接avro模式与流不兼容

利俊迈
2023-03-14

我正在使用jdbc源代码连接器从oracle db获取数据,并按下(两个键

Value schema:
{
"subject": "testtopicA-value",
"version": 1,
"id": 1122,
"schema": "{\"type\":\"record\",\"name\":\"ConnectDefault\",\"namespace\":\"io.confluent.connect.avro\",\"fields\":[{\"name\":\"ID\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"TIME\",\"type\":{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}},{\"name\":\"STATUS\",\"type\":\"string\"},{\"name\":\"NUMBER\",\"type\":[\"null\",\"int\"],\"default\":null}]}"
}

Key Schema :
{
"subject": "testtopicA-key",
"version": 1,
"id": 1123,
"schema": "[\"null\",\"long\"]"
}

我有一个Kafka流收听这个主题,并有avro Genericrecord。当我启动流时,我开始得到<code>ClassCastException:java.lang.Long不能强制转换为org.apache.avro.generic。GenericRecordconnect生成的架构具有数据类型为“long”的字段

        final StreamsBuilder builder = new StreamsBuilder();
    KStream<?,?> stream;
    stream = builder.stream(INPUTTOPIC);        
    
    ((KStream<GenericRecord,GenericRecord>) stream)
    
    .filter((k,v) -> v != null)
    .map((k,v)->
    {
        .........
        .......
    

有人对如何解决这个问题有什么建议吗?融合版本:6.2.0

共有1个答案

沈龙光
2023-03-14

根据错误,您没有将默认键/值serde设置为Avro;你把它当成了朗塞德

 类似资料:
  • 使用汇流5.4.1 在将连接的流从连接的KTable转发到另一个主题时,我们碰巧在新的KTable外键连接中遇到了一个问题。 将错误中提到的模式与在模式注册表上注册的模式进行比较,结果完全相同…似乎Kafka 2.4.0中已经出现了类似的问题:https://issues.apache.org/jira/browse/Kafka-9390并且该问题在转发到另一个主题时仍然存在

  • 我有一个源连接器,可以推送到新创建的主题。“key.converter”和“value.converter”未配置,因此转换是使用Avro完成的。 当Kafka Connect填充此主题时,它会在注册表中创建一个主题名为“主题名键”和“主题名值”的模式。例如,此模式可能具有如下配置的字段 但当记录被拉入流媒体应用程序时,Kafka的代码(Kafka流:3.0.1)有一个模式伴随着数据。有时(并非总

  • 我有一个avro模式定义,比如- 上线后,我们增加了另一个领域- CusterType被定义为null, string。即使在向合流注册表注册架构时,我们也会收到错误-正在注册的架构与早期架构不兼容。 如果有什么原因,请告诉我们。我们通过显式地将customerType默认为null来解决这个问题, Union{null, string}CusterType=null; 但不知何故,我觉得这不是必

  • 我正在构建一个电子商务应用程序,我目前正在处理两个数据馈送:订单执行和销售中断。由于各种原因,销售失败将是无效的执行。失败的销售将具有与订单相同的订单编号,因此连接位于订单编号和行项目编号上。 目前,我有两个主题-,和。两者都是使用Avro模式定义的,并使用SpecificRecord构建的。键是。 订单的字段:订单编号,时间戳,订单行,项目编号,数量 的字段: 通过运行 我需要将与左连接,并在输

  • 我一直在尝试将avro通用记录进行串行化,并生成avro串行化的数据发送给Kafka。主要目标是不使用合并模式注册表存储模式,而是将模式与序列化数据一起发送,以便从kafka主题中提取并反序列化。 下面是AvroSerializer用于生成Avro数据的部分。 Kafka中出现的序列化数据如下所示。

  • 我使用https://github . com/confluent Inc/confluent-Kafka-python/blob/master/examples/avro _ producer . py中的示例代码将数据加载到主题中。我只做了一个更改,那就是我添加了“default”:为了模式兼容性,每个字段都为null。它加载得很好,因为我可以在http://localhost:9021/中看