当前位置: 首页 > 知识库问答 >
问题:

使用Java的Apache Flink中的通用协议缓冲区反序列化程序

梁兴文
2023-03-14

场景:阿帕奇·Flink、Kafka、协议缓冲区数据消费者。

数据源是协议缓冲区格式的Kafka主题(多个主题:主题#1,主题#3,主题#3)。消费者是Apache Flink消费者。每个主题都有一个独特的原型定义

List<String> topicList = Arrays.asList("topic#1,topic#2,topic#3".split(","));
inputStream = env.addSource(new FlinkKafkaConsumer[ProtobufDeserializationSchema](topicList, new ProtobufDeserializationSchema(), properties));

我试图在Apache Flink中开发一个通用的数据摄取工作,将Kafka的数据摄取到数据库中。

如何为Apache Flink实现通用protobuf反序列化程序?我正在寻找实现,将Kafka主题与protobuf定义联系起来进行反序列化。

最初的方法是将字节数组引入Flink数据流,然后根据Kafka主题名确定protobuf定义,以反序列化map函数中的消息。我怎样才能用通用的方式来做呢?

共有1个答案

景品
2023-03-14

flink statefun包含一个可能有用的通用protobuf(反)序列化程序。

https://github.com/apache/flink-statefun/blob/release-3.0/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/protobuf/ProtobufSerializer.java

 类似资料:
  • 这就是我想要实现的: > 在Proc#1中使用google协议缓冲区建模对象 使用proto-buf序列化该对象,并将其发送到posix消息队列。 在Proc#2中读取流并将其反序列化为类似的模型,同时使用协议缓冲区。 换句话说: 进程1中的对象-- 问题是Proc#1和Proc#2可能是完全不同的语言平台。程序#1通常是C与g相一致的。但是Proc#2可以是任何东西:Python、Java等等。

  • 我有kafka集群接收消息。消息是一个字节数组的zip文件。zip文件包含二进制的原型数据文件作为条目。我正在读取zip文件,并试图反序列化的原型条目,这就是我的代码是打异常。 在将二进制protobuf文件作为压缩字节数组发送到代理之前,我能够对其进行反序列化。 但是,当我压缩这些二进制protobuf文件,向kafka生成消息,使用它,然后尝试反序列化zip流中的条目时,我面临着一些问题。 我

  • 是否有可能在C中序列化一个类,并使用协议缓冲区将其反序列化为C#中的类似类?我已经尝试过Json序列化来克服不同平台中的序列化问题,但它在一些数据类型上存在问题,如数组列表等。那么关于使用谷歌协议缓冲区有什么建议吗?

  • 默认情况下,Dart-RPC在服务器和客户端之间传输对象(类实例)时使用JSON序列化。 如何使用Protobuf(协议缓冲区)序列化 是否可以使用“接受”请求标头指定序列化方法(如内容类型)? 这是我尝试的, 我使用了以下定义文件,表示实体: 生成了人。pb。dart对于我来说,使用protoc gen dart插件,通过运行以下命令: 还有一些样板dart rpc代码: 打开功能请求:http

  • 问题内容: 对于某些缓存,我正在考虑为即将到来的项目做准备,我一直在考虑Java序列化。即,应该使用它吗? 现在,由于几年来的各种原因,我以前已经编写了自定义序列化和反序列化(可外部化)。如今,互操作性已成为一个更大的问题,并且我可以预见需要与.Net应用程序进行交互,因此我考虑使用独立于平台的解决方案。 有没有人对GPB的高性能使用有任何经验?与Java的本机序列化相比,它在速度和效率方面有何不

  • 试图使用Ionic 4中的协议缓冲区进行编码 我已经下载了协议并用它来生成一堆_pb.js文件,每个. proto文件一个。很好。 首先关注原型示例。这是示例代码: 我做了一些更改以匹配我的文件。更改proto文件的名称。但是我的proto文件中没有包名称。所以我只是使用了消息名称。首先这是我的. proto文件的开头: 下面是我修改后的代码: 这似乎不起作用。我的控制台显示: 我相信我已经成功地