当前位置: 首页 > 知识库问答 >
问题:

大黄蜂消费者在关闭连接后需要很长时间才能停止

巫马欣嘉
2023-03-14

我有一个java程序200个并发消费者从一个独立的HornetQ服务器提供的队列中读取数据。侦听器只需从队列中选择一个项目,随机等待1.5-2.5秒,然后确认返回队列(它是CLIENT_acknowledge)。

现在,我在队列中创建了20.000条消息,启动这200个消费者,5秒钟后,我在连接上调用close(也尝试了stop)方法。到这个时候,消费者应该已经处理了大约1000条消息。但是,他们没有完成当前的工作,也没有从队列中接收更多的消息,而是又花了3分钟处理了大约10.000条消息,然后才最终停止并结束应用程序(connection.close()是一个阻塞调用)。

我怀疑这可能是由于客户端上的某种缓冲,我一直在寻找限制它的方法,并在出厂配置中将这四个属性设置为尽可能严格:

<producer-window-size>1</producer-window-size>
<consumer-window-size>0</consumer-window-size>
<consumer-max-rate>1</consumer-max-rate>
<producer-max-rate>1</producer-max-rate>

我知道我的案子不一定需要所有这些,只是想尝试一切。我知道这些正在注册,因为一旦将消费者窗口大小设置为零,我遇到的另一个问题,消费消息的顺序,已经解决了。

共有1个答案

梅跃
2023-03-14

我们已经更改了代码以中断上游的通信,如下所示:

黄蜂Q-1379

提交可能会让您知道发生了什么变化:

https://github.com/hornetq/hornetq/commit/4c05475

基本上,我们现在在关闭连接时强制关闭netty连接,这应该在您关闭连接的那一刻中断与任何站立消费者的通信。

我不认为这个提交是在2.2分支上完成的。

 类似资料:
  • 问题内容: 我正在使用Hibernate 4.2,JPA 2.0和Postgres 9.2 代码卡在 在进一步调查中,我发现Hibernate调用了class 方法。此方法尝试加载有关每个数据库对象的元数据 的代码是Postgers的JDBC驱动程序的一部分,而确实是花费时间来执行该方法的驱动程序(我加载了驱动程序源并尝试了跟踪)。但是由于这个问题在Hibernate 3.3(我之前使用过)中没有

  • 给出结果需要20多秒,而在mongo控制台中同样的查询需要不到一秒。 为什么会出现这种情况,如何减少速度差距?

  • 我们启动一个Kafka消费者,监听一个可能还没有创建的主题(不过,主题自动创建是启用的)。 此后不久,一位制作人发表了关于这个话题的消息。 Kafka原木

  • 我有一个Java ExecutorService(固定线程池1),我将可运行的任务传递给它以供将来执行。每个任务通常在10秒内完成。Executor服务只是简单地完成任务。如果我关闭应用程序,我会运行以下程序:; 问题是它似乎需要一段时间才能关闭,有时可能需要几分钟,有时它似乎永远不会关闭,有时它会在几秒钟内关闭!在执行器队列中的任何时候都可能有大约2000个任务,但我只是希望它完成当前正在执行的

  • 询问代码的问题必须证明对所解决问题的理解程度最低。包括尝试的解决方案、为什么不起作用以及预期的结果。另请参见:堆栈溢出问题检查表 我敢肯定,上面的程序不是无限循环的。我用进行了测试,得到了想要的结果 我不明白为什么我的CPU要花很长时间来运行它。 编辑:ProjectEuler的我的代码。网络问题3。

  • 我在项目中使用solace作为JMS提供者。我使用spring CachingConnectionFactory检索连接。在这个连接上,我创建了一个新会话。我在那个会话中创建了一个消费者的线程。 我正在做一些故障转移测试。当我将服务器从网络连接上拔下时,它会失败。当我再次连接服务器时,仍会收到相同的异常: 更重要的是,CachingConnectionFactory默认将reConnectOnEx

  • 我们正在运行一个3 broker Kafka 0.10.0.1集群。我们有一个java应用程序,它产生了许多消费线程,从不同的主题消费。对于每一个主题,我们都指定了不同的消费者群体。 很多时候,我看到每当这个应用程序重新启动时,一个或多个CG需要超过5分钟来接收分区分配。在此之前,这个话题的消费者不会消费任何东西。如果我去Kafka broker并运行Consumer-Groups.sh并描述特定

  • 我有一个Python进程(或者更确切地说,在一个使用者组中并行运行的一组进程),它根据来自某个主题的Kafka消息输入来处理数据。通常每条消息的处理都很快,但有时,取决于消息的内容,可能需要很长时间(几分钟)。在这种情况下,Kafka broker断开客户端与组的连接,并启动重新平衡。我可以将设置为一个非常大的值,但它可能会超过10分钟,这意味着如果客户机死亡,集群在10分钟内无法正确地重新平衡。