当前位置: 首页 > 面试题库 >

适用于AVRO基本类型的Serde类

段铭晨
2023-03-14
问题内容

我正在用Java编写一个Kafka流应用程序,该应用程序将接受由连接器创建的输入主题,该连接器将模式注册表和avro用于键和值转换器。连接器产生以下架构

key-schema: "int"
value-schema:{
"type": "record",
"name": "User",
"fields": [
    {"name": "firstname", "type": "string"},
    {"name": "lastname",  "type": "string"}
]}

实际上,有几个主题,键模式始终是“ int”,而值模式始终是某种记录(用户,产品等)。我的代码包含以下定义

Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", schemaRegistryUrl);

Serde<User> userSerde = new SpecificAvroSerde<>();
userSerde.configure(serdeConfig, false);

最初,我尝试使用类似的内容来使用该主题, Consumed.with(Serdes.Integer(), userSerde);但由于Serdes.Integer()期望使用4个字节对整数进行编码,但avro使用可变长度编码,因此无法使用。使用Consumed.with(Serdes.Bytes(), userSerde);工作,但我真的想要int而不是字节,所以我将代码更改为此

KafkaAvroDeserializer keyDeserializer = new KafkaAvroDeserializer()
KafkaAvroSerializer keySerializer = new KafkaAvroSerializer();
keyDeserializer.configure(serdeConfig, true); 
keySerializer.configure(serdeConfig, true);
Serde<Integer> keySerde = (Serde<Integer>)(Serde)Serdes.serdeFrom(keySerializer, keyDeserializer);

这使编译器产生警告(它不喜欢(Serde<Integer>)(Serde)强制转换),但允许我使用

Consumed.with(keySerde, userSerde);并获得一个整数作为关键字。这工作得很好,我的应用程序表现出预期(很棒!!!)。但是现在我想为键/值定义默认Serde,但我无法使其正常工作。

设置默认值serde很简单:

streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);

但是我不知道如何定义默认的密钥序列。

我试过了

1.streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerde.getClass().getName()); 产生运行时错误:找不到org.apache.kafka.common.serialization.Serdes $ WrapperSerde的公共无参数构造函数
2. streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class); 产生运行时错误:无法将java.lang.Integer强制转换为org.apache.avro.specific.SpecificRecord
我想念什么?谢谢。


问题答案:

更新 (5.5及更高版本)

Confluent版本5.5通过PrimitiveAvroSerde(cf. https://github.com/confluentinc/schema-registry/blob/5.5.x/avro-serde/src/main/java/io/confluent/kafka/streams添加了对原始Avro类型的本地支持/serdes/avro/PrimitiveAvroSerde.java)

原始答案 (版本5.4及更高版本):

这是一个已知问题。原始Avro类型无法与Confluent的AvroSerdes配合使用,因为Serdes只能与GenericAvroRecord和一起SpecificAvroRecord使用。

比较https://github.com/confluentinc/schema-registry/tree/master/avro-serde/src/main/java/io/confluent/kafka/streams/serdes/avro。

因此,基于KafkaAvroSerializer和构建您自己的SerdeKafkaAvroDeserializer是正确的方法。为了能够将其作为默认Serde传递到配置中,您不能使用,Serdes.serdeFrom因为由于泛型类型擦除而导致类型信息丢失。

但是,您可以实现自己的扩展Serde接口的类,并将自定义类传递给config:

public class MySerde extends Serde<Integer> {
    // use KafkaAvroSerializer and KafkaAvroDeserializer and cast `Object` to `Integer`
}

config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, MySerde.class);


 类似资料:
  • 我正在尝试将数据库中列表中的项作为行中的单个值插入。如何循环发送的有效负载并将所有项插入到单独的行中?我试着找例子,没有运气。我无法循环并插入数据库中的值。 这是有效载荷

  • 我有两门课。我的基层: 还有我的另一门课。这个类扩展了BaseRequest类。 当我尝试创建映射器将我的Add Class转换为其他类时: 当我运行我的构建时,我收到了这个错误: 错误:(22,13)java:源参数中不存在名为“dateTransaction”的属性。你是说“空”吗?

  • 问题内容: 这是我的柱塞: http://plnkr.co/edit/oIei6gAU1Bxpo8VUIswt 单击该按钮后,应在“ Hello World!”之前插入以下内容。跨度: 减去脚本标签,当然。 我是通过动态插入以下div来实现的: 然后进行编译。但是,如果您查看日志,则编译后剩下的唯一内容是: 这里发生了什么?为什么我的插入内容无法正确编译?逻辑如下: 问题答案: 在 快速 的答案可

  •  tjsTypes.h で定義されているプリミティブ型がいくつかあります。 tjs_int 符号あり整数(最低32bit) tjs_uint 符号なし整数(最低32bit) tjs_int8 8bitの符号あり整数 tjs_uint8 8bitの符号なし整数 tjs_int16 16bitの符号あり整数 tjs_uint16 16bitの符号なし整数 tjs_int32 32bitの符号あり整数

  • ECMAScript 变量可能包含两种不同数据类型的值:基本类型值和引用类型值。基本类型值指的是简单的数据段,而引用类型值指那些可能由多个值构成的对象。 在将一个值赋给变量时,解析器必须确定这个值是基本类型值还是引用类型值。第3 章讨论了5 种基本数据类型:Undefined、Null、Boolean、Number 和String。这5 种基本数据类型是按值访问的,因为可以操作保存在变量中的实际的

  • 问题内容: 在JavaScript中,每个对象同时是一个实例和一个类。要进行继承,可以将任何对象实例用作原型。 在Python,C ++等中,有类和实例作为单独的概念。为了进行继承,您必须使用基类创建一个新类,然后可以使用该新类来生成派生实例。 为什么JavaScript朝这个方向发展(基于原型的面向对象)?与传统的基于类的OO相比,基于原型的OO有哪些优点和缺点? 问题答案: 这里大约有一百个术