我正在节点上使用stomp客户端。js与ActiveMQ。当前,生产者将消息推送到队列中,如果客户端(消费者)已连接,则它会使用消息。若客户端未连接,则消息在队列中挂起,直到或除非连接了某个消费者。
在我的例子中,消费者执行的每个操作都很昂贵,最多需要2个小时。我想消费一条消息,然后停止消费队列中的消息,直到该操作完成。目前,一旦消息被推入队列,客户端就会自动从队列中提取所有消息。我想要的是提取一条消息,等待其完成,然后再次从队列中提取,以此类推。这是为了避免比赛条件。
例如,如果我在队列中有三个任务:a | B | C
当前的流程是,所有消息都在队列中被消耗,使其为空。
我想要的是消费1条消息,即
A | B |(消耗C)
对C执行操作。
消费者完成所有操作后,请记录下一条消息
A |(B消耗)
等等
我使用ActiveMQ进行通信的原因是,如果节点应用程序崩溃,或者我的服务器关闭等,它可以可靠地传递消息。
使用基于ActiveMQ 5的解决方案,客户端可以配置预取大小,然后代理将用可用消息填充预取大小。如果您使用activemq。在STOMP subscribe上设置prefetchSize头,并将该值设置为1。假定您使用了客户端或客户端个人确认模式,则代理将一次发送一条消息。在您的情况下,您可以使用客户端确认,然后在处理完消息后发送ACK,然后代理将发送另一条消息(如果有)。
我的应用程序使用来自RabbitMQ的一些消息并对其进行处理。我有大约10个队列,每个队列最多有10个消费者(线程)。我有5次预回迁。我正在Heroku中使用CloudAMQP插件(RabbitMQ作为服务)运行安装程序。 我使用默认心跳和连接超时设置(60秒)运行。 我的java应用程序是一个使用sping-Rabbit库的Spring Boot应用程序。 版本: 问题是对于一个特定队列的消费者
我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认
现在,我有一个Spring Boot CLI应用程序,当应用程序启动时,它会自动启动Kafka消费者。我的任务是更新提供API的应用程序,允许在特定条件下启动或停止Kafka消费者。所以,我将使用SpringBootStarterWeb创建该API。但我找不到一种方法来手动管理消费过程。我需要的是 在不使用消费者的情况下启动API 关于如何手动管理消费过程的任何建议? 技术细节: 用于创建侦听器
我想这个话题发生了什么...偏移坏了还是我不知道... 有人知道会发生什么吗?谢谢
我们计划将主动 MQ (STOMP) 用于我们的一个项目。其中一个要求是,如果我们发现用户不合适,就将其踢出/禁止。如何通过单板技术实现这一点?有点像在 IRC 中踢球的东西。
例如,分区有1-10的偏移量。我只想从3-8消费。在消耗了第8条消息后,程序应该退出。