当前位置: 首页 > 知识库问答 >
问题:

ActiveMQ,一个主题上的多个消费者会减慢生产者发送速率吗?

饶承宣
2023-03-14

这里的一些配置:非持久消费者、非持久消息、禁用的流控制、默认预取大小、优化确认=true、异步发送=true、使用jms连接ActiveMQ

例如

一个生产者、一个消费者,

Producer————Topic————consumer

生产者发送速率可以达到6k/s

但是,在这种情况下:一个生产者三个消费者,

                /——consumer

Producer——-Topic——-consumer

                \——consumer

生产者发送速率下降到4k/s

这是我的一些关键代码:

发件人类别:

public class sender {

    public Boolean durable=false;
    public String clientID=null;
    public Boolean transacted=false;
    public int ackMode=Session.AUTO_ACKNOWLEDGE;
    public int timeToLive=0;
    public String queuename = "";
    public int persistent = DeliveryMode.NON_PERSISTENT;

    public Connection createConnection(String user,String pwd,String url) throws JMSException, Exception {   
         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, pwd, url);
         connectionFactory.setDispatchAsync(true);
         //connectionFactory.setAlwaysSessionAsync(false);
         Connection connection = connectionFactory.createConnection();   
         if (durable && clientID!=null) {   
             connection.setClientID(clientID);   
         }   
         connection.start();   
         return connection;   
        }  

    public Session createSession(Connection connection) throws Exception {   
        Session session = connection.createSession(transacted, ackMode);   
        return session;   
       }   

    public MessageProducer createProducer(Session session) throws JMSException {   
        Queue destination = session.createQueue(queuename);   
        MessageProducer producer = session.createProducer(destination);   
        producer.setDeliveryMode(persistent);   

        if( timeToLive!=0 )   
            producer.setTimeToLive(timeToLive);   
        return producer;   
        }   

    public void onMessage(Message message) {   
         //process message   
         } 
}

sendmain方法:

public static void main(String[] args) throws JMSException, Exception {
        // TODO Auto-generated method stub
        sender s = new sender();
        s.persistent = DeliveryMode.NON_PERSISTENT;
        Connection c = s.createConnection("","","tcp://localhost:61616?jms.useAsyncSend=true");
        Session sess = s.createSession(c);
        Topic topic = sess.createTopic("topic.test");
        MessageProducer mp = sess.createProducer(topic);
        StringBuffer tmpsb=new StringBuffer();
        for (int j=0;j<1024;j++)
        {
        tmpsb.append("0");
        }
        Message m = sess.createTextMessage(tmpsb.toString());
        long pre=System.currentTimeMillis();
        for (int i=0;i<10000;i++)
        {
            mp.send(m);
        }
        long post=System.currentTimeMillis();
        mp.close();
        System.out.println("sendtime:"+(post-pre));
        System.out.println("sendrate:"+10000000/(float)(post-pre));
        System.out.println("timenow:"+(post));
    }

接收机类代码:

public class receiver implements MessageListener
{
    public  int receivetimes=0;
    public long pretime;

    public void onMessage(Message msg)
    {
        //TextMessage tm = (TextMessage) msg;
        try {
            if (receivetimes==0)
            {
                pretime=System.currentTimeMillis();
            }
            receivetimes+=1;
            if (receivetimes==10000)
            {
                long now=System.currentTimeMillis();
                System.out.println("time:"+(now-pretime)+"\nrecive rate:"+9999999/(float)(now-pretime));
                System.out.println("timenow:"+(now));
                receivetimes=0;
            }

        } catch(Throwable t) {
            t.printStackTrace();
        }
    }
}

接收器类代码在这里隐藏了一些方法,例如创建连接,创建会话或类似发送者类的东西。

接收器主要方法:

public static void main(String[] args) throws JMSException, Exception {
        // TODO Auto-generated method stub
        receiver s = new receiver();
        Connection c = s.createConnection("","","tcp://localhost:6151?jms.optimizeAcknowledge=true");
        Session sess = s.createSession(c);
        Topic destination  = sess.createTopic("topic.test");   
        MessageConsumer  consumer = sess.createConsumer(destination);  
        consumer.setMessageListener(new receiver());   
    }

每个消费者都在一个独立的过程中。我运行了三个消费者和一个生产者,然后我得到了一个表现不佳的结果。有人知道我为什么会得到这个吗?

共有1个答案

农雅畅
2023-03-14

正如@TimBish所说。问题是“生产者,消费者,同一台机器上的activemq服务器”。当分开时,问题永远不会出现。

严格地测试某些东西是非常重要的。。。。。。。

 类似资料:
  • 我有一个使用ActiveMQ的消息队列。web请求用persistency=true将消息放入队列。现在,我有两个消费者,它们都作为单独的会话连接到这个队列。使用者1总是确认消息,但使用者2从不这样做。 JMS队列实现负载平衡器语义。一条消息将被一个使用者接收。如果在发送消息时没有可用的使用者,它将被保留,直到有可以处理消息的使用者可用为止。如果使用者接收到一条消息,但在关闭之前没有确认它,那么该

  • 我有三根线。线程1(T1)是生成器,它生成数据。线程2和线程3(T2和T3)分别等待T1的数据在单独的循环中处理。我正在考虑在线程之间共享BlockingQueue,并通过调用“Take”让T2和T3等待。

  • 我有一个生产者/消费者场景,我不希望一个生产者交付产品,多个消费者消费这些产品。然而,常见的情况是,交付的产品只被一个消费者消费,而其他消费者从未看到过这个特定的产品。我不想实现的是,一个产品被每个消费者消费一次,而没有任何形式的阻碍。 我的第一个想法是使用多个BlockingQueue,每个消费者使用一个,并使生产者将每个产品按顺序放入所有可用的BlockingQueues中。但是,如果其中一个

  • 出于测试原因,我指定了low memoryusage limit low(35MB)以使问题apear更快,但实际情况是,当activemq的问题出现时,我最终需要它来删除旧消息。 我发现了一个不令人满意的解决方案,即在ActiveMQConnectionFactory中设置useasyncsEnd=true,并指定sendtimeout。这使得producer不会被阻塞,但通过这种方式,最新的消

  • 问题内容: 因此,我已经看到了许多在Go中实现一个消费者和许多生产者的方法-Go 并发中的经典fanIn函数。 我想要的是fanOut功能。它以一个通道作为参数,它从中读取一个值,并返回一个通道片,该通道将这个值的副本写入其中。 有没有正确/推荐的方法来实现这一目标? 问题答案: 您几乎描述了执行此操作的最佳方法,但这是执行此操作的一小段代码示例。 去游乐场:https : //play.gola

  • 我正在尝试了解RxJava并发的一些细节,但我不确定我的想法是否正确。我对SubscribeOn/观察的工作原理有很好的了解,但我正在尝试确定池调度程序的一些细节。为此,我正在考虑尽可能简单地实现一个1-N生产者-消费者链,其中消费者的数量与CPU一样多。 根据文档,Schedulers.computation()由与内核一样多的线程池支持。但是,根据Reactive合约,运算符只能获得顺序调用。