当前位置: 首页 > 面试题库 >

JMS-从一个消费者到多个消费者

闾丘炫明
2023-03-14
问题内容

我有一个JMS客户端,它正在生成消息并通过JMS队列发送到其唯一的使用者。

我想要的是不止一个消费者收到这些消息。我想到的第一件事是将队列转换为主题,以便现有用户和新用户都可以订阅并将相同的消息传递给他们。

显然,这将涉及在生产者和消费者方面修改当前的客户代码。

我还要查看其他选项,例如创建第二个队列,这样就不必修改现有的使用者。我相信这种方法有很多优点,例如(如果我错了,请纠正我)在两个不同队列之间而不是一个队列之间平衡负载,这可能会对性能产生积极影响。

我想就您可能会看到的这些选项和缺点/优点获得建议。任何反馈都非常感谢。


问题答案:

如您所说,您有一些选择。

如果将其转换为主题以达到相同的效果,则需要使使用者成为永久使用者。如果您的消费者还活着,那么队列提供的一件事就是持久性。这将取决于您使用的MQ系统。

如果要坚持使用队列,则将为每个使用者和将在原始队列上侦听的调度程序创建一个队列。

Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
                                         -> Queue_Consumer_2 <- Consumer_2
                                         -> Queue_Consumer_3 <- Consumer_3

话题的优点

  • 动态添加新消费者更容易。所有消费者将无需任何工作即可收到新消息。
  • 您可以创建循环主题,以便Consumer_1会收到一条消息,然后是Consumer_2,然后是Consumer_3。
  • 可以向消费者推送新消息,而不必查询队列以使它们具有响应性。

主题的缺点

  • 除非您的代理支持此配置,否则消息不是持久的。如果使用者下线然后返回,则可能会丢失消息,除非设置了持久使用者。
  • 难以允许Consumer_1和Consumer_2接收消息,但不能接收Consumer_3。使用分派器和队列,分派器无法将消息放入Consumer_3的队列中。

队列的优点

  • 消息是持久的,直到使用者将其删除
  • 调度程序可以通过不将消息放入相应的使用者队列来过滤哪些使用者获得哪些消息。不过,这可以通过过滤器通过主题来完成。

队列的缺点

  • 需要创建其他队列以支持多个使用者。在动态环境中,这不会有效。

在开发消息系统时,我更喜欢主题,因为它给了我最大的权力,但是看到您已经在使用队列,就需要您更改系统实现主题的工作方式。

多用户队列系统的设计与实现

Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
                                         -> Queue_Consumer_2 <- Consumer_2
                                         -> Queue_Consumer_3 <- Consumer_3

资源

请记住,您还需要处理其他事情,例如问题异常处理,重新连接以及在丢失连接时重新排队等。这只是为了让您了解如何完成我的工作描述。

在实际系统中,我可能不会在第一个例外退出。我将允许系统继续以最佳状态运行并记录错误。如此代码所示,如​​果将消息放入单个使用者队列失败,则整个调度程序将停止。

分派器

/*
 * To change this template, choose Tools | Templates
 * and open the template in the editor.
 */
package stackoverflow_4615895;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;

public class Dispatcher {

    private static long QUEUE_WAIT_TIME = 1000;
    private boolean mStop = false;
    private QueueConnectionFactory mFactory;
    private String mSourceQueueName;
    private String[] mConsumerQueueNames;

    /**
     * Create a dispatcher
     * @param factory
     *      The QueueConnectionFactory in which new connections, session, and consumers
     *      will be created. This is needed to ensure the connection is associated
     *      with the correct thread.
     * @param source
     *
     * @param consumerQueues
     */
    public Dispatcher(
        QueueConnectionFactory factory, 
        String sourceQueue, 
        String[] consumerQueues) {

        mFactory = factory;
        mSourceQueueName = sourceQueue;
        mConsumerQueueNames = consumerQueues;
    }

    public void start() {
        Thread thread = new Thread(new Runnable() {

            public void run() {
                Dispatcher.this.run();
            }
        });
        thread.setName("Queue Dispatcher");
        thread.start();
    }

    public void stop() {
        mStop = true;
    }

    private void run() {

        QueueConnection connection = null;
        MessageProducer producer = null;
        MessageConsumer consumer = null;
        QueueSession session = null;
        try {
            // Setup connection and queues for receiving the messages
            connection = mFactory.createQueueConnection();
            session = connection.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE);
            Queue sourceQueue = session.createQueue(mSourceQueueName);
            consumer = session.createConsumer(sourceQueue);

            // Create a null producer allowing us to send messages
            // to any queue.
            producer = session.createProducer(null);

            // Create the destination queues based on the consumer names we
            // were given.
            Queue[] destinationQueues = new Queue[mConsumerQueueNames.length];
            for (int index = 0; index < mConsumerQueueNames.length; ++index) {
                destinationQueues[index] = session.createQueue(mConsumerQueueNames[index]);
            }

            connection.start();

            while (!mStop) {

                // Only wait QUEUE_WAIT_TIME in order to give
                // the dispatcher a chance to see if it should
                // quit
                Message m = consumer.receive(QUEUE_WAIT_TIME);
                if (m == null) {
                    continue;
                }

                // Take the message we received and put
                // it in each of the consumers destination
                // queues for them to process
                for (Queue q : destinationQueues) {
                    producer.send(q, m);
                }
            }

        } catch (JMSException ex) {
            // Do wonderful things here 
        } finally {
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException ex) {
                }
            }
            if (consumer != null) {
                try {
                    consumer.close();
                } catch (JMSException ex) {
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException ex) {
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException ex) {
                }
            }
        }
    }
}

Main.java

    QueueConnectionFactory factory = ...;

    Dispatcher dispatcher =
            new Dispatcher(
            factory,
            "Queue_Original",
            new String[]{
                "Consumer_Queue_1",
                "Consumer_Queue_2",
                "Consumer_Queue_3"});
    dispatcher.start();


 类似资料:
  • 由于消息需求的排序,我们有一个主题和一个分区。我们有两个消费者运行在不同的服务器上,具有相同的配置集,即groupId、consumerId和consumerGroup。即 1主题- 当我们部署消费者时,相同的代码会部署在两台服务器上。当消息到来时,我们会注意到两个消费者都在消费消息,而不是只有一个处理。让消费者在两台独立的服务器上运行的原因是,如果一台服务器崩溃,至少其他服务器可以继续处理消息。

  • 我有一个由第三方发布的JMS队列。我想在不同的机器上设置多个使用者,只有一台特定机器的使用者确认该队列上的消息。简而言之,如果特定机器的使用者没有接收到消息,那么该消息不应从队列中删除。这是可以实现的吗?

  • null 我在这一页上读到以下内容: 使用者从任何单个分区读取,允许您以与消息生成类似的方式扩展消息消耗的吞吐量。 也可以将使用者组织为给定主题的使用者组-组内的每个使用者从唯一分区读取,并且组作为一个整体使用来自整个主题的所有消息。 如果使用者多于分区,则某些使用者将空闲,因为它们没有可从中读取的分区。 如果分区多于使用者,则使用者将从多个分区接收消息。 如果使用者和分区的数量相等,则每个使用者

  • 我有三根线。线程1(T1)是生成器,它生成数据。线程2和线程3(T2和T3)分别等待T1的数据在单独的循环中处理。我正在考虑在线程之间共享BlockingQueue,并通过调用“Take”让T2和T3等待。

  • 我使用分布式jms队列,weblogic是我的应用服务器。在我的集群环境中部署了三个jms服务器。例如,生产者只是使用队列名称jndi lookup 'udq '来发送消息。现在,我已经为每个jms服务器关联了一个消费者,并且能够消费消息,到目前为止没有问题。 这里有一个问题,我是否可以让一个消费者使用来自3个jms服务器的消息。weblogic允许使用以下语法对目标查找进行jndi命名:@ 我只

  • 我有一个Kafka系统,看起来像这样(所有消费者都在一个消费者群体中): 在每个消费者中,我轮询消息,然后进行昂贵的计算(从1到60秒)。如果操作成功,我将提交消费者。 在我提交之前,另一个使用者是否会开始处理相同的消息?我需要保证,一旦消息被拾取,它就会被只执行一次 - 除非处理中途失败。