当前位置: 首页 > 知识库问答 >
问题:

Spring Amqp消费者在运行一段时间后暂停

庄浩言
2023-03-14

我们有一个具有Ha all策略的2节点RabbitMQ集群。我们在应用程序中使用Spring AMQP与RabbitMQ对话。生产者部分工作正常,但消费者工作了一段时间并暂停。生产者和消费者作为不同的应用程序运行。更多关于消费者部分的信息。

  • 我们将SimpleMessageListenerContainerChannelAwareMessageListener一起使用,使用手动ack模式和默认预取(1)
  • 在我们的应用程序中,我们创建队列(按需)并将其添加到侦听器中
  • 当我们从10个ConcurrentConsumers和20个MaxConcurrentConsumers开始时,消费大约持续15个小时并暂停。当我们将MaxConcurrentConsumers增加到75时,这种情况会在1小时内发生

在RabbitMQ UI上,当出现这种情况时,我们会在通道选项卡上看到带有3/4 unacked消息的通道,在此之前,只有1条unacked消息。

我们的线程转储类似于此。但是将心跳设置为60无助于改善这种情况。

大多数线程转储都有以下消息。如果需要,我将附加整个线程转储。如果我缺少任何可能导致消费者暂停的设置,请告知我?

"pool-6-thread-16" #86 prio=5 os_prio=0 tid=0x00007f4db09cb000 nid=0x3b33 waiting on condition [0x00007f4ebebec000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000007b9930b68> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$InternalConsumer.handleDelivery(BlockingQueueConsumer.java:660)
    at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:144)
    at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:99)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

更多信息我们动态地向SimpleMessageListenerContainer添加和删除队列,我们怀疑这会导致一些问题,因为每次我们从侦听器添加或删除队列时,所有BlockingQueueConsumer都会被删除并再次创建。你认为这会导致这个问题吗?

共有2个答案

衡子安
2023-03-14

AMQP-621现在合并为master;我们将发布1.6.1。在接下来的几天发布。

元修然
2023-03-14

您的问题在目标侦听器的下游某处。

看,预取(1)导致:

this.queue = new LinkedBlockingQueue<Delivery>(prefetchCount);

如果我们不调查队列,我们这里有什么?

BlockingQueueConsumer.this.queue.put(new Delivery(consumerTag, envelope, properties, body));

对,锁着停车。

 类似资料:
  • 我正在使用这个库来实现节点kafka与消费者暂停和恢复方法来处理背压。我已经创建了一个小演示,我可以在其中和,但问题是在后它停止了消费消息。 这是我的代码。 任何人都可以帮助我,我在恢复消费者时做错了什么?当我启动使用者时,它只接收一条消息,并且在恢复后仍然不消耗任何其他消息。

  • 在我的Spring启动应用程序中,我有kafka消费者类,每当主题中有可用消息时,它会频繁读取消息。我想限制消费者每隔2小时消费一次消息。就像阅读完一条消息后,消费者将暂停2小时,然后再消费另一条消息。这是我的消费者配置方法:- 然后我创建了这个容器方法,在其中我设置了kafka配置的其余部分 使用此代码分区每2小时重新平衡一次,但它根本没有读取消息。我的kafka消费者方法:-

  • 我正在使用带有KafkaListener注释的spring kafka v2.5.2。 在运行时,我希望能够向消费者发送停止消费的信号。 我看到了autoStartup参数,但它似乎只对初始化有效,之后无法更改。 我看到了KafkaListenerEndpointRegistry的methode close()。。。 你有什么建议吗? 提前谢谢。

  • 我看到,尽管kafka主题有大量(数百万)消息排队,vert. x消费者只获取500条消息(默认获取量),然后将其传递给处理程序。但是在消息被处理和提交后,消费者只是停下来等待大约35秒,直到它获取另一批消息。 我希望消费者会继续获取,直到它设法赶上分区,然后暂停。我如何让它这样做? 使用以下代码设置消费者: 为消费者提供以下配置: 我使用的是vert.x 3.9.2,Kafka是2.4.1

  • 我有一个使用Spring kafka库的Spring启动应用程序的消费者。我想为每个消费者线程设置租户上下文,即在创建每个线程时调用一个方法(创建时每个线程只调用一次)。目前,我已将其添加到listner中,其中对方法有@KafkaListner注释,但它每次轮询并处理每条记录时都会调用它。我想在消费者线程启动时调用此方法一次。如果我们有任何这样的事情,请在这里帮助我。

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?