我为记录创建了一个Avro模式,我们将其发布到Kafka主题中。我们实际的Kafka记录模式更复杂,但为了简洁起见,我只附上了相关部分。我们在记录中有多个嵌套子类,但由于某些原因,我在尝试发布记录时遇到以下异常(包名称已被掩盖):
java.lang.ClassCastException: aaa.bbb.ccc.ddd.Amount can not cast to org.apache.avro.generic.IndexedRecord
class KafkaRecord {
private Amount amount;
class Amount {
String currency;
long value;
}
}
这是我定义的Avro模式的当前子集。
{
"type" : "record",
"name" : "KafkaRecord",
"namespace" : "com.company.department",
"fields" : [ {
"name" : "amount",
"type" : {
"type" : "record",
"name" : "Amount",
"namespace" : "aaa.bbb.ccc.ddd",
"fields" : [ {
"name" : "value",
"type" : "long"
}, {
"name" : "currency",
"type" : "string"
} ]
}
}
}
我们的对象(KafkaRecord)的JSON表示如下:
{
"amount": {
"currency": "GBP",
"value": 12345
}
}
我似乎不明白为什么Avro不喜欢这个嵌套的记录,我更喜欢不剥离这些嵌套的类,因为这会使JSON记录非常难以读取和管理类。
如果有人能指出我在这里做错了什么,那就太好了!
那么,您不需要编写自己的Java文件。听起来像是你做的,所以错误就像它所说的那样-Amount
类不是IndexedBook
例如,如果我使用您的模式并运行
java -jar ~/Downloads/avro-tools-1.8.2.jar compile schema KafkaRecord.avsc .
然后看文件,我们看到它扩展了一些Avro java类。
$ head -n 15 aaa/bbb/ccc/ddd/Amount.java
/**
* Autogenerated by Avro
*
* DO NOT EDIT DIRECTLY
*/
package aaa.bbb.ccc.ddd;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.message.BinaryMessageEncoder;
import org.apache.avro.message.BinaryMessageDecoder;
import org.apache.avro.message.SchemaStore;
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class Amount extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
Avro Maven插件指南中记录了以编程方式执行此操作的方法。
是的,Avro可以很好地处理嵌套记录,我个人喜欢使用IDL来更轻松地创建它们
@namespace("com.company.department")
protocol KafkaEventProtocol {
@namespace("aaa.bbb.ccc.ddd")
record Amount {
string currency;
long value;
}
record KafkaValue {
Amount amount;
}
}
主要内容:Spark是什么?,与Spark整合在本章中,将讨论如何将Apache Kafka与Spark Streaming API集成。 Spark是什么? Spark Streaming API支持实时数据流的可扩展,高吞吐量,容错流处理。 数据可以从Kafka,Flume,Twitter等许多来源获取,并且可以使用复杂算法进行处理,例如:映射,缩小,连接和窗口等高级功能。 最后,处理后的数据可以推送到文件系统,数据库和现场仪表板上。 弹
主要内容:Storm是什么?,与Storm整合,提交到拓扑在本章中,我们将学习如何将Kafka与Apache Storm集成。 Storm是什么? Storm最初是由Nathan Marz和BackType团队创建的。 在很短的时间内,Apache Storm成为分布式实时处理系统的标准,用于处理大数据。 Storm速度非常快,每个节点每秒处理超过一百万个元组的基准时钟。 Apache Storm持续运行,从配置的源(Spouts)中消耗数据并将数据传递
我正在尝试使用CockroachDB (v2.0.6)作为我的一个Kafka主题的接收器。 我找不到任何专门用于CockroachDB的Kafka连接器,所以我决定使用Confluent的jdbc sink连接器,因为CockroachDB支持postgreSQL语法。 我在Kafka Connect上使用的连接字符串如下 这基本上是我在现有工作的Postgres接收器连接器上所做的唯一更改。 不
我试图将Kafka偏移量保存到文件中,我使用Spring Boot,似乎偏移量在文件中写入,但没有读取,所以事实上骆驼将在重新启动时从Kafka主题的开头开始读取
我有以下:Source-Kafka topic(trans)Channel-memory Sink-Hdfs(avro _ event) kafka主题trans中的数据是使用c#生产者编写的,并且有数千条avro记录。当我运行我的水槽消费者时,它开始将数据下沉到hdfs。问题是数据的格式是:模式数据模式数据 而不是: 模式数据数据 我猜这是因为flume需要一个带有{header} {body}
我试图通过docker-compose文件部署kafka,或者通过安装kafka映像并手动运行它。当我开始运行kafka服务器(代理)时,这两个步骤都给我带来了错误 INFO初始化客户端连接,connectString=188.226.151.167:2181 sessiontimeout=6000 watcher=org.i0itec.zkclient.zkclient@323b36e0(org