当前位置: 首页 > 知识库问答 >
问题:

在ActiveMQ中向多个使用者发送消息

孔征
2023-03-14

我是ActiveMQ新手。我曾尝试在activemq中实现生产者-消费者(发送者-接收器)。在我的代码中,我很容易发送

这是我的制片人

MsgProducer。Java语言

package jms_service;

import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class MsgProducer {

      private static String url = "failover://tcp://localhost:61616";
      public static javax.jms.ConnectionFactory connFactory;
      public static javax.jms.Connection connection;
      public static javax.jms.Session mqSession;
      public static javax.jms.Topic topic;
      public static javax.jms.MessageProducer producer;

      public static void main(String[] args) throws JMSException {

          connFactory = new ActiveMQConnectionFactory(url);  
          connection = connFactory.createConnection("system","manager");
          connection.start(); 
          mqSession = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);  

          topic = mqSession.createTopic("RealTimeData");
          producer = mqSession.createProducer(topic);                  
          producer.setTimeToLive(30000);

          TextMessage message = mqSession.createTextMessage();      

          int seq_id =1;

          while(true)
            {             
                message.setText("Hello world | " +"seq_id #"+seq_id);               
                 producer.send(message);
                 seq_id++;

                 System.out.println("sent_msg =>> "+ message.getText());
               //  if(seq_id>100000) break;

                    try {
                        Thread.sleep(1000);
                        } 
                    catch (InterruptedException e) { e.printStackTrace();}           
              }       

    }

}

MsgConsumer.java

package jms_service;

import java.text.SimpleDateFormat;
import java.util.Calendar;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class MsgConsumer {

          private static String url = "failover://tcp://localhost:61616";     
          public static javax.jms.ConnectionFactory connFactory;
          public static javax.jms.Connection connection;
          public static javax.jms.Session mqSession;
          public static javax.jms.Topic topic;
          public static javax.jms.MessageConsumer consumer;

        public static void main(String[] args) throws JMSException, InterruptedException {

            connFactory = new ActiveMQConnectionFactory(url);
            connection = connFactory.createConnection("system", "manager");
            connection.setClientID("0002");
            //connection.start();               
            mqSession = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
            topic = mqSession.createTopic("RealTimeData");
            consumer = mqSession.createDurableSubscriber(topic, "SUBS01");
            connection.start();

            MessageListener listner = new MessageListener() {
                public void onMessage(Message message) {
                    try {
                        if (message instanceof TextMessage) {
                            TextMessage txtmsg = (TextMessage) message;
                            Calendar cal = Calendar.getInstance();
                            //cal.getTime();
                            SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
                            String time = sdf.format(cal.getTime());

                            String msg="received_message =>> "+ txtmsg.getText() + " | received_at :: "+time;
                            System.out.println(msg);

                            //consumer.sendData(msg);
                        }

                        } catch (JMSException e) {
                            System.out.println("Caught:" + e);
                            e.printStackTrace();
                            }
                    }
            };

            consumer.setMessageListener(listner);  

      }


}

有谁能帮我找出向多个消费者发送信息的方法吗。提前谢谢。

共有3个答案

商松
2023-03-14

主题是最好的途径。一个生产者对多个消费者或一个出版商对多个订阅者。对于队列,您必须编写一个循环,以获取所有可能的消费者,并使用不同的目的地发送消息。你的动机还将决定是否使用队列或主题。

  1. 如果您认为您的消费者可能离线或有网络问题,请选择队列。在这种情况下,当他们重新打开时,他们将收到待处理的消息
  2. 对于主题,当断开连接时,它们将无法接收消息,除非您显式保留消息,但新消息会覆盖它们
空英逸
2023-03-14

假设你的问题是

有人能帮助找出向多个消费者发送消息的方法吗

在不阅读完整代码的情况下,一种方法可能是将客户放入一个集合中

static Vector<consumer> vecConsumer;

在这里,您输入每个新客户,并提供所有现有客户的参考。广播就像发送到单个客户端,例如封装在foreach循环中

for(consumer cons : vecConsumer)
{
      //send stuff or put in sending queue
}
习旻
2023-03-14

队列语义在所有消费者中只传递一次消息。这是根据JMS规范编写的(这是一本了解基础知识的好书)。

主题语义学向每个消费者传递信息。因此,主题可能是您需求的答案。

 类似资料:
  • 我们有一个用例,其中我们只创建一个消费者来处理队列中的消息。消息处理器在确认之前积累一定数量的消息。以异步方式接收消息并使用事务会话。消息的大小非常小。 在一定数量的消息之后,主动MQ停止向唯一的消费者发送进一步的消息,并等待确认。我们尝试过像consumer.prefetchSize,consumer . maximumpendingmessagelimit;但是什么都不管用。我们用一个只有一个

  • 在Kafka文献中: Kafka的处理方式不同。我们的主题被划分为一组完全有序的分区,每个分区在任何给定时间都由一个使用者使用。这意味着消费者在每个分区中的位置只是一个整数,即要消费的下一条消息的偏移量。这使得消耗量的状态非常小,每个分区只有一个数字。这种状态可以定期检查。这使得消息确认的等价物非常便宜。 然而,按照同一份文件中的快速入门指南,我很容易就能: 使用单个分区创建主题 创建一个游戏机制

  • 我使用Spring JMS和ActiveMQ,其中有一个客户机将消息推送到队列,有多个使用者线程监听并从队列中删除消息。有些时候,相同的消息会被两个使用者从队列中出列。我不希望这种行为,并希望确保仅有的一条消息由一个消费者线程处理。你知道我哪里出了问题吗? ActiveMQ 5.9.1配置:

  • 我有一个使用ActiveMQ的消息队列。web请求用persistency=true将消息放入队列。现在,我有两个消费者,它们都作为单独的会话连接到这个队列。使用者1总是确认消息,但使用者2从不这样做。 JMS队列实现负载平衡器语义。一条消息将被一个使用者接收。如果在发送消息时没有可用的使用者,它将被保留,直到有可以处理消息的使用者可用为止。如果使用者接收到一条消息,但在关闭之前没有确认它,那么该

  • 我是activeMQ的新手,在将消息从驻留在另一台服务器上的消息生成器推送到activeMQ定义的队列时遇到问题。 我在activeMQ上使用camel routes创建的应用程序中有几个队列。我尝试从另一台服务器上的应用程序对这些队列执行远程JNDI查找。我使用了来自http://activemq.apache.org/jndi-support.html页面的activemq文档片段。 我可以连

  • 我使用这个聊天室示例作为参考,设置了一个多房间的socket.io/nodeJS聊天服务器: https://raw.githubusercontent.com/socketio/socket.io/master/examples/chat/index.js 我使用这个doc将代码从单间修改为多间: https://socket.io/docs/roams-and-namespaces/ 我还修改