当前位置: 首页 > 知识库问答 >
问题:

无法反序列化压缩协议缓冲区

陶腾
2023-03-14

我有kafka集群接收消息。消息是一个字节数组的zip文件。zip文件包含二进制的原型数据文件作为条目。我正在读取zip文件,并试图反序列化的原型条目,这就是我的代码是打协议消息无效的UTF-8,无效的标签异常。

在将二进制protobuf文件作为压缩字节数组发送到代理之前,我能够对其进行反序列化。

但是,当我压缩这些二进制protobuf文件,向kafka生成消息,使用它,然后尝试反序列化zip流中的条目时,我面临着一些问题。

我不确定谁是罪魁祸首。

因为这些二进制协议缓冲区是gzip压缩的,所以再次压缩它们会把事情搞砸吗?

有人能照亮吗?

谢啦

**************编辑**************

Producer Side:

public byte[] getZipfileBytes() {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ZipOutputStream zipOut = new ZipOutputStream(baos);
        CheckedOutputStream checkSum = new CheckedOutputStream(zipOut, new Adler32());

        try {
            ZipEntry zipEntry = new ZipEntry(testFile.getName());
            byte[] protoBytes = IOUtils.toByteArray(new FileInputStream(testFile));
            System.out.println("bytes length:\t"+protoBytes.length);
            zipEntry.setSize(protoBytes.length);
            zipOut.putNextEntry(zipEntry);
            zipOut.write(protoBytes);
            zipOut.close();
            System.out.println("checksum:"+checkSum.getChecksum().getValue());
            zipBytes = baos.toByteArray();
        } catch (IOException e) {
            e.printStackTrace();
        }

        return zipBytes;
    }



    Consumer Side:
         processConsumerRecord(ConsumerRecord<String, byte[]> record) {
                String key = record.key();
                byte[] dataPacket = record.value();

                ZipInputStream zipIn = new ZipInputStream(new ByteArrayInputStream(dataPacket));

                CheckedInputStream checkSum = new CheckedInputStream(zipIn,
                        new Adler32());
                ZipEntry zipEntry;
                try {
                    zipEntry = zipIn.getNextEntry();
                    while (zipEntry != null) {
                        String name = zipEntry.getName();
                        ByteArrayOutputStream baos = new ByteArrayOutputStream();
                            try {
                                IOUtils.copy(zipIn, baos);
                                byte[] protoBytes = baos.toByteArray();

二进制协议字节是gzip的,所以我需要Gunzip

如果我做gunzip,它不会以gzip格式抛出。

如果我跳过Gunzip并执行parsefrom,我会得到无效的标签异常。

   GZIPInputStream gzip = new GZIPInputStream(
                        new ByteArrayInputStream(baos.toByteArray()));
                        MyProtoObject mpo = null;
                        try {
                            mpo = MyProtoObject.parseFrom(protoBytes);
                        } catch (InvalidProtocolBufferException e1) {
                            e1.printStackTrace();
                        }
                    } catch (IOException e1) {
                        e1.printStackTrace();
                    }

校验和。getChecksum()。getValue()在生成和使用zip字节数组时返回1
以下是调试期间zipEntry变量的值:

    producer 
        zipEntry    ZipEntry  (id=44)   
        comment null    
        crc 2147247736  
        csize   86794   
        extra   null    
        flag    2056    
        method  8   
        name    "test.dat" (id=49)  
        size    92931   
        time    1214084891  


    consumer
        zipEntry    ZipEntry  (id=34)   
        comment null    
        crc 2147247736  
        csize   86794   
        extra   null    
        flag    0   
        method  8   
        name    "test.dat" (id=39)  
        size    92931   
        time    1214084891  

我甚至用另一种方法进行了测试,我没有在内存中处理原始字节,而是将zip文件写入磁盘,通过winzip手动提取,然后反序列化提取的二进制原始文件,成功了!!!

我是不是拉错了,让我知道

共有1个答案

高修筠
2023-03-14

这里有两件不同的事情:压缩/解压和处理protobuf。听起来这里的问题是第一个问题,听起来好像是破坏了protobuf数据。所以,现在:忘掉protobuf,只专注于压缩/解压。记录原始消息是什么(在压缩之前——可能是二进制文件或base-64块)。现在在接收端,在解压后跟踪二进制文件(同样是二进制文件或base-64块)中的内容。如果它们不是100%完全相同,那么所有其他赌注都是无效的。在成功复制原始二进制文件之前,protobuf是不可能的。

如果这是问题:最好显示您的邮政编码,这样我们就可以看到它。

如果您正确地压缩/解压缩二进制文件,那么问题将出在您的原型代码中。

如果问题是这样的:最好显示您的序列化/反序列化代码,这样我们就可以看到它了。

 类似资料:
  • 这就是我想要实现的: > 在Proc#1中使用google协议缓冲区建模对象 使用proto-buf序列化该对象,并将其发送到posix消息队列。 在Proc#2中读取流并将其反序列化为类似的模型,同时使用协议缓冲区。 换句话说: 进程1中的对象-- 问题是Proc#1和Proc#2可能是完全不同的语言平台。程序#1通常是C与g相一致的。但是Proc#2可以是任何东西:Python、Java等等。

  • 是否有可能在C中序列化一个类,并使用协议缓冲区将其反序列化为C#中的类似类?我已经尝试过Json序列化来克服不同平台中的序列化问题,但它在一些数据类型上存在问题,如数组列表等。那么关于使用谷歌协议缓冲区有什么建议吗?

  • 问题内容: 对于某些缓存,我正在考虑为即将到来的项目做准备,我一直在考虑Java序列化。即,应该使用它吗? 现在,由于几年来的各种原因,我以前已经编写了自定义序列化和反序列化(可外部化)。如今,互操作性已成为一个更大的问题,并且我可以预见需要与.Net应用程序进行交互,因此我考虑使用独立于平台的解决方案。 有没有人对GPB的高性能使用有任何经验?与Java的本机序列化相比,它在速度和效率方面有何不

  • 场景:阿帕奇·Flink、Kafka、协议缓冲区数据消费者。 数据源是协议缓冲区格式的Kafka主题(多个主题:主题#1,主题#3,主题#3)。消费者是Apache Flink消费者。每个主题都有一个独特的原型定义。 我试图在Apache Flink中开发一个通用的数据摄取工作,将Kafka的数据摄取到数据库中。 如何为Apache Flink实现通用protobuf反序列化程序?我正在寻找实现,

  • 我正在使用以下代码 我得到了这个错误,我不明白为什么我得到了它(是的,我得到了三次): libProbuf ERROR google/Probuf/wire_format.cc:1059]在序列化协议缓冲区时遇到包含无效UTF-8数据的字符串。字符串必须仅包含UTF-8;对原始字节使用'bytes'类型。 libProbuf ERROR google/Probuf/wire_format.cc:1

  • 默认情况下,Dart-RPC在服务器和客户端之间传输对象(类实例)时使用JSON序列化。 如何使用Protobuf(协议缓冲区)序列化 是否可以使用“接受”请求标头指定序列化方法(如内容类型)? 这是我尝试的, 我使用了以下定义文件,表示实体: 生成了人。pb。dart对于我来说,使用protoc gen dart插件,通过运行以下命令: 还有一些样板dart rpc代码: 打开功能请求:http