当前位置: 首页 > 面试题库 >

ActiveMQ和嵌入式代理

滑畅
2023-03-14
问题内容

编辑:改写问题:

我想将ActiveMQ用作服务器和客户端应用程序之间的信使服务。

我正在尝试在服务器内设置嵌入式代理(即不是单独的进程),以处理产生的消息供我的客户使用。该队列被保留。

经纪人初始化如下:

BrokerService broker = new BrokerService();
KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter();
adaptor.setDirectory(new File("activemq"));
broker.setPersistenceAdapter(adaptor);
broker.setUseJmx(true);
broker.addConnector("tcp://localhost:61616");
broker.start();

修补之后,我最终得到了服务器部分:

public static class HelloWorldProducer implements Runnable {
    public void run() {
        try {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost"); // apparently the vm part is all i need
            Connection connection = connectionFactory.createConnection(); 
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("TEST.FOO");
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
            TextMessage message = session.createTextMessage(text);
            System.out.println("Sent message: "+ message.hashCode() + " : " + Thread.currentThread().getName());
            producer.send(message);
            session.close();
            connection.close();
        }
        catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }
}

客户端非常相似,看起来像这样:

public static class HelloWorldConsumer implements Runnable, ExceptionListener {
    public void run() {
        try {
          ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost");
            Connection connection = connectionFactory.createConnection(); // exception happens here...
            connection.start();
            connection.setExceptionListener(this);
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("TEST.FOO");
            MessageConsumer consumer = session.createConsumer(destination);
            Message message = consumer.receive(1000);
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                String text = textMessage.getText();
                System.out.println("*****Received: " + text);
            } else {
                System.out.println("*****Received obj: " + message);
            }
            consumer.close();
            session.close();
            connection.close();
        } catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }

main方法只是在线程中启动其中的每一个,以开始生成/接收消息。

…但是我在每个线程的开头都遇到以下问题:

2013-01-24 07:54:31,271 INFO  [org.apache.activemq.broker.BrokerService] Using Persistence Adapter: AMQPersistenceAdapter(activemq-data/localhost)
2013-01-24 07:54:31,281 INFO  [org.apache.activemq.store.amq.AMQPersistenceAdapter] AMQStore starting using directory: activemq-data/localhost
2013-01-24 07:54:31,302 INFO  [org.apache.activemq.kaha.impl.KahaStore] Kaha Store using data directory activemq-data/localhost/kr-store/state
2013-01-24 07:54:31,339 INFO  [org.apache.activemq.store.amq.AMQPersistenceAdapter] Active data files: []
2013-01-24 07:54:31,445 DEBUG [org.apache.activemq.broker.jmx.ManagementContext] Probably not using JRE 1.4: mx4j.tools.naming.NamingService
2013-01-24 07:54:31,450 DEBUG [org.apache.activemq.broker.jmx.ManagementContext] Failed to create local registry
    java.rmi.server.ExportException: internal error: ObjID already in use
    at sun.rmi.transport.ObjectTable.putTarget(ObjectTable.java:186)
    at sun.rmi.transport.Transport.exportObject(Transport.java:92)
    at sun.rmi.transport.tcp.TCPTransport.exportObject(TCPTransport.java:247)
    at sun.rmi.transport.tcp.TCPEndpoint.exportObject(TCPEndpoint.java:411)
    at sun.rmi.transport.LiveRef.exportObject(LiveRef.java:147)
        <snip....>

看来消息已成功产生和使用(我先前发布的其他问题已解决),但是上述异常让我感到担忧。

编辑:在代理关闭期间,我现在也受到以下欢迎:

2013-01-25 08:40:17,486 DEBUG [org.apache.activemq.transport.failover.FailoverTransport] Transport failed with the following exception:
    java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:269)
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:210)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:202)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
    at java.lang.Thread.run(Thread.java:722)

问题答案:

您可以通过多种方式将代理嵌入代码中,其中很多方法在此处记录。您可能要尝试升级您的版本,因为您使用的版本似乎已经很老了,因为它默认使用现已弃用的AMQ
Store,而不是较新的KahaDB存储。由于客户端线程之间存在竞争,您可能会遇到问题,因为它们使用可以争先在VM代理中创建的不同连接工厂。如果在生产者上设置create
= false选项,并确保使用者线程在此之后启动,则可以解决该问题,或者可以提前创建VM代理,然后在两个线程上添加create =
false,这样就可以解决问题。


BrokerService broker = new BrokerService();
// configure the broker
broker.setBrokerName("localhost");
broker.setUseJmx(false);
broker.start();

然后在客户端代码中,只需通过此连接工厂配置进行附加。

ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?create=false");


 类似资料:
  • 我正在尝试设置一个支持SSL的嵌入式ActiveMQ代理。 我不断得到相同的错误msg: 搜索这给出了在生成密钥存储和信任存储时可能发生故障的指示。 我试图使用这些指南生成密钥存储和信任存储,但没有成功。http://docs.oracle.com/javase/6/docs/technotes/guides/security/jsse/JSSERefGuide.html#CreateKeysto

  • 我有一个旧的应用程序,它用ActiveMQ 5.8.0处理JMS消息,还有一些JNDI远程主题连接到这个ActiveMQ。 我有一个这样的连接器: 它工作得很好,但是现在,由于一些技术原因(严格的JMS 1.1),我需要使用“ConnectionFactory”而不是“TopicConnectionFactory”。在实际配置中,我被卡住了,因为ActiveMQ似乎使用了“TopicConnect

  • 问题内容: 我正在尝试为连接到RabbitMQ代理的Scala / Java应用程序创建集成测试。为了实现这一点,我希望有一个嵌入式经纪人讲我在每次测试之前启动和停止的AMQP。最初,我试图将ActiveMQ引入AMQP作为嵌入式代理,但是该应用程序使用RabbitMQ,因此仅使用AMQP 0.9.3版,而ActiveMQ需要AMQP 1.0版。 我可以使用其他嵌入式代理代替ActiveMQ吗?

  • 我有时会在pom中看到以下声明。xml。。。 如您所见,sping-boo-starter-web被声明为tomcat-embed-jasper。 是不是sping-boo-starter-web已经有一个嵌入式tomcat了?为什么一些开发人员仍然声明tomcat-embed-jasper以及boot-starter-web?还是有什么原因?

  • Ruby, like fire, is a very useful friend, and a very dangerous enemy. — Mikkel Bruun 在模板中使用嵌入式 Ruby 帮助构建动态的配置文件或实现数组遍历是一种强大的方式。 然而,你也可以在配置清单中使用 inline_template 函数直接嵌入 Ruby 而不必使用分离的模板文件。 操作步骤 在 Puppet

  • 我有一个嵌入了XSD的XML,所以它类似于: 我目前正在使用嵌入式XSD的a克隆作为模式: null 但问题是,当我想取消封送一个条目XML时,我会得到以下错误消息: 意外元素(URI:“http://www.w3.org/2001/XMLSchema”,本地:“schema”)。所需元素为<{}table> 我尝试在我的XSD上添加标记,但没有成功,仍然会因为名称空间而得到一个错误(我想是吗?)