编辑:改写问题:
我想将ActiveMQ用作服务器和客户端应用程序之间的信使服务。
我正在尝试在服务器内设置嵌入式代理(即不是单独的进程),以处理产生的消息供我的客户使用。该队列被保留。
经纪人初始化如下:
BrokerService broker = new BrokerService();
KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter();
adaptor.setDirectory(new File("activemq"));
broker.setPersistenceAdapter(adaptor);
broker.setUseJmx(true);
broker.addConnector("tcp://localhost:61616");
broker.start();
修补之后,我最终得到了服务器部分:
public static class HelloWorldProducer implements Runnable {
public void run() {
try {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost"); // apparently the vm part is all i need
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("TEST.FOO");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
TextMessage message = session.createTextMessage(text);
System.out.println("Sent message: "+ message.hashCode() + " : " + Thread.currentThread().getName());
producer.send(message);
session.close();
connection.close();
}
catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
}
}
客户端非常相似,看起来像这样:
public static class HelloWorldConsumer implements Runnable, ExceptionListener {
public void run() {
try {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost");
Connection connection = connectionFactory.createConnection(); // exception happens here...
connection.start();
connection.setExceptionListener(this);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("TEST.FOO");
MessageConsumer consumer = session.createConsumer(destination);
Message message = consumer.receive(1000);
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println("*****Received: " + text);
} else {
System.out.println("*****Received obj: " + message);
}
consumer.close();
session.close();
connection.close();
} catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
}
main方法只是在线程中启动其中的每一个,以开始生成/接收消息。
…但是我在每个线程的开头都遇到以下问题:
2013-01-24 07:54:31,271 INFO [org.apache.activemq.broker.BrokerService] Using Persistence Adapter: AMQPersistenceAdapter(activemq-data/localhost)
2013-01-24 07:54:31,281 INFO [org.apache.activemq.store.amq.AMQPersistenceAdapter] AMQStore starting using directory: activemq-data/localhost
2013-01-24 07:54:31,302 INFO [org.apache.activemq.kaha.impl.KahaStore] Kaha Store using data directory activemq-data/localhost/kr-store/state
2013-01-24 07:54:31,339 INFO [org.apache.activemq.store.amq.AMQPersistenceAdapter] Active data files: []
2013-01-24 07:54:31,445 DEBUG [org.apache.activemq.broker.jmx.ManagementContext] Probably not using JRE 1.4: mx4j.tools.naming.NamingService
2013-01-24 07:54:31,450 DEBUG [org.apache.activemq.broker.jmx.ManagementContext] Failed to create local registry
java.rmi.server.ExportException: internal error: ObjID already in use
at sun.rmi.transport.ObjectTable.putTarget(ObjectTable.java:186)
at sun.rmi.transport.Transport.exportObject(Transport.java:92)
at sun.rmi.transport.tcp.TCPTransport.exportObject(TCPTransport.java:247)
at sun.rmi.transport.tcp.TCPEndpoint.exportObject(TCPEndpoint.java:411)
at sun.rmi.transport.LiveRef.exportObject(LiveRef.java:147)
<snip....>
看来消息已成功产生和使用(我先前发布的其他问题已解决),但是上述异常让我感到担忧。
编辑:在代理关闭期间,我现在也受到以下欢迎:
2013-01-25 08:40:17,486 DEBUG [org.apache.activemq.transport.failover.FailoverTransport] Transport failed with the following exception:
java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:269)
at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:210)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:202)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
at java.lang.Thread.run(Thread.java:722)
您可以通过多种方式将代理嵌入代码中,其中很多方法在此处记录。您可能要尝试升级您的版本,因为您使用的版本似乎已经很老了,因为它默认使用现已弃用的AMQ
Store,而不是较新的KahaDB存储。由于客户端线程之间存在竞争,您可能会遇到问题,因为它们使用可以争先在VM代理中创建的不同连接工厂。如果在生产者上设置create
= false选项,并确保使用者线程在此之后启动,则可以解决该问题,或者可以提前创建VM代理,然后在两个线程上添加create =
false,这样就可以解决问题。
BrokerService broker = new BrokerService();
// configure the broker
broker.setBrokerName("localhost");
broker.setUseJmx(false);
broker.start();
然后在客户端代码中,只需通过此连接工厂配置进行附加。
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?create=false");
我正在尝试设置一个支持SSL的嵌入式ActiveMQ代理。 我不断得到相同的错误msg: 搜索这给出了在生成密钥存储和信任存储时可能发生故障的指示。 我试图使用这些指南生成密钥存储和信任存储,但没有成功。http://docs.oracle.com/javase/6/docs/technotes/guides/security/jsse/JSSERefGuide.html#CreateKeysto
我有一个旧的应用程序,它用ActiveMQ 5.8.0处理JMS消息,还有一些JNDI远程主题连接到这个ActiveMQ。 我有一个这样的连接器: 它工作得很好,但是现在,由于一些技术原因(严格的JMS 1.1),我需要使用“ConnectionFactory”而不是“TopicConnectionFactory”。在实际配置中,我被卡住了,因为ActiveMQ似乎使用了“TopicConnect
问题内容: 我正在尝试为连接到RabbitMQ代理的Scala / Java应用程序创建集成测试。为了实现这一点,我希望有一个嵌入式经纪人讲我在每次测试之前启动和停止的AMQP。最初,我试图将ActiveMQ引入AMQP作为嵌入式代理,但是该应用程序使用RabbitMQ,因此仅使用AMQP 0.9.3版,而ActiveMQ需要AMQP 1.0版。 我可以使用其他嵌入式代理代替ActiveMQ吗?
我有时会在pom中看到以下声明。xml。。。 如您所见,sping-boo-starter-web被声明为tomcat-embed-jasper。 是不是sping-boo-starter-web已经有一个嵌入式tomcat了?为什么一些开发人员仍然声明tomcat-embed-jasper以及boot-starter-web?还是有什么原因?
Ruby, like fire, is a very useful friend, and a very dangerous enemy. — Mikkel Bruun 在模板中使用嵌入式 Ruby 帮助构建动态的配置文件或实现数组遍历是一种强大的方式。 然而,你也可以在配置清单中使用 inline_template 函数直接嵌入 Ruby 而不必使用分离的模板文件。 操作步骤 在 Puppet
我有一个嵌入了XSD的XML,所以它类似于: 我目前正在使用嵌入式XSD的a克隆作为模式: null 但问题是,当我想取消封送一个条目XML时,我会得到以下错误消息: 意外元素(URI:“http://www.w3.org/2001/XMLSchema”,本地:“schema”)。所需元素为<{}table> 我尝试在我的XSD上添加标记,但没有成功,仍然会因为名称空间而得到一个错误(我想是吗?)