当前位置: 首页 > 面试题库 >

多线程JMS客户端ActiveMQ

华瀚漠
2023-03-14
问题内容

我正在使用以下代码创建多个JMS会话,以供多个使用者使用消息。我的问题是代码以单线程方式运行。即使消息存在于队列中,第二个线程也无法接收任何内容,而是继续轮询。同时,第一个线程完成对第一批的处理,然后返回并使用剩余的消息。这里的用法有什么问题吗?

static {
    try {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://172.16.143.99:61616");
        connection = connectionFactory.createConnection();
        connection.start();
    } catch (JMSException e) {
        LOGGER.error("Unable to initialise JMS Queue.", e);
    }

}

public JMSClientReader(boolean isQueue, String name) throws QueueException {

    init(isQueue,name);
}

@Override
public void init(boolean isQueue, String name) throws QueueException
{

    // Create a Connection
    try {
        // Create a Session
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        if (isQueue) {
            destination = new ActiveMQQueue(name);// session.createQueue("queue");
        } else {
            destination = new ActiveMQTopic(name);// session.createTopic("topic");
        }
        consumer = session.createConsumer(destination);
    } catch (JMSException e) {
        LOGGER.error("Unable to initialise JMS Queue.", e);
        throw new QueueException(e);
    }
}

public String readQueue() throws QueueException {

    // connection.setExceptionListener(this);
    // Wait for a message
    String text = null;
    Message message;
    try {
        message = consumer.receive(1000);
        if(message==null)
            return "done";
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            text = textMessage.getText();
            LOGGER.info("Received: " + text);
        } else {
            throw new JMSException("Invalid message found");
        }
    } catch (JMSException e) {
        LOGGER.error("Unable to read message from Queue", e);
        throw new QueueException(e);
    }


    LOGGER.info("Message read is " + text);
    return text;

}

问题答案:

您的问题是prefetchPolicy。

persistent queues (default value: 1000)
non-persistent queues (default value: 1000)
persistent topics (default value: 100)
non-persistent topics (default value: Short.MAX_VALUE - 1)

所有消息都被分派到第一个连接的使用者,当另一个连接时他没有收到消息,因此,如果您有一个队列的并发使用者,则要更改此行为,需要将prefetchPolicy设置为比默认值低的值。例如,将其添加jms.prefetchPolicy.queuePrefetch=1到activemq.xml的uri配置中,或像这样在客户端网址上进行设置

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://172.16.143.99:61616?jms.prefetchPolicy.queuePrefetch=1");

建议使用较大的预取值,以实现高性能和高消息量。但是,对于较低的消息量,其中每条消息都需要花费很长时间进行处理,因此应将预取设置为1。这样可以确保使用者一次只处理一条消息。但是,将预取限制指定为零将导致使用者一次轮询一次消息,而不是将消息推送到使用者。

看看http://activemq.apache.org/what-is-the-prefetch-limit-
for.html

http://activemq.apache.org/destination-
options.html



 类似资料:
  • 我正在使用java客户端连接到hazelcast集群。每次我连接到集群时,我都会观察到在成员的jvm上创建了大约5个客户机线程。这些线程在被销毁之前会停留大约30-40秒。这些线是什么?如何控制创建的客户端线程数? 我正在使用hazelcast all。3.3.3

  • 问题内容: 我正在尝试使用我一直在努力的客户端/服务器程序实现多线程。我需要允许多个客户端同时连接到服务器。我目前有4类:客户端,服务器,协议和用于处理线程的工作器。以下代码是我对这些类的拥有的代码: SocketServer类: SocketClient类别: 协议类别: ClientWorker类: 当我运行服务器和客户端时,一切正常。然后,当我尝试运行另一个客户端时,它只是挂在那儿,没有提示

  • 本文向大家介绍python多线程socket编程之多客户端接入,包括了python多线程socket编程之多客户端接入的使用技巧和注意事项,需要的朋友参考一下 Python中实现socket通信的服务端比较复杂,而客户端非常简单,所以客户端基本上都是用sockct模块实现,而服务 端用有很多模块可以使用,如下: 1、客户端 2、SocketServer模块 为了能够让多个客户端同时接入服务并进行通

  • 我是python套接字编程的初学者,正在尝试编写一个与联网家庭自动化设备(GlobalCache GC100)接口的库 我既需要通过TCP不断监听来自该硬件的传感器状态变化事件,也需要能够在用户发起的时间发送set_state命令(trip relays),而没有明显的延迟。 我有一个循环,它执行来拾取状态更改事件。这通常会超时()并继续到下一个循环迭代,直到设备推送状态更改数据。 我是想做套接字

  • 我正在使用Camel开发客户端/服务器请求/回复系统。 客户机和服务器使用两个JMS队列进行通信:请求队列和响应队列。 服务器端有一个camel路由,它使用来自请求队列的JMS消息并同时处理消息。使用响应队列将响应发送回客户端。 客户端向JMS队列发送消息并等待响应。我有两个问题: > 客户端实际上是其他应用程序将使用的库。我也想在客户端使用Camel,但不知道如何将Camel用作“函数”,即在我

  • 跟随wiki页面https://cwiki.apache.org/qpid/amqp-java-jms-messaging-client.html到https://cwiki.apache.org/qpid/connection-url-format.html. Simple me需要qpid-amqp-1-0-client-jms-0.20-sources。jar接受URI格式,但它似乎忽视了一