Question:

Not all consumers consume messages when a network of brokers is configured in ActiveMQ

萧英光
2023-03-14

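I have two instances of my application on the same machine (although they could also be on different machines), running in two Tomcat instances on different ports, with Apache ActiveMQ embedded in the application.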

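I have configured a static network of brokers so that a message published on one instance can also be consumed by all the other instances (each instance can be both a producer and a consumer).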

servlet:

package com.activemq.servlet;

import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

import javax.jms.JMSException;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import com.activemq.ActiveMQStartup;
import com.activemq.MQPublisher;
import com.activemq.SendMsg;
import com.activemq.SendMsgToAllInstance;
import com.activemq.TestPublisher;

/**
 * Servlet implementation class ActiveMQStartUpServlet
 */
@WebServlet(value = "/activeMQStartUpServlet", loadOnStartup = 1)
public class ActiveMQStartUpServlet extends HttpServlet {
    private static final long serialVersionUID = 1L;
    private ActiveMQStartup mqStartup = null;
    private static final Map pooledPublishers = new HashMap();

    @Override
    public void init(ServletConfig config) throws ServletException {
        System.out.println("starting servlet--------------");
        super.init(config);
        //Apache Active MQ Startup
        mqStartup = new ActiveMQStartup();
        mqStartup.startBrokerService();

    }

    @Override
    protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        System.out.println(req.getParameter("distributedMsg"));
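        String mqConfig = null;
        String distributedMsg = req.getParameter("distributedMsg");
        String simpleMsg = req.getParameter("simpleMsg");
        if (distributedMsg != null && !distributedMsg.equals(""))
            mqConfig = "distributedMsg";
        else if (simpleMsg != null && !simpleMsg.equals(""))
            mqConfig = "simpleMsg";
        if (mqConfig == null) {
            // Neither parameter was supplied, so there is nothing to publish.
            resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "distributedMsg or simpleMsg is required");
            return;
        }
        MQPublisher publisher = acquirePublisher(mqConfig);
        if (publisher == null) {
            // acquirePublisher returns null when the publisher could not be created.
            resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "could not create publisher");
            return;
        }
        try {
            publisher.publish(mqConfig);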
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
            releasePublisher(publisher);
        }
    }

    @SuppressWarnings("unchecked")
    private void releasePublisher(MQPublisher publisher) {
        if (publisher == null) return;
        @SuppressWarnings("rawtypes")
        LinkedList publishers;
        TestPublisher poolablePublisher = (TestPublisher)publisher;
        publishers = getPooledPublishers(poolablePublisher.getConfigurationName());
        synchronized (publishers) {
            publishers.addLast(poolablePublisher);
        }

    }

    private MQPublisher acquirePublisher(String mqConfig) {
        LinkedList publishers = getPooledPublishers(mqConfig);
        MQPublisher publisher = getMQPubliser(publishers);
        if (publisher != null) return publisher;
        try {
            if (mqConfig.equals("distributedMsg"))
                return new TestPublisher(MQConfiguration.getConfiguration("distributedMsg"), new SendMsgToAllInstance());
            else    
                return new TestPublisher(MQConfiguration.getConfiguration("simpleMsg"), new SendMsg());
        }catch(Exception e){
            e.printStackTrace();
        }
        return null;
    }

    private LinkedList getPooledPublishers(String mqConfig) {
         LinkedList publishers = null;
         publishers = (LinkedList) pooledPublishers.get(mqConfig);
         if (publishers == null) {
             synchronized(pooledPublishers) {
                 publishers = (LinkedList) pooledPublishers.get(mqConfig);
                 if (publishers == null) {
                     publishers = new LinkedList();
                     pooledPublishers.put(mqConfig, publishers);
                 }
             }
         }
        return publishers;
    }

    private MQPublisher getMQPubliser(LinkedList publishers) {
        synchronized (publishers) {
            // Hand out the oldest pooled publisher, if there is one.
            if (!publishers.isEmpty()) {
                return (TestPublisher) publishers.removeFirst();
            }
        }
        return null;
    }




}

Configuration:

package com.activemq.servlet;

import java.util.HashMap;
import java.util.Map;

import javax.jms.JMSException;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;

import org.apache.activemq.ActiveMQConnectionFactory;

import com.activemq.ActiveMQContext;

public class MQConfiguration {
    private static final Map configurations = new HashMap();
    private String mqConfig;
    private String topicName;
    private TopicConnection topicConnection = null;

    // userName and password are accepted but not used yet.
    private MQConfiguration(String mqConfig, String userName, String password) {
        this.mqConfig = mqConfig;

        try {
            String topicFactoryConName = ActiveMQContext.getProperty(mqConfig);
            this.topicName = (mqConfig.equals("distributedMsg") ? ActiveMQContext.getProperty("distributedTopic"):ActiveMQContext.getProperty("normalTopic"));
            TopicConnectionFactory factory = (ActiveMQConnectionFactory) ActiveMQContext.getContext()
                    .lookup(topicFactoryConName);
            this.topicConnection = factory.createTopicConnection();
            this.topicConnection.start();
        } catch (Exception e) {
            System.out.println("error: " + e);
        }
    }

    public static MQConfiguration getConfiguration(String mqConfig) {
        if (mqConfig == null || "".equals(mqConfig)) {
            throw new IllegalArgumentException("mqConfig is null or empty");
        }

        MQConfiguration config;
        synchronized (configurations) {
            // Reuse the cached configuration if one was already created for this name.
            config = (MQConfiguration) configurations.get(mqConfig);
            if (config == null) {
                config = new MQConfiguration(mqConfig, "userName", "userPassword");
                configurations.put(mqConfig, config);
            }
        }

        return config;
    }

    public String getMqConfig() {
        return this.mqConfig;
    }

    public TopicSession createTopicSession(boolean isTransacted, int acknowledgeMode) throws JMSException {
        if (this.topicConnection == null) {
            throw new IllegalStateException("topic connection not configured");
        }
        return this.topicConnection.createTopicSession(isTransacted, acknowledgeMode);
    }

    public Topic getTopic() {
        try {
            return (Topic) ActiveMQContext.getContext().lookup(this.topicName);
        } catch (Exception e) {
            // Report the failed lookup instead of silently discarding it.
            System.out.println("error: " + e);
        }
        return null;
    }
}

Publisher:

package com.activemq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;

import com.activemq.servlet.MQConfiguration;

public class TestPublisher implements MQPublisher {
    private final String configurationName;
    private TopicSession topicSession = null;
    private TopicPublisher topicPublisher = null;

    public TestPublisher(MQConfiguration config, Object messageListener) throws JMSException {
        if (config == null) {
            throw new IllegalArgumentException("config == null");
        }
        Topic topic = config.getTopic();
        this.configurationName = config.getMqConfig();
        this.topicSession = config.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        this.topicPublisher = this.topicSession.createPublisher(topic);
        MessageConsumer msgConsumer = this.topicSession.createConsumer(topic);
        msgConsumer.setMessageListener((MessageListener) messageListener);
    }

    @Override
    public void publish(String msg) throws JMSException {
        this.topicPublisher.publish(createMessage(msg, this.topicSession));
    }

    private Message createMessage(String msg, Session session) throws JMSException {
        TextMessage message = session.createTextMessage(msg);
        return message;
    }

    public String getConfigurationName() {
        return this.configurationName;
    }
}

Consumer:

package com.activemq;

import javax.jms.Message;
import javax.jms.MessageListener;

public class SendMsgToAllInstance implements MessageListener {

    @Override
    public void onMessage(Message arg0) {
        System.out.println("distributed message-------------");

        // Here we call the DAO layer to fetch some data and cache it

    }

}

JNDI: activemq-jndi.properties

# JNDI properties file to setup the JNDI server within ActiveMQ

#
# Default JNDI properties settings
#
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:61616
activemq.network.connector=static:(tcp://localhost:61620)

#activemq.network.connector=broker:(tcp://localhost:61619,network:static:tcp://localhost:61620)?persistent=false&useJmx=true
activemq.data.directory=data61619
activemq.jmx.port=1099

#
# Set the connection factory name(s) as well as the destination names. The connection factory name(s)
# as well as the second part (after the dot) of the left hand side of the destination definition
# must be used in the JNDI lookups.
#
connectionFactoryNames = distributedMsgFactory,simpleMsgFactory
topic.jms/distributedTopic=distributedTopic
topic.jms/normalTopic=normalTopic

distributedMsg=distributedMsgFactory
simpleMsg=simpleMsgFactory

distributedTopic=jms/distributedTopic
normalTopic=jms/normalTopic

ActiveMQStartup:

package com.activemq;

import java.net.URI;

import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.security.JaasAuthenticationPlugin;

public class ActiveMQStartup {
    private final String bindAddress;
    private final String dataDirectory;
    private BrokerService broker = new BrokerService();
    protected final int numRestarts = 3;
    protected final int networkTTL = 2;
    protected final int consumerTTL = 2;
    protected final boolean dynamicOnly = true;
    protected final String networkBroker;
    protected final String jmxPort;

    public ActiveMQStartup() {
        ActiveMQContext context = new ActiveMQContext();
        context.loadJndiProperties();
        bindAddress = ActiveMQContext.getProperty("java.naming.provider.url");
        dataDirectory = ActiveMQContext.getProperty("activemq.data.directory");
        networkBroker = ActiveMQContext.getProperty("activemq.network.connector");
        jmxPort = ActiveMQContext.getProperty("activemq.jmx.port");
    }

    // Start activemq broker service
    public void startBrokerService() {
        try {
            broker.setDataDirectory("../" + dataDirectory);
            broker.setBrokerName(dataDirectory);
            broker.setUseShutdownHook(true);
            TransportConnector connector = new TransportConnector();
            connector.setUri(new URI(bindAddress));         

            //broker.setPlugins(new BrokerPlugin[]{new JaasAuthenticationPlugin()});
            ManagementContext mgContext = new ManagementContext();
            if (networkBroker != null && !networkBroker.isEmpty()) {
                NetworkConnector networkConnector = broker.addNetworkConnector(networkBroker);
                networkConnector.setName(dataDirectory);
                mgContext.setConnectorPort(Integer.parseInt(jmxPort));
                broker.setManagementContext(mgContext);
                configureNetworkConnector(networkConnector);
            }
            broker.setNetworkConnectorStartAsync(true);
            broker.addConnector(connector);
            broker.start();
        } catch (Exception e) {
            System.out.println("Failed to start Apache MQ Broker : " + e);
        }
    }

    private void configureNetworkConnector(NetworkConnector networkConnector) {
        networkConnector.setDuplex(true);
        networkConnector.setNetworkTTL(networkTTL);
        networkConnector.setDynamicOnly(dynamicOnly);
        networkConnector.setConsumerTTL(consumerTTL);
        //networkConnector.setStaticBridge(true);
    }

    // Stop broker service
    public void stopBrokerService() {
        try {
            broker.stop();
        } catch (Exception e) {
            System.out.println("Unable to stop the ApacheMQ Broker service " + e);
        }
    }
}

I start the Tomcat instances one after another and can see the network connection between the brokers being established.

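When I send the first message, from either instance 1 or instance 2, it is consumed only on that instance. When I then send a message from the second instance, it is consumed on both.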

Git code: https://github.com/AratRana/ApacheActiveMQ

Can you point out where I am going wrong?

1 answer

益炜
2023-03-14

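I finally got it working. Once I started the consumers during server startup, I could see the message consumers on all instances. So, to achieve this, the consumers need to be started before any message is published.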
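
Why the startup order matters can be illustrated with a toy model. The sketch below is not ActiveMQ code: `DemandForwardingDemo` and `ToyBroker` are hypothetical names, and it assumes that the bridge between two brokers forwards a topic message to the peer only when the peer already has a consumer on that topic at publish time. In the question's code the consumer is created inside the `TestPublisher` constructor, which first runs on an instance's first `doPost`, so the "lazy" scenario mirrors the reported behavior.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of two brokers joined by a demand-forwarding bridge (illustration only). */
final class DemandForwardingDemo {

    /** Hypothetical stand-in for one embedded broker that hosts a single topic. */
    static final class ToyBroker {
        private final List<List<String>> inboxes = new ArrayList<>();
        private ToyBroker peer;

        /** Registers a consumer and returns the list that collects its messages. */
        List<String> subscribe() {
            List<String> inbox = new ArrayList<>();
            inboxes.add(inbox);
            return inbox;
        }

        /** Delivers locally, then forwards only if the peer has a consumer right now. */
        void publish(String message) {
            deliver(message);
            if (peer != null && !peer.inboxes.isEmpty()) {
                peer.deliver(message);
            }
        }

        private void deliver(String message) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
    }

    /** Creates two brokers that are bridged in both directions. */
    static ToyBroker[] linkedPair() {
        ToyBroker a = new ToyBroker();
        ToyBroker b = new ToyBroker();
        a.peer = b;
        b.peer = a;
        return new ToyBroker[] {a, b};
    }

    /** Each instance subscribes only right before its first publish. */
    static int[] lazyScenario() {
        ToyBroker[] pair = linkedPair();
        List<String> inboxA = pair[0].subscribe();
        pair[0].publish("m1"); // B has no consumer yet, so m1 stays on A
        List<String> inboxB = pair[1].subscribe();
        pair[1].publish("m2"); // both sides have consumers, so m2 reaches both
        return new int[] {inboxA.size(), inboxB.size()};
    }

    /** Both instances subscribe at startup, before anything is published. */
    static int[] eagerScenario() {
        ToyBroker[] pair = linkedPair();
        List<String> inboxA = pair[0].subscribe();
        List<String> inboxB = pair[1].subscribe();
        pair[0].publish("m1");
        pair[1].publish("m2");
        return new int[] {inboxA.size(), inboxB.size()};
    }

    public static void main(String[] args) {
        System.out.println("lazy=" + Arrays.toString(lazyScenario()));   // lazy=[2, 1]
        System.out.println("eager=" + Arrays.toString(eagerScenario())); // eager=[2, 2]
    }
}
```

In the lazy run the second instance never sees `m1`, while in the eager run both instances receive both messages. In the servlet from the question, a comparable effect could probably be obtained by constructing the `TestPublisher` instances (and therefore their consumers) in `init()` right after `startBrokerService()`, which is what starting the consumers during server startup amounts to.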
