当前位置: 首页 > 知识库问答 >
问题:

了解Kafka消息字节大小

丌官承
2023-03-14

如何获得Kafka单张唱片的大小?

有一些关于我为什么需要这个的说明。

这似乎不是在ConsumerRecord或RecordMetadata类上公开的serializedValueSize。我并不真正理解这个属性的价值,因为它与对消费者有用的消息的大小不匹配。serializedValueSize用于什么?

最大Poll.Records

在对poll()的单个调用中返回的最大记录数。

这并不存在,但却是我想要的行为:

在消费者大小上,我将“max.partition.fetch.bytes”设置为291字节。这使得消费者只能返回1条消息。将该值设置为292有时会使使用者返回2条消息。所以我已经计算出消息大小为292的一半;一条消息的大小为146字节。

上面的项目符号要求对Kafka配置进行更改,并涉及手动查看/greping一些服务器日志。如果Kafka Java API提供这个值就太好了。

在生产者方面,Kafka提供了一种获取RecordMetadata.SerializedValueSize方法中记录的序列化大小的方法。这个值是76字节,与上面测试中给出的146字节有很大不同。

java prettyprint-override">System.out.println(myRecordMetadata.serializedValueSize());
// 76
# producer
batch.size=1

# consumer

# Expected this to work:
# 76 * 2 = 152
max.partition.fetch.bytes=152

# Actually works:
# 292 = ??? magic ???
max.partition.fetch.bytes=292

我希望将max.partition.fetch.bytes设置为serializedValueSize给出的字节数的倍数可以使Kafka使用者从轮询接收到最多该数量的记录。相反,max.partition.fetch.bytes值需要高得多才能实现这一点。

共有1个答案

赵英资
2023-03-14

我不太熟悉serializedvalueSize方法,但根据文档,这只是存储在该消息中的值的大小。这将小于总消息大小(即使使用null键),因为消息还包含不属于值的元数据(如时间戳)。

至于您的问题:与其通过处理消息大小和限制使用者的吞吐量来直接控制轮询,为什么不只是缓冲传入的消息,直到足够的消息可用或所需的超时(您提到了fetch.max.wait.ms,但您可以手动指定一个超时)已经过去?

public static <K, V> List<ConsumerRecord<K, V>>
    minPoll(KafkaConsumer<K, V> consumer, Duration timeout, int minRecords) {
  List<ConsumerRecord<K, V>> acc = new ArrayList<>();
  long pollTimeout = Duration.ofMillis(timeout.toMillis()/10);
  long start = System.nanoTime();
  do {
    ConsumerRecords<K, V> records = consumer.poll(pollTimeout);
    for(ConsumerRecord<K, V> record : records)
      acc.add(record);
  } while(acc.size() < minRecords &&
          System.nanoTime() - start < timeout.toNanos());
  return acc;
}

consumer.poll的调用中的timeout.ToMillis()/10超时是任意的。您应该选择一个足够小的持续时间,这样即使我们等待的时间比指定的超时时间长(这里是:长10%)也没关系。

编辑:请注意,这可能返回一个大于max.poll.records的列表(最大值为max.poll.records+minrecords-1)。如果您还需要强制执行这个严格的上限,可以使用方法外部的另一个缓冲区临时存储多余的记录(这可能会更快,但不允许minPoll和普通的Poll方法混合使用),或者直接丢弃它们,并使用ConsumerSeek方法进行回溯。

因此,问题不在于控制poll-方法返回的消息数量,而在于如何获得单个记录的大小。不幸的是,我认为那不可能不经过很多麻烦。问题是,对此没有真正的(恒定的)答案,甚至一个大概的答案将取决于Kafka版本,或者更确切地说,不同的Kafka协议版本。

首先,我不完全确定max.partition.fetch.bytes到底控制了什么(如:协议开销是否也是它的一部分?)。让我来说明一下我的意思:当使用者发送一个fetch请求时,那么fetch响应由以下字段组成:

  1. 节流时间(4字节)
  2. 主题响应数组(4字节表示数组长度+数组中数据的大小)。

主题响应依次包括

  1. 主题名称(字符串长度+字符串大小为2个字节)
  2. 分区响应数组(4字节表示数组长度+数组中数据的大小)。
    null
  1. 正文大小(1-5字节)
  2. 属性(1字节)
  3. 时间戳增量(1-10字节)
  4. 偏移增量(1-5字节)
  5. 密钥字节数组(1-5字节+密钥数据大小)
  6. 值字节数组(1-5字节+值数据大小)
  7. 标头(1-5字节+标头数据大小)。

源代码在这里。正如您所看到的,您不能简单地将292字节除以2来获得记录大小,因为某些开销是恒定的,并且与记录的数量无关。

更糟糕的是,记录没有恒定的大小,即使它们的键和值(以及标头)有恒定的大小,因为时间戳和偏移量使用可变长度数据类型存储为与批处理时间戳和偏移量的差异。此外,这只是编写本文时最新协议版本的情况。对于旧版本,答案将再次不同,谁知道会发生什么在未来的版本。

 类似资料:
  • 我试图解析一条Kafka消息,它是以某种加密的AVRO格式。我有以下AvroSchema。avsc avro架构文件: 现在,我编写了以下代码来获取JSON格式的数据: 请帮我解密这封信。 加密字节消息属于以下类型:<代码>080-21-0001:�哦�@@��A.�ǐ�U:�哦�@@��A 我按照建议进行了更改,现在我有以下代码: 但我仍然得到错误为“不是数据文件”。

  • 我想知道Kafka中信息的压缩大小。 我使用kafka 1.1.0和java kafka connect 1.1.0从我的制作人向主题发送消息。 如果消息对我的制作人来说太大,我会得到一个 消息序列化时为xxx字节,大于使用max.request配置的最大请求大小。大小配置。 设置最大请求。将大小设置为合适的值将导致来自代理的错误消息作为消息。代理配置中的max.bytes也必须相应地进行调整。不

  • 问题内容: 因此,在昨天的工作中,我不得不编写一个应用程序来计算AFP文件中的页数。因此,我整理了我的MO:DCA规范PDF,找到了结构化字段及其3个字节的标识符。该应用程序需要在AIX机器上运行,所以我决定用Java编写它。 为了获得最大效率,我决定读取每个结构化字段的前6个字节,然后跳过该字段中的其余字节。这会让我: 因此,我检查字段类型,如果是,则增加页面计数器,如果不是,则不增加。然后,我

  • 我对Kafka2.6.0中的消息大小配置有点困惑。但让我们讲一个故事: 我们正在使用由3个节点组成的Kafka集群。到目前为止,消息的标准配置。“zstd压缩”被激活。 相关的代理配置很简单: 此时,生产者配置也很简单: 现在我们想把一个8Mbyte的消息放到一个特定的主题中。这些数据的压缩大小只有200 KB。 如果我将这些数据放入主题中,会出现以下错误: 所以我改变了生产者配置如下: 现在制作

  • 我目前使用的是Kafka0.9.0.1。根据我找到的一些来源,设置消息大小的方法是修改中的以下键值。 message.max.bytes replica.fetch.max.bytes fetch.message.max.bytes 我的文件实际上有这些设置。 其他可能相关的设置如下。 但是,当我试图发送具有4到6 MB大小的有效负载的消息时,使用者永远不会得到任何消息。生产者似乎在发送消息时没有

  • 问题内容: 我经常卡在没有源的Java类文件中,并且试图理解我手头的问题。 请注意,反编译器是有用的,但在所有情况下都不足够… 我有两个问题 有哪些工具可用来查看Java字节码(最好从linux命令行中获得) 什么是熟悉Java字节码语法的良好参考 问题答案: 与其直接查看Java字节码(需要熟悉Java虚拟机及其操作),不如尝试使用Java反编译实用程序。反编译器将尝试从指定文件创建源文件。 该