当前位置: 首页 > 知识库问答 >
问题:

用Flink反序列化Protobuf kafka消息

龙永思
2023-03-14

我试图阅读和打印从Kafka使用Apache Flink的原型消息。

我遵循官方文件,但没有成功:https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/third_party_serializers/

Flink消费者代码是:

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
    env.enableCheckpointing(5000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setCheckpointStorage(s"$targetPath/checkpoints")
    env.getConfig.registerTypeWithKryoSerializer(classOf[User], classOf[ProtobufSerializer])

    val source = KafkaSource.builder[User]
      .setBootstrapServers(brokers)
      .setTopics(topic)
      .setGroupId(consumerGroupId)
      .setValueOnlyDeserializer(new ProtoDeserializer())
      .setStartingOffsets(OffsetsInitializer.earliest)
      .build

    val stream = env.fromSource(source, WatermarkStrategy.noWatermarks[User], kafkaTableName)
    stream.print()
    env.execute()
  }

反序列化器代码是:

class ProtoDeserializer extends DeserializationSchema[User] {

  override def getProducedType: TypeInformation[User] = null

  override def deserialize(message: Array[Byte]): User = User.parseFrom(message)

  override def isEndOfStream(nextElement: User): Boolean = false
}

在执行拖缆时,我遇到以下错误:

Protocol message contained an invalid tag (zero).

值得一提的是,我使用confluent protobuf consumer成功地读取和反序列化了消息,因此消息似乎没有损坏。

共有1个答案

孔山
2023-03-14

汇合protobuf序列化程序不会生成可由其他反序列化程序直接反序列化的内容。confluent的文档中描述了该格式:它以一个魔术字节(始终为零)开始,后跟一个四字节的模式ID。protobuf负载随后从字节5开始。

getProducedType方法应该返回适当的TypeInformation,在本例中是TypeInformation。of(User.class)。如果不这样做,您可能会在运行时遇到问题。

KafkaSource一起使用的反序列化程序不需要实现isEndOfStream,但它不会造成任何影响。

 类似资料:
  • 我需要通过flink消费Kafka,不幸的是,Kafka消息是在serde中使用原型,完全不知道如何处理它,这里是来自互联网的代码,但我不能使它工作。 这不起作用,它会让NPE: 有人知道我做错了什么吗?使用twitter ProtobufSerializer是唯一值得拥有protobuf的方法吗?还是还有别的路要走?

  • Flink去/序列化操作员状态的频率是多少?每次获取/更新或基于检查点?状态后端有什么不同吗? 我怀疑,对于具有不同键(数百万)和每个键每秒数千个事件的键控流,去/序列化可能是一个大问题。我说得对吗?

  • 主要目标是聚合两个Kafka主题,一个压缩慢速移动数据,另一个每秒接收一次的快速移动数据。 我已经能够在简单的场景中使用消息,例如KV(Long, String),使用如下内容: 但是,当您需要从 AVRO 反序列化时,这似乎不是方法。我有一个KV(字符串,AVRO),我需要消费。 我尝试从AVRO模式生成Java类,然后将它们包含在“应用”中,例如: 但这似乎不是正确的方法。 是否有任何文档/示

  • 我试图通过Spring KafkaListener在单独的消费者应用程序中使用这些消息 集装箱工厂配置 在这种配置下,使用者不接收消息(字节)。如果我将Kafka侦听器更改为接受字符串,则会出现以下异常: 集装箱工厂配置 制片人-

  • 我正在尝试使用kryo序列化和反序列化到二进制。我想我已经完成了序列化,但似乎无法反序列化。下面是我正在处理的代码,但最终我想存储一个字节[],然后再次读取它。文档只显示了如何使用文件。

  • 我想反序列化表单中的类: 其中文本是加密的,反序列化应该在重建TestFieldEncryptedMessage实例之前取消对值的加密。 我采用的方法非常类似于:https://github.com/codesqueak/jackson-json-crypto 也就是说,我正在构建一个扩展SimpleModule的模块: 如您所见,设置了两个修饰符:EncryptedSerializerModif