当前位置: 首页 > 知识库问答 >
问题:

Kafka在大消息排队后“停止工作”

姚自强
2023-03-14

我正在运行kafka2.11-0.9.0.0和一个基于Java的生产者/消费者。与消息~70 KB一切工作良好。但是,在生产者将一个更大的70 MB消息排入队列之后,kafka似乎停止将消息传递给消费者。即。不仅大的消息没有传递,后续的小消息也没有传递。我知道制作人成功了,因为我使用了kafka回调进行确认,我可以在kafka消息日志中看到消息。

kafka配置自定义更改:

message.max.bytes=200000000
replica.fetch.max.bytes=200000000

使用者配置:

props.put("fetch.message.max.bytes",   "200000000");
props.put("max.partition.fetch.bytes", "200000000");

共有1个答案

欧阳俊逸
2023-03-14

您需要增加消费者可以使用的消息的大小,这样它就不会在试图阅读一个很大的消息时陷入困境。

max.partition.fetch.bytes (default value is 1048576 bytes)

服务器每分区将返回的最大数据量。用于请求的最大总内存将是#partitions*max.partition.fetch.bytes。该大小必须至少与服务器允许的最大消息大小一样大,否则生产者发送的消息可能大于使用者能够获取的消息。如果发生这种情况,使用者可能会在试图获取某个分区上的大型消息时陷入困境。

 类似资料:
  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认

  • 我的示例接受google pub/sub,记录它并将ack发送回 配置: NotificationHandler: 线程转储: 行: 但ThreadExecutor未执行deliverMessageTask。 在我看来,它看起来像是库中的一个bug,但可能是库误用。无论如何,我正在寻找任何解决方案/变通办法来避免这种情况。 我使用: spring-boot springCloudVersion=

  • 我注意到我的Kafka Streams应用程序在一段时间没有读取来自Kafka主题的新消息时停止工作。这是我第三次看到这种情况发生。 自5天以来,没有向主题发送任何消息。我的Kafka Streams应用程序也托管了一个spark java Web服务器,它仍然具有响应能力。然而,Kafka Streams不再阅读我向Kafka主题发出的消息。当我重新启动应用程序时,所有消息都将从代理获取。 如何

  • 我想这个话题发生了什么...偏移坏了还是我不知道... 有人知道会发生什么吗?谢谢

  • 我有一个使用Spring和RabbitMQ的项目设置。目前,我的应用程序可能会收到一条amqp消息,在另一个异步进程完成之前无法处理该消息(遗留和完全分离,我无法控制)。因此,结果是我可能不得不等待处理消息一段时间。其结果是变压器出现异常。 当消息被NACK回rabbitMQ时,它会将其放回队列的头部,并立即重新拉入队列。如果我收到的无法处理的消息等于并发侦听器的数量,我的工作流就会被锁定。它转动

  • 我正在使用这个库来实现节点kafka与消费者暂停和恢复方法来处理背压。我已经创建了一个小演示,我可以在其中和,但问题是在后它停止了消费消息。 这是我的代码。 任何人都可以帮助我,我在恢复消费者时做错了什么?当我启动使用者时,它只接收一条消息,并且在恢复后仍然不消耗任何其他消息。