Kafka producer正在发送.gz文件,但无法在消费者端解压缩和读取文件。获取错误为“IOError:不是gzipped文件”
producer-bin/kafka-console-producer.sh--broker-list localhost:9092-topic Airport<~/downloads/stocks.json.gz
消费者-
import sys
import gzip
import StringIO
from kafka import KafkaConsumer
consumer = KafkaConsumer(KAFKA_TOPIC, bootstrap_servers=KAFKA_BROKERS)
try:
for message in consumer:
f = StringIO.StringIO(message.value)
gzip_f = gzip.GzipFile(fileobj=f)
unzipped_content = gzip_f.read()
content = unzipped_content.decode('utf8')
print (content)
except KeyboardInterrupt:
sys.exit()
使用者出错-
Traceback (most recent call last):
File "consumer.py", line 18, in <module>
unzipped_content = gzip_f.read()
File "/usr/lib64/python2.6/gzip.py", line 212, in read
self._read(readsize)
File "/usr/lib64/python2.6/gzip.py", line 255, in _read
self._read_gzip_header()
File "/usr/lib64/python2.6/gzip.py", line 156, in _read_gzip_header
raise IOError, 'Not a gzipped file'
IOError: Not a gzipped file
Kafaka并不是用来发送巨大的有效载荷/信息的。您应该将其视为分布式消息总线,它赋予您分布式系统的所有特权。
Kafaka限制可以跨行发送的信息的大小,原因如下
使用批处理大小:
batch.size
以总字节数而不是消息数度量批处理大小。它控制在向Kafka代理发送消息之前要收集多少字节的数据。在不超过可用内存的情况下,将其设置得尽可能高。默认值为16384。
如果增加缓冲区的大小,它可能永远不会满。生产者最终会根据其他触发器发送信息,例如以毫秒为单位的延迟时间。虽然可以通过设置太高的缓冲区批处理大小来降低内存使用,但这不会影响延迟。
如果您的生产者一直在发送,那么您可能会获得尽可能好的吞吐量。如果生产者经常空闲,您可能没有写入足够的数据来保证当前的资源分配。
因为您的数据是gzip
,所以您可以使用referenceBasedMessaging
。
而不是玩提取大小和消息最大字节大小(不能覆盖所有文件大小),将您的文件存储在分布式文件系统(如NFS/HDFS/S3)上,并将引用发送给用户。用户可以选择位置并解压数据。
问题内容: 这就是问题所在。我有sample.gz文件,大小约为60KB。我想解压缩该文件的前2000个字节。我遇到了CRC检查失败的错误,我猜是因为gzip CRC字段出现在文件末尾,并且它需要整个gzip压缩文件进行解压缩。有办法解决这个问题吗?我不在乎CRC检查。即使由于CRC错误而无法解压缩,也可以。有没有办法解决这个问题并解压缩部分.gz文件? 我到目前为止的代码是 遇到的错误是 还可以
我想使用jszip加载一个压缩包,获取里面的docx文件,返回一个文件地址或者xml,给wps加载项用,本来是想试一试blob是否可以,但是打出来的日志返回的是一个Blob {size: 10372, type: ''},async里面的type写为string的时候,打出来的日志是个乱码 想知道一下这个该怎么办,wps加载项需要是一个docx地址,或者是一个xml文件
尝试下载. gz文件,将其解压缩到内存中,然后逐行读取解压缩后的内容。 现在,标题如下所示: {'Date':'Fri,23 Aug2019 07:19:28GMT','Server':'Apache','X-Content-Type-Options':'nosnff','X-Frame-Options':'samesource','Referre-Policy':'no-引用者','X-Xss-
问题内容: 在许多在线示例中,使用编码缓冲区在Java中对文件进行(解压缩)。但是,使用NIO时,无需选择合适的缓冲区大小。我找到了文件和套接字的示例,但是是否有NIO通道用于压缩输入(例如),因此您可以使用它代替自己创建缓冲区吗? 问题答案: 不,专业的ZIP频道尚不存在…我想您可以执行以下操作。使用NIO读取要缓冲的任何通道。然后检索刚刚从缓冲区读取的字节到字节数组,使用ByteArrayIn
我正在使用Julia的ZipFile包来提取和处理csv文件。没问题,但是当我遇到zip文件中的zip文件时,我也想处理它,但是遇到了一个错误。 Julia ZipFile文档如下:https://zipfilejl.readthedocs.io/en/latest/ 对如何做到这一点有什么想法吗?