当前位置: 首页 > 知识库问答 >
问题:

获取Kafka压缩消息大小

韦辰钊
2023-03-14

我想知道Kafka中信息的压缩大小。

我使用kafka 1.1.0和java kafka connect 1.1.0从我的制作人向主题发送消息。

如果消息对我的制作人来说太大,我会得到一个

消息序列化时为xxx字节,大于使用max.request配置的最大请求大小。大小配置。

设置最大请求。将大小设置为合适的值将导致来自代理的错误消息作为消息。代理配置中的max.bytes也必须相应地进行调整。不幸的是,错误消息没有包括代理收到的消息的大小。我调整了消息。最大字节数。到现在为止,一直都还不错。

如果我在生产者端激活压缩,最大请求。大小仍然必须与未压缩的大小相同,因为在压缩之前,代码会比较未压缩消息的大小(请参阅)https://issues.apache.org/jira/browse/KAFKA-4169)

但通过压缩,我可以减少信息量。代理中的最大字节数。问题是,在任何时候我都无法确定这个压缩消息的大小。有没有办法在发送消息之前在生产商代码中或稍后在日志文件中找到答案?

在我的压缩示例中,消息的默认值为1MB。max.bytes就足够了,所以我不必更改默认配置。但我想知道我的压缩信息是远低于1MB还是仅仅0.99MB。在这种情况下,我可能会增加消息。生产中的最大字节数,以避免出现问题。

提前感谢您的支持。

共有2个答案

严狐若
2023-03-14

为了测试snappy压缩消息,您可以执行以下操作。

pip install python-snappy
python -m snappy -c input.json output.snappy
阮鸿煊
2023-03-14

你可以使用压缩库,自己压缩信息,在发送之前检查大小。例如,假设您使用的是lz4压缩,那么您可以使用lz4 java lib,然后使用类似于:

private static LZ4Compressor COMPRESS = LZ4Factory.fastestInstance().highCompressor();

String meMessageString      = "My Message that I am sending to kafka";
byte[] uncompressedBytes    = jsonRequest.getBytes();
long lz4compressedLength    = COMPRESSOR.compress(uncompressedBytes).length;
 类似资料:
  • 我对Kafka2.6.0中的消息大小配置有点困惑。但让我们讲一个故事: 我们正在使用由3个节点组成的Kafka集群。到目前为止,消息的标准配置。“zstd压缩”被激活。 相关的代理配置很简单: 此时,生产者配置也很简单: 现在我们想把一个8Mbyte的消息放到一个特定的主题中。这些数据的压缩大小只有200 KB。 如果我将这些数据放入主题中,会出现以下错误: 所以我改变了生产者配置如下: 现在制作

  • 当我向Kafka主题发送消息时,我可能会收到一条比其他消息大得多的消息。 因此需要在单消息级进行压缩。根据https://cwiki.apache.org/confluence/display/kafka/compression, 一组消息可以被压缩并表示为一个压缩消息。 同样,根据https://github.com/apache/kafka/blob/0.10.1/clients/src/ma

  • 使用Kafka Streams,我们无法确定在处理写入接收器主题的消息后压缩这些消息所需的配置。 另一方面,使用经典的Kafka Producer,可以通过在KafkaProducer属性上设置配置“compression.type”轻松实现压缩 然而,似乎没有任何记录在案的Kafka Streams压缩处理过的消息的例子。 至于这次(2019年初),有没有一种方法可以使用Kafka流进行压缩?

  • 问题内容: 我有一个我从另一个构建的。我想知道gzip数据的原始(未压缩)长度。尽管我可以读到的末尾,然后算数,但这将花费大量时间并浪费CPU。在阅读之前,我想知道尺寸。 有没有像一个类似的方法为: 从以下版本开始: API Level 1 获取此ZipEntry的未压缩大小。 问题答案: GZIPInputStream是否有类似ZipEntry.getSize()的类似方法 不。它不在Javad

  • 问题内容: JSON.stringify显然不是非常节省空间。例如,当[123456789,123456789]可能需要大约5个字节时,它将占用20+字节。websocket是否在发送到流之前压缩其JSON? 问题答案: 从本质上讲,WebSocket只是用于TEXT或BINARY数据的一组框架。 它本身不执行压缩。 但是,WebSocket规范允许扩展,并且野外有各种各样的压缩扩展(其中一项的正

  • 大家好。我有一个Kafka项目,使用SpringKafka来听一个明确的主题。我需要一天听一次所有的信息,把它们放到一个集合中,然后在那里找到特定的信息。我无法理解如何用一个@KafkaListener方法读取所有消息。我的班级是: 我的事件集合的大小始终为1;我尝试使用不同的循环,但后来,我的收藏被归档了530000次。 更新:我已经找到了一种方法来做它与factory.setBatchList