我试图阅读和打印从Kafka使用Apache Flink的原型消息。
我遵循官方文件,但没有成功:https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/third_party_serializers/
Flink消费者代码是:
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
env.enableCheckpointing(5000)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setCheckpointStorage(s"$targetPath/checkpoints")
env.getConfig.registerTypeWithKryoSerializer(classOf[User], classOf[ProtobufSerializer])
val source = KafkaSource.builder[User]
.setBootstrapServers(brokers)
.setTopics(topic)
.setGroupId(consumerGroupId)
.setValueOnlyDeserializer(new ProtoDeserializer())
.setStartingOffsets(OffsetsInitializer.earliest)
.build
val stream = env.fromSource(source, WatermarkStrategy.noWatermarks[User], kafkaTableName)
stream.print()
env.execute()
}
反序列化器代码是:
class ProtoDeserializer extends DeserializationSchema[User] {
override def getProducedType: TypeInformation[User] = null
override def deserialize(message: Array[Byte]): User = User.parseFrom(message)
override def isEndOfStream(nextElement: User): Boolean = false
}
在执行拖缆时,我遇到以下错误:
Protocol message contained an invalid tag (zero).
值得一提的是,我使用confluent protobuf consumer成功地读取和反序列化了消息,因此消息似乎没有损坏。
汇合protobuf序列化程序不会生成可由其他反序列化程序直接反序列化的内容。confluent的文档中描述了该格式:它以一个魔术字节(始终为零)开始,后跟一个四字节的模式ID。protobuf负载随后从字节5开始。
getProducedType
方法应该返回适当的TypeInformation
,在本例中是TypeInformation。of(User.class)
。如果不这样做,您可能会在运行时遇到问题。
与KafkaSource
一起使用的反序列化程序不需要实现isEndOfStream
,但它不会造成任何影响。
我需要通过flink消费Kafka,不幸的是,Kafka消息是在serde中使用原型,完全不知道如何处理它,这里是来自互联网的代码,但我不能使它工作。 这不起作用,它会让NPE: 有人知道我做错了什么吗?使用twitter ProtobufSerializer是唯一值得拥有protobuf的方法吗?还是还有别的路要走?
Flink去/序列化操作员状态的频率是多少?每次获取/更新或基于检查点?状态后端有什么不同吗? 我怀疑,对于具有不同键(数百万)和每个键每秒数千个事件的键控流,去/序列化可能是一个大问题。我说得对吗?
主要目标是聚合两个Kafka主题,一个压缩慢速移动数据,另一个每秒接收一次的快速移动数据。 我已经能够在简单的场景中使用消息,例如KV(Long, String),使用如下内容: 但是,当您需要从 AVRO 反序列化时,这似乎不是方法。我有一个KV(字符串,AVRO),我需要消费。 我尝试从AVRO模式生成Java类,然后将它们包含在“应用”中,例如: 但这似乎不是正确的方法。 是否有任何文档/示
我试图通过Spring KafkaListener在单独的消费者应用程序中使用这些消息 集装箱工厂配置 在这种配置下,使用者不接收消息(字节)。如果我将Kafka侦听器更改为接受字符串,则会出现以下异常: 集装箱工厂配置 制片人-
我正在尝试使用kryo序列化和反序列化到二进制。我想我已经完成了序列化,但似乎无法反序列化。下面是我正在处理的代码,但最终我想存储一个字节[],然后再次读取它。文档只显示了如何使用文件。
我想反序列化表单中的类: 其中文本是加密的,反序列化应该在重建TestFieldEncryptedMessage实例之前取消对值的加密。 我采用的方法非常类似于:https://github.com/codesqueak/jackson-json-crypto 也就是说,我正在构建一个扩展SimpleModule的模块: 如您所见,设置了两个修饰符:EncryptedSerializerModif