当前位置: 首页 > 知识库问答 >
问题:

ActiveMQ:单个生产者,多个消费者

越风史
2023-03-14

我有一个使用ActiveMQ的消息队列。web请求用persistency=true将消息放入队列。现在,我有两个消费者,它们都作为单独的会话连接到这个队列。使用者1总是确认消息,但使用者2从不这样做。

JMS队列实现负载平衡器语义。一条消息将被一个使用者接收。如果在发送消息时没有可用的使用者,它将被保留,直到有可以处理消息的使用者可用为止。如果使用者接收到一条消息,但在关闭之前没有确认它,那么该消息将被重新传递给另一个使用者。一个队列可以有许多使用者,在可用的使用者之间消息负载平衡。

我从这里了解到的是,我希望所有消息最终都由消费者1处理,因为它总是确认。由于使用者2不承认,因此消息应该发送到使用者1。

但我注意到以下几点:1。当我提交一个请求时,我只看到每第二个请求来到消费者1。另一个请求不显示,它存储在ActiveMQ中。我想它去了消费者2谁不承认。那么接下来应该是消费者1吗?

我接收/处理消息的代码如下所示:

# --------------------------------------------- MODULE IMPORT ---------------------------------------------------------#
import argparse
import json
import logging
import multiprocessing as mp
import sys

import stomp
from tvpv_portal.services.msgbkr import MsgBkr
from utils import util


# --------------------------------------------- DEVELOPMENT CODE ------------------------------------------------------#
log = logging.getLogger(__name__)


class MessageProcessingListener(stomp.ConnectionListener):
    """This class is responsible for processing (consuming) the messages from ActiveMQ."""

    def __init__(self, conn, cb):
        """Initialization.

        Args:
            conn -- Connection object
            cb   -- Callback function
        """

        self._conn = conn
        self._cb = cb

    def on_error(self, headers, body):
        """When we get an error.

        Args:
            headers -- Message header
            body    -- Message body
        """

        log.error('Received error=%s', body)

    def on_message(self, headers, body):
        """When we receive a message.

        Args:
            headers -- Message header
            body    -- Message body
        """

        log.info('Received message')

        # Deserialize the message.
        item = json.loads(body)

        import pprint
        pprint.pprint(item)

        # TODO: check if msg is to be handled by this SITE. If so, acknowledge and queue it. Otherwise, ignore.

        # Put message into queue via callback (queue.put) function.
        #self._cb(item)

        # TODO: we only send acknowledge if we are supposed to process this message.
        # Send acknowledgement to ActiveMQ indicating message is consumed.
        self._conn.ack(headers['message-id'], headers['subscription'])


def worker(q):
    """Worker to retrieve item from queue and process it.

    Args:
        q -- Queue
    """

    # Run in an infinite loop. Get an item from the queue to process it. We MUST call q.task_done() to indicate
    # that item is processed to prevent deadlock.
    while True:
        try:
            item = q.get()

            # TODO: We will call external script from here to run on Netbatch in the future.
            log.info('Processed message')

        finally:
            q.task_done()


def flash_mq_rst_handler_main():
    """Main entry to the request handler."""

    # Define arguments.
    parser = argparse.ArgumentParser(description='Flash message queue request handler script',
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter,
                                     add_help=False)

    opts = parser.add_argument_group('Options')
    opts.add_argument('-h', '--help', action='help',
                      help='Show this help message and exit')
    opts.add_argument('--workers', metavar='val', type=int, default=4,
                      help='Number of worker processes')
    opts.add_argument('--log', metavar='file', type=util.get_resolved_abspath, default='flash_mq_rst_handler.log',
                      help='Log file')

    # Parse arguments.
    args = parser.parse_args()

    # Setup logger.
    util.configure_logger(args.log)
    log.info('Command line %s', ' '.join(map(str, sys.argv)))

    # Create a managed queue to store messages retrieved from message queue.
    queue = mp.Manager().JoinableQueue()

    # Instantiate consumer message broker + ensure connection.
    consumer = MsgBkr(producer=False)
    if not consumer.is_connected():
        log.critical('Unable to connect to message queue; please debug')
        sys.exit(1)

    # Register listener with consumer + queue.put as the callback function to trigger when a message is received.
    consumer.set_listener('message_processing_listener', MessageProcessingListener, cb=queue.put)

    # Run in an infinite loop to wait form messages.
    try:
        log.info('Create pool with worker=%d to process messages', args.workers)
        with mp.Pool(processes=args.workers) as pool:
            p = pool.apply_async(worker, (queue,))
            p.get()
    except KeyboardInterrupt:
        pass

    # See MsgBkr. It will close the connection during exit() so we don't have to do it.
    sys.exit(0)


if __name__ == '__main__':
    flash_mq_rst_handler_main()

共有1个答案

荀豪
2023-03-14

这个问题可以通过JMS桥实现:https://activemq.apache.org/components/artemis/documentation/1.1.0/jms-bridge.html

能够让它配置创建n+1个队列。源(传入)队列是放置所有消息的地方。根据消息中的某个选择器(比如头中的'some_key':'some_value'),消息可以路由到N个目的地(传出)队列中的一个。然后,每个站点可以侦听特定队列中的消息。同一目标队列中的多个使用者将以循环方式获取消息。

 类似资料:
  • 我有三根线。线程1(T1)是生成器,它生成数据。线程2和线程3(T2和T3)分别等待T1的数据在单独的循环中处理。我正在考虑在线程之间共享BlockingQueue,并通过调用“Take”让T2和T3等待。

  • 我有一个生产者/消费者场景,我不希望一个生产者交付产品,多个消费者消费这些产品。然而,常见的情况是,交付的产品只被一个消费者消费,而其他消费者从未看到过这个特定的产品。我不想实现的是,一个产品被每个消费者消费一次,而没有任何形式的阻碍。 我的第一个想法是使用多个BlockingQueue,每个消费者使用一个,并使生产者将每个产品按顺序放入所有可用的BlockingQueues中。但是,如果其中一个

  • 问题内容: 因此,我已经看到了许多在Go中实现一个消费者和许多生产者的方法-Go 并发中的经典fanIn函数。 我想要的是fanOut功能。它以一个通道作为参数,它从中读取一个值,并返回一个通道片,该通道将这个值的副本写入其中。 有没有正确/推荐的方法来实现这一目标? 问题答案: 您几乎描述了执行此操作的最佳方法,但这是执行此操作的一小段代码示例。 去游乐场:https : //play.gola

  • 我正在尝试了解RxJava并发的一些细节,但我不确定我的想法是否正确。我对SubscribeOn/观察的工作原理有很好的了解,但我正在尝试确定池调度程序的一些细节。为此,我正在考虑尽可能简单地实现一个1-N生产者-消费者链,其中消费者的数量与CPU一样多。 根据文档,Schedulers.computation()由与内核一样多的线程池支持。但是,根据Reactive合约,运算符只能获得顺序调用。

  • 下面我用一篇关于临时排队的文章来解释我的想法,我只想知道我对还是错。 参考链接:如何使用JMS实现请求响应 “创建临时目的地、消费者、生产者和连接都是与代理同步的请求-响应操作,因此在处理每个请求时应避免,因为它会导致与JMS代理进行大量聊天。” 我不明白这句话在咒骂什么?在不同的线程中我们可以访问临时队列吗?一点道理都没有?有人能解释一下吗

  • 这里的一些配置:非持久消费者、非持久消息、禁用的流控制、默认预取大小、优化确认=true、异步发送=true、使用jms连接ActiveMQ 例如 一个生产者、一个消费者, 生产者发送速率可以达到6k/s 但是,在这种情况下:一个生产者三个消费者, 生产者发送速率下降到4k/s 这是我的一些关键代码: 发件人类别: sendmain方法: 接收机类代码: 接收器类代码在这里隐藏了一些方法,例如创建