当前位置: 首页 > 知识库问答 >
问题:

激活压缩的Kafka消息大小

闻人修明
2023-03-14

我对Kafka2.6.0中的消息大小配置有点困惑。但让我们讲一个故事:

我们正在使用由3个节点组成的Kafka集群。到目前为止,消息的标准配置。“zstd压缩”被激活。

相关的代理配置很简单:

compression.type=zstd

此时,生产者配置也很简单:

compression.type=zstd

现在我们想把一个8Mbyte的消息放到一个特定的主题中。这些数据的压缩大小只有200 KB。

如果我将这些数据放入主题中,会出现以下错误:

sudo /opt/kafka/bin/kafka-console-producer.sh --topic XXX --producer.config /opt/kafka/config/admin-ssl.properties --broker-list broker < kafka/new\ 2.txt

[2020-11-05 13:43:34,500] ERROR Error when sending message to topic XXX with key: null, value: 8722456 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.RecordTooLargeException: The message is 8722544 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.

所以我改变了生产者配置如下:

compression.type=zstd
max.request.size=10485760

现在制作人接受更大的消息。但它仍然不起作用:

sudo /opt/kafka/bin/kafka-console-producer.sh --topic XXX --producer.config /opt/kafka/config/admin-ssl.properties --broker-list broker < kafka/new\ 2.txt

[2020-11-05 15:10:01,513] ERROR Error when sending message to topic Komsa.Kafka.Test with key: null, value: 8722544 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
    org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

这是另一条错误消息。我不明白为什么会这样。

我认为此消息与“message.max.bytes”属性有关。但我不明白为什么。这是该物业的文件:

Kafka允许的最大记录批次大小(如果启用压缩,则压缩后)。如果增加此大小,并且有超过0.10.2的消费者,则消费者的读取大小也必须增加,以便他们可以读取这么大的记录批次。在最新的消息格式版本中,为了提高效率,记录总是被分组为批次。在以前的消息格式版本中,未压缩的记录不会分组为批次,在这种情况下,此限制仅适用于单个记录。这可以通过主题级别max.message.bytes配置按主题设置。

我认为这意味着这个参数与压缩消息的大小有关,压缩消息的大小约为kbyte。

有人能帮我吗?

共有2个答案

闾丘成双
2023-03-14

我们的经验是,如果您在代理级别设置压缩类型,就像在

compression.type=zstd

代理将解压缩来自生产者的任何东西,并使用该压缩类型再次压缩数据。即使生产者已经使用zstd,也会有解压缩和“重新压缩”发生。

因此,您需要在代理级别将compression.type设置为生产者

周马鲁
2023-03-14

我找到了解决办法:

问题是Kafka游戏机制作人。sh忽略了压缩。输入producer config。如果我打电话

sudo /opt/kafka/bin/kafka-console-producer.sh --topic XXX --producer.config /opt/kafka/config/admin-ssl.properties --compression-codec=zstd --broker-list broker < kafka/new\ 2.txt

压缩。codec=zstd之所以有效,是因为制作人压缩了消息。

 类似资料:
  • 我想知道Kafka中信息的压缩大小。 我使用kafka 1.1.0和java kafka connect 1.1.0从我的制作人向主题发送消息。 如果消息对我的制作人来说太大,我会得到一个 消息序列化时为xxx字节,大于使用max.request配置的最大请求大小。大小配置。 设置最大请求。将大小设置为合适的值将导致来自代理的错误消息作为消息。代理配置中的max.bytes也必须相应地进行调整。不

  • 当我向Kafka主题发送消息时,我可能会收到一条比其他消息大得多的消息。 因此需要在单消息级进行压缩。根据https://cwiki.apache.org/confluence/display/kafka/compression, 一组消息可以被压缩并表示为一个压缩消息。 同样,根据https://github.com/apache/kafka/blob/0.10.1/clients/src/ma

  • 使用Kafka Streams,我们无法确定在处理写入接收器主题的消息后压缩这些消息所需的配置。 另一方面,使用经典的Kafka Producer,可以通过在KafkaProducer属性上设置配置“compression.type”轻松实现压缩 然而,似乎没有任何记录在案的Kafka Streams压缩处理过的消息的例子。 至于这次(2019年初),有没有一种方法可以使用Kafka流进行压缩?

  • 问题内容: JSON.stringify显然不是非常节省空间。例如,当[123456789,123456789]可能需要大约5个字节时,它将占用20+字节。websocket是否在发送到流之前压缩其JSON? 问题答案: 从本质上讲,WebSocket只是用于TEXT或BINARY数据的一组框架。 它本身不执行压缩。 但是,WebSocket规范允许扩展,并且野外有各种各样的压缩扩展(其中一项的正

  • 如何使用Spring Cloud Stream Kafka Binder为生产者启用压缩(例如GZIP)?

  • 我有一个带有Kafka使用者的spring应用程序,它使用@KafKalisterner注释。正在使用的主题是日志压缩的,我们可能会遇到必须再次使用主题消息的情况。以编程方式实现这一目标的最佳方法是什么?我们不控制Kafka主题配置。