当前位置: 首页 > 知识库问答 >
问题:

Avro Schema与Kafka,ClassCastException?

仲孙鸣
2023-03-14

我为记录创建了一个Avro模式,我们将其发布到Kafka主题中。我们实际的Kafka记录模式更复杂,但为了简洁起见,我只附上了相关部分。我们在记录中有多个嵌套子类,但由于某些原因,我在尝试发布记录时遇到以下异常(包名称已被掩盖):

java.lang.ClassCastException: aaa.bbb.ccc.ddd.Amount can not cast to org.apache.avro.generic.IndexedRecord

class KafkaRecord {

    private Amount amount;

    class Amount {

        String currency;
        long value;

    }

}

这是我定义的Avro模式的当前子集。


{
  "type" : "record",
  "name" : "KafkaRecord",
  "namespace" : "com.company.department",
  "fields" : [ {
    "name" : "amount",
    "type" : {
      "type" : "record",
      "name" : "Amount",
      "namespace" : "aaa.bbb.ccc.ddd",
      "fields" : [ {
        "name" : "value",
        "type" : "long"
      }, {
        "name" : "currency",
        "type" : "string"
      } ]
    }
  }
}

我们的对象(KafkaRecord)的JSON表示如下:

{
  "amount": {
    "currency": "GBP",
    "value": 12345
  }
}

我似乎不明白为什么Avro不喜欢这个嵌套的记录,我更喜欢不剥离这些嵌套的类,因为这会使JSON记录非常难以读取和管理类。

如果有人能指出我在这里做错了什么,那就太好了!

共有1个答案

上官高逸
2023-03-14

那么,您不需要编写自己的Java文件。听起来像是你做的,所以错误就像它所说的那样-Amount类不是IndexedBook

例如,如果我使用您的模式并运行

java -jar ~/Downloads/avro-tools-1.8.2.jar compile schema KafkaRecord.avsc .

然后看文件,我们看到它扩展了一些Avro java类。

$ head -n 15 aaa/bbb/ccc/ddd/Amount.java
/**
 * Autogenerated by Avro
 *
 * DO NOT EDIT DIRECTLY
 */
package aaa.bbb.ccc.ddd;

import org.apache.avro.specific.SpecificData;
import org.apache.avro.message.BinaryMessageEncoder;
import org.apache.avro.message.BinaryMessageDecoder;
import org.apache.avro.message.SchemaStore;

@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class Amount extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {

Avro Maven插件指南中记录了以编程方式执行此操作的方法。

是的,Avro可以很好地处理嵌套记录,我个人喜欢使用IDL来更轻松地创建它们

@namespace("com.company.department")
protocol KafkaEventProtocol {
  @namespace("aaa.bbb.ccc.ddd")
  record Amount {
    string currency;
    long value;
  }

  record KafkaValue {
    Amount amount;
  }
}
 类似资料:
  • 主要内容:Spark是什么?,与Spark整合在本章中,将讨论如何将Apache Kafka与Spark Streaming API集成。 Spark是什么? Spark Streaming API支持实时数据流的可扩展,高吞吐量,容错流处理。 数据可以从Kafka,Flume,Twitter等许多来源获取,并且可以使用复杂算法进行处理,例如:映射,缩小,连接和窗口等高级功能。 最后,处理后的数据可以推送到文件系统,数据库和现场仪表板上。 弹

  • 主要内容:Storm是什么?,与Storm整合,提交到拓扑在本章中,我们将学习如何将Kafka与Apache Storm集成。 Storm是什么? Storm最初是由Nathan Marz和BackType团队创建的。 在很短的时间内,Apache Storm成为分布式实时处理系统的标准,用于处理大数据。 Storm速度非常快,每个节点每秒处理超过一百万个元组的基准时钟。 Apache Storm持续运行,从配置的源(Spouts)中消耗数据并将数据传递

  • 我正在尝试使用CockroachDB (v2.0.6)作为我的一个Kafka主题的接收器。 我找不到任何专门用于CockroachDB的Kafka连接器,所以我决定使用Confluent的jdbc sink连接器,因为CockroachDB支持postgreSQL语法。 我在Kafka Connect上使用的连接字符串如下 这基本上是我在现有工作的Postgres接收器连接器上所做的唯一更改。 不

  • 我试图将Kafka偏移量保存到文件中,我使用Spring Boot,似乎偏移量在文件中写入,但没有读取,所以事实上骆驼将在重新启动时从Kafka主题的开头开始读取

  • 我有以下:Source-Kafka topic(trans)Channel-memory Sink-Hdfs(avro _ event) kafka主题trans中的数据是使用c#生产者编写的,并且有数千条avro记录。当我运行我的水槽消费者时,它开始将数据下沉到hdfs。问题是数据的格式是:模式数据模式数据 而不是: 模式数据数据 我猜这是因为flume需要一个带有{header} {body}

  • 我试图通过docker-compose文件部署kafka,或者通过安装kafka映像并手动运行它。当我开始运行kafka服务器(代理)时,这两个步骤都给我带来了错误 INFO初始化客户端连接,connectString=188.226.151.167:2181 sessiontimeout=6000 watcher=org.i0itec.zkclient.zkclient@323b36e0(org