我想构建一个简单的消费者程序(java ),以获取ActiveMQ主题中存储的所有消息。我有一个在队列中发送文本消息的生成器。
但我不知道如何开始写我的消费者来检索旧消息并等待新消息。
如果你有一个例子,谢谢!
这是我的制片人:http://pastebin.com/uRy9D8mY
这是我的消费者:http://pastebin.com/bZh4r66e
当我先于消费者运行生产者,然后运行消费者时,我什么也没得到。当我运行我的使用者然后是我的生产者时,我在队列中添加了72条消息,但我的使用者只收到了24条消息......
使用下面给出的代码,您可以读取队列中排队的所有消息。
如果您需要一个无止境的使用者,每当新添加到队列中时,该使用者将读取所有消息,然后删除else部分,这样程序将不会终止。
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection con = factory.createConnection();
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("tmp_queue2");
MessageConsumer consumer = session.createConsumer(queue);
con.start();
while (true) {
Message msg = consumer.receive(5000);
if (msg instanceof TextMessage) {
TextMessage tm = (TextMessage) msg;
System.out.println(tm.getText());
}
else{
System.out.println("Queue Empty");
con.stop();
break;
}
}
希望这个消费者计划能够帮助刚接触ActiveMQ的人。
我建议阅读本教程(如Apache ActiveMQ)SUN Jms教程
有很多方法可以编写JMS/ActiveMQ程序,使用各种框架,例如Spring,或者使用纯java。
基本上,编写一个这样的侦听器类:
public class MyListener implements MessageListener{
public void onMessage(Message message){
// Read and handle message here.
}
}
因为您已经在生成消息,所以我假设您已经建立并运行了连接。
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session.createConsumer("MyQueue");
listener = new MyListener ();
consumer.setMessageListener(listener);
connection.start();
// At this point, messages should arrive from the queue to your listener.
还有一些错误处理代码没有包含在本示例中,但是您应该能够在教程和JMS文档的帮助下找到它们。
在从动态队列(ActiveMQ)(挂起的消息=1000)读取消息时,我已经确认了每条消息,现在退出队列的消息数=1000。 有没有办法将所有出列的消息再次放入队列中。任何物理备份所有消息的解决方案。 提前感谢
我正在使用Azure服务总线队列。但是我不能使用“获取所有队列消息(peek Lock):微软内置于api”从队列中获取所有消息。 有没有办法获取所有队列消息? {"$连接":{"值":{"servicebus_1":{"连接ID":"/订阅/c776fex3-6aec-4722-b099-b054c267b240/资源组/Plugin-Resources/提供者/Microsoft.网络/连接/
我们设置了几个ActiveMQ Artemis 2.17.0集群,以便在数据中心之间使用镜像进行复制。 在ActiveMQ Artemis 2.17.0或更高版本中有什么方法可以实现这一点吗?
我有一个特殊的要求,即获取队列中所有挂起的消息,以便进一步处理它们。详细说明。这就是应用程序的流程: 有一个队列设置将承载消息 默认情况下,使用MessageListenerContainer的stop()方法停止该队列的侦听器 将通过调用listenerContainer按需侦听队列中的消息。start() 要求是我应该一次获取队列中可用的所有消息,以便只处理那些消息。如果由于任何原因,在处理现
我试图获得java中activeMQ的所有队列名称,我在这里和这里找到了一些关于这个的话题,人们建议使用DestinationSource,我在编写代码时无法在Eclipse中导入它。我试过了: 我使用的是 java 1.7 和最新的 activemq 版本 5.14.1。任何想法,如果目标源仍然受支持?谢谢
我想使用来自特定队列或具有给定密钥的特定交换的多个消息。 因此,场景如下所示: 发布者发布消息1在队列1发布者发布消息2在队列1发布者发布消息3在队列1发布者发布消息4在队列2发布者发布消息5在队列2...消费者从队列1中消费消息一次获取[消息1,消息2,消息3],并在一次回调中处理它们 这些消息不是同时出现的,就像事件一样,我希望将它们收集到队列中,打包并发送给第三方。 我也读过这篇文章: ht