当前位置: 首页 > 知识库问答 >
问题:

Kafka没有向消费者传达足够的信息

钮承恩
2023-03-14

我有一个kafka主题,3个分区,只有一个带批处理的消费者。我在消费者方面使用的是spring kafka和以下消费者道具:

max.poll.records=10000
fetch.min.bytes=2000000
fetch.max.bytes=15000000
fetch.max.wait.ms=1000
max.poll.interval.ms=300000
auto.offset.reset.config=earliest
idle.event.interval=120000

即使队列中有数千条消息(GBs数据)在等待,kafka consumer在每次轮询中也会收到大约10条消息(总大小约为1MB)。使用者应该获取fetch.max.bytes(在我的示例中为15MB)或max.poll.records(在我的示例中为10000)的批处理。有什么问题?

共有1个答案

曹沛
2023-03-14

有几种情况可能导致这种情况,请执行以下更改:

  1. 增加fetch.min.bytes-使用者还可以获取fetch.min.bytes的批处理,即1.9MB。
  2. 增加fetch.max.wait.ms-轮询函数等待fetch.min.bytesfetch.max.wait.ms触发,无论先来的是什么。
    fetch.max.wait.ms在您的配置中是1秒,听起来不错,但增加fetch.max.wait.ms以防出现问题。
  3. 增加max.partition.fetch.bytes-默认值为1MB,它可以减小像您这样的小分区主题的轮询大小(对于单个使用者的3个分区主题,最多只能进行3MB轮询)。

请尝试使用以下值:

fetch.min.bytes=12000000
fetch.max.wait.ms=5000  
max.partition.fetch.bytes=5000000  

更深层次的解释:
https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch04.html

 类似资料:
  • 我想从Kafka的主题消费事件后,他们到达的时间。我希望使用事件的时间在消息的有效负载中。在Kafka那里有可能实现那样的事情吗?它的缺点是什么? 实际示例:一条消息M在12:10产生,在12:11到达我的Kafka主题,我希望消费者在12:41(到达后30分钟)轮询它

  • 有什么方法可以阻止Kafka的消费者在一段时间内消费信息吗?我希望消费者停止一段时间,然后开始消费最后一条未消费的消息。

  • 消费者使用Spring的JavaConfig类如下: Kafka主题侦听器使用@KafkaListener注释,如下所示: 我的pom包括依赖项: 现在当我打包到war并部署到tomcat时,它不会显示任何错误,即使在调试模式下也不会显示任何错误,只是部署war什么都没有。 请帮助我了解是否缺少触发kafkalistner的某些配置。 谢谢Gary我添加了上下文。xml和web。xml,但我得到了

  • 由于它是一个Spring Boot应用程序,默认偏移量设置为Latest。我在这里做错了什么,请帮我弄明白。

  • 我相信这三种类型的确认由于生产者属性仅限于领导者和生产者,我希望生产者在消费者通过kafka broker消费来自存储/队列的消息时收到具体的消息。还请纠正我,如果我在制作人的“acks”属性上有错误,它的默认值是“-1”,它确认所有副本是否已接收/存储消息,但它是否与消费者有关,或者我们是否可以在消费者提交且Kafka向制作人发送确认时创建一个桥梁?

  • 我在mac上运行Kafka和Flink作为docker容器。 我已经实现了Flink作业,它应该消耗来自Kafka主题的消息。我运行一个向主题发送消息的python生产者。 工作开始时没有问题,但没有收到任何消息。我相信这些消息被发送到了正确的主题,因为我有一个能够使用消息的python消费者。 flink作业(java): Flink作业日志: 生产者作业(python):(在主机上运行-不是d