当前位置: 首页 > 知识库问答 >
问题:

Kafka使用者解压缩gz文件流并读取

沃侯林
2023-03-14

Kafka producer正在发送.gz文件,但无法在消费者端解压缩和读取文件。获取错误为“IOError:不是gzipped文件”

producer-bin/kafka-console-producer.sh--broker-list localhost:9092-topic Airport<~/downloads/stocks.json.gz

消费者-

import sys 
import gzip
import StringIO
from kafka import KafkaConsumer

consumer = KafkaConsumer(KAFKA_TOPIC, bootstrap_servers=KAFKA_BROKERS)

try:
    for message in consumer:
        f = StringIO.StringIO(message.value)
        gzip_f = gzip.GzipFile(fileobj=f)
        unzipped_content = gzip_f.read()
        content = unzipped_content.decode('utf8')
        print (content)
except KeyboardInterrupt:
    sys.exit()

使用者出错-

Traceback (most recent call last):
  File "consumer.py", line 18, in <module>
    unzipped_content = gzip_f.read()
  File "/usr/lib64/python2.6/gzip.py", line 212, in read
    self._read(readsize)
  File "/usr/lib64/python2.6/gzip.py", line 255, in _read
    self._read_gzip_header()
  File "/usr/lib64/python2.6/gzip.py", line 156, in _read_gzip_header
    raise IOError, 'Not a gzipped file'
IOError: Not a gzipped file

共有1个答案

华星剑
2023-03-14

Kafaka并不是用来发送巨大的有效载荷/信息的。您应该将其视为分布式消息总线,它赋予您分布式系统的所有特权。

Kafaka限制可以跨行发送的信息的大小,原因如下

  • 巨大的消息会增加代理中的内存压力。
  • 较大的消息会降低代理的速度,并且处理这些消息的成本非常高。
    null

使用批处理大小:

batch.size以总字节数而不是消息数度量批处理大小。它控制在向Kafka代理发送消息之前要收集多少字节的数据。在不超过可用内存的情况下,将其设置得尽可能高。默认值为16384。

如果增加缓冲区的大小,它可能永远不会满。生产者最终会根据其他触发器发送信息,例如以毫秒为单位的延迟时间。虽然可以通过设置太高的缓冲区批处理大小来降低内存使用,但这不会影响延迟。

如果您的生产者一直在发送,那么您可能会获得尽可能好的吞吐量。如果生产者经常空闲,您可能没有写入足够的数据来保证当前的资源分配。

因为您的数据是gzip,所以您可以使用referenceBasedMessaging

而不是玩提取大小和消息最大字节大小(不能覆盖所有文件大小),将您的文件存储在分布式文件系统(如NFS/HDFS/S3)上,并将引用发送给用户。用户可以选择位置并解压数据。

 类似资料:
  • 问题内容: 这就是问题所在。我有sample.gz文件,大小约为60KB。我想解压缩该文件的前2000个字节。我遇到了CRC检查失败的错误,我猜是因为gzip CRC字段出现在文件末尾,并且它需要整个gzip压缩文件进行解压缩。有办法解决这个问题吗?我不在乎CRC检查。即使由于CRC错误而无法解压缩,也可以。有没有办法解决这个问题并解压缩部分.gz文件? 我到目前为止的代码是 遇到的错误是 还可以

  • 我正在使用Julia的ZipFile包来提取和处理csv文件。没问题,但是当我遇到zip文件中的zip文件时,我也想处理它,但是遇到了一个错误。 Julia ZipFile文档如下:https://zipfilejl.readthedocs.io/en/latest/ 对如何做到这一点有什么想法吗?

  • 问题内容: 在许多在线示例中,使用编码缓冲区在Java中对文件进行(解压缩)。但是,使用NIO时,无需选择合适的缓冲区大小。我找到了文件和套接字的示例,但是是否有NIO通道用于压缩输入(例如),因此您可以使用它代替自己创建缓冲区吗? 问题答案: 不,专业的ZIP频道尚不存在…我想您可以执行以下操作。使用NIO读取要缓冲的任何通道。然后检索刚刚从缓冲区读取的字节到字节数组,使用ByteArrayIn

  • 尝试下载. gz文件,将其解压缩到内存中,然后逐行读取解压缩后的内容。 现在,标题如下所示: {'Date':'Fri,23 Aug2019 07:19:28GMT','Server':'Apache','X-Content-Type-Options':'nosnff','X-Frame-Options':'samesource','Referre-Policy':'no-引用者','X-Xss-

  • 问题内容: 我想使用Java代码将文件压缩为zip,rar和7z格式。我也想在指定位置解压缩这些文件。谁能告诉我如何在Java中使用7-zip压缩和解压缩文件? 问题答案: 我用过:sevenzipjbinding.jar sevenzipjbinding-Allplatforms.jar 我现在可以使用这些jar解压缩文件。 尝试使用此链接进行解压缩:http : //sourceforge.n