Kafka consumer有一个配置< code>max.poll.records,它控制对poll()的单次调用中返回的最大记录数,其默认值为500。我将它设置为一个很高的数字,这样我就可以在一次轮询中获得所有的消息。然而,即使这个主题有更多的信息,在一次呼叫中,民意调查只返回几千条信息(大约6000条)。< br>
如何进一步增加单个消费者阅读的邮件数量?
最有可能的是,您的有效负载受到最大.分区.fetch.bytes
的限制,默认情况下为1MB。请参阅Kafka使用者配置。
下面是详细的解释:
最大分区提取字节数
该属性控制服务器将为每个分区返回的最大字节数。默认值为1 MB,这意味着当KafkaConsumer.poll()返回ConsumerRecords时,record对象将在分配给使用者的每个分区中最多使用max.partition.fetch.bytes。因此,如果一个主题有20个分区,并且您有5个消费者,那么每个消费者将需要4 MB的内存来存储消费者记录。实际上,您会希望分配更多的内存,因为如果组中的其他使用者失败,每个使用者都需要处理更多的分区。最大值partition.fetch.bytes必须大于代理将接受的最大消息(由代理配置中的max.message.size属性确定),否则代理可能会有使用者无法使用的消息,在这种情况下,使用者将在尝试读取这些消息时挂起。设置max.partition.fetch.bytes时的另一个重要考虑因素是使用者处理数据所花费的时间。您还记得,消费者必须足够频繁地调用poll(),以避免会话超时和随后的重新平衡。如果单个poll()返回的数据量非常大,消费者可能需要更长的时间来处理,这意味着它将无法及时进行轮询循环的下一次迭代以避免会话超时。如果出现这种情况,有两种选择,要么降低最大值。partition.fetch.bytes或增加会话超时。
希望它有帮助!
您可以通过增加max.partition.fetch来增加Consumerpoll()
批大小。字节,但根据文档,它仍然有fetch.max.bytesmessage.max.bytes
,用于限制批大小。因此,一种方法是根据所需的批量大小增加所有这些属性
在消费者配置
max.partition.fetch中。字节默认值为1048576
服务器将返回的每个分区的最大数据量。记录由消费者分批获取。如果获取的第一个非空分区中的第一个记录批次大于此限制,则仍将返回该批次,以确保消费者能够取得进展。代理接受的最大记录批次大小是通过message.max.bytes(代理配置)或max.message.bytes(主题配置)定义的。有关限制消费者请求大小的信息,请参见fetch.max.bytes
在使用者配置
提取中.max.bytes 默认值为52428800
服务器应为读取请求返回的最大数据量。记录由使用者分批提取,如果获取的第一个非空分区中的第一个记录批处理大于此值,则仍会返回记录批处理,以确保使用者可以取得进展。因此,这不是绝对最大值。代理接受的最大记录批大小是通过消息.max字节(代理配置)或最大消息字节(主题配置)定义的。请注意,使用者并行执行多个提取。
在< code >代理配置 message.max.bytes中,默认值为< code>1000012
Kafka允许的最大记录批大小。如果增加此值,并且存在早于 0.10.2 的使用者,则使用者的提取大小也必须增加,以便他们可以提取如此大的记录批次。
在最新的消息格式版本中,为了提高效率,记录始终分组为批次。在以前的邮件格式版本中,未压缩的记录不会分组到批处理中,在这种情况下,此限制仅适用于单个记录。
这可以通过主题级别max.message.bytes配置为每个主题设置。
在主题配置
中,最大消息字节默认值为1000012
Kafka允许的最大记录批大小。如果增加此值,并且存在早于 0.10.2 的使用者,则使用者的提取大小也必须增加,以便他们可以提取如此大的记录批次。
在最新的消息格式版本中,为了提高效率,记录始终分组为批次。在以前的邮件格式版本中,未压缩的记录不会分组到批处理中,在这种情况下,此限制仅适用于单个记录。
我在使用者组中轮询来自 Kafka 的消息时遇到问题。我的使用者对象分配给给定的分区 之后,消费者向该分区分配: 之后,我可以计算分区内的消息 和 ..... 在我的主题中有超过30000条消息。问题是我只收到一条消息。 具有< code > max _ poll _ records = 200 < code > AUTO _ OFFSET _ RESET 的消费者配置是最早的 这是我的函数,我正
我看到一个问题,我的主题中的所有消息都被我的消费者重新阅读。我只有1个消费者,我在开发/测试时打开/关闭它。我注意到,有时在几天没有运行消费者之后,当我再次打开它时,它会突然重新阅读我的所有消息。 客户端 ID 和组 ID 始终保持不变。我显式调用提交同步,因为我的启用.我确实设置了 auto.offset.reset=最早,但据我所知,只有在服务器上删除了偏移量时,才应该启动。我正在使用 IBM
我使用以下代码来读取主题的数据,即“sha-test2”,但它正在读取完全替代的代码行,即 10 行中的 20 行。但是当我运行控制台时,它显示所有 20 行。即.bin/kafka-console-consumer.sh --zookeeper 本地主机:2181 --主题 sha-test2 --从头 我做错了什么?非常感谢您的帮助。
假设我的使用者从一个代理轮询,该代理有多个主题,每个主题有多个分区。我在同一个消费群体中总共有5个消费者。如果我的每个消费者都进行投票,将返回的数据顺序是什么? topicD-分区5 我的问题是,在这个单一的1轮询中,在按顺序移动到下一个主题/分区之前,我会收到来自该主题/分区的所有可用消息吗?意思例如: 在一次投票循环中,我收到了这个... 或者在那个单一的1轮询循环中,有可能接收到这个消息顺序
我有一个简单的java制作人,如下所示 我正在尝试读取如下数据 但消费者并没有从Kafka那里读到任何信息。如果我在处添加以下内容 然后消费者开始从题目开始阅读。但是每次消费者重新启动时,它都从我不想要的主题开始读取消息。如果我在启动消费程序时添加了以下配置 然后,它从主题中读取消息,但是如果消费者在处理所有消息之前重新启动,那么它不会读取未处理的消息。 有人可以让我知道出了什么问题,我该如何解决
我有一个Kafka主题(1.0.0),只有一个分区。消费者被封装在EAR中,当部署到Wildfly 10时,最后一条消息的轮询始终返回0条消息。虽然主题不是空的。 当我做民意测验时,我得到0条记录。尽管日志记录显示: 当我更改为-2时,如: 我确实收到一条消息: 当然,这不是正确的记录,消息377408在哪里? 尝试了许多方法来寻求结束等,但它从来没有工作。 这是我的消费者配置: 注意:我尝试了r