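I'm trying to get WildFly and ActiveMQ working together with Apache Camel. Let me explain the scenario. Every hour a Camel batch polls an FTP server, fetches files, and sends them to an ActiveMQ broker. The broker implements two routes: numbers and big.numbers. Messages are enqueued on numbers and, if they are not yet ready to be sent, routed to big.numbers. Messages in big.numbers are dequeued and translated by a Camel processor, then enqueued on numbers once they are ready to send. Ready-to-send messages in numbers are sent to WildFly, where an MDB performs some operations. Everything works except the last step. When the MDB receives a message, an exception is raised: org.hornetq.jms.client.HornetQBytesMessage cannot be cast to javax.jms.TextMessage. Below is the camel.xml I use to implement the routes on the ActiveMQ broker: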
...
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<package>edu.foo.amq.camel</package>
<dataFormats>
<string id="utf-8-string" charset="UTF-8"/>
</dataFormats>
<route id="toBigNumeri">
<from uri="activemq:numbers.queue"/>
<marshal ref="utf-8-string"/>
<choice>
<when>
<simple>
${in.header.readyToGo} != true
</simple>
<process ref="big.numbers.processor"/>
<to uri="activemq:big.numbers.queue"/>
</when>
<otherwise>
<process ref="done.processor"/>
<to uri="wildflycf:generatoreQueue?username=dummyusr&amp;password=dummy1234"/>
</otherwise>
</choice>
</route>
<route id="toNumeri">
<from uri="activemq:big.numbers.queue"/>
<marshal ref="utf-8-string"/>
<split>
<tokenize token="\n"/>
<process ref="numbers.processor"/>
<setHeader headerName="readyToGo">
<constant>true</constant>
</setHeader>
<to uri="activemq:numbers.queue"/>
</split>
<process ref="dump.processor"/>
<to uri="activemq:dump.queue"/>
</route>
</camelContext>
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
<property name="userName" value="${activemq.username}"/>
<property name="password" value="${activemq.password}"/>
</bean>
</property>
</bean>
<bean id="jndiTmp" class="org.springframework.jndi.JndiTemplate">
<property name="environment">
<props>
<prop key="java.naming.provider.url">http-remoting://localhost:8080</prop>
<prop key="java.naming.factory.initial">org.jboss.naming.remote.client.InitialContextFactory</prop>
<prop key="java.naming.security.principal">jhon</prop>
<prop key="java.naming.security.credentials">doe</prop>
</props>
</property>
</bean>
<bean id="wfcf" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiTemplate" ref="jndiTmp"/>
<property name="jndiName" value="jms/RemoteConnectionFactory"/>
</bean>
<bean id="wildflycf" class="org.apache.camel.component.jms.JmsComponent">
<property name="connectionFactory" ref="wfcf"/>
</bean>
...
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.apache.log4j.Logger;
@MessageDriven(name="GeneratoreMDB", activationConfig = {
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
@ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"),
@ActivationConfigProperty(propertyName="destinationLookup", propertyValue=QueueHandler.jmsQueue)}
)
public class QueueHandler implements MessageListener {
public static final String jmsQueue = "/export/jms/generatoreQueue";
private static final Logger log = Logger.getLogger(QueueHandler.class);
@Override
public void onMessage(Message arg0) {
try {
log.info(((TextMessage) arg0).getText());
...
} catch (JMSException e) {
log.error(e);
}
}
}
Here is the exception thrown by my MDB:
2014-12-05 09:12:04,864 ERROR [org.hornetq.ra] (Thread-35 (HornetQ-client-global-threads-1727074612)) HQ154004: Failed to deliver message: javax.ejb.EJBTransactionRolledbackException: org.hornetq.jms.client.HornetQBytesMessage cannot be cast to javax.jms.TextMessage
at org.jboss.as.ejb3.tx.CMTTxInterceptor.handleInCallerTx(CMTTxInterceptor.java:163)
at org.jboss.as.ejb3.tx.CMTTxInterceptor.invokeInCallerTx(CMTTxInterceptor.java:253)
at org.jboss.as.ejb3.tx.CMTTxInterceptor.required(CMTTxInterceptor.java:342)
at org.jboss.as.ejb3.tx.CMTTxInterceptor.processInvocation(CMTTxInterceptor.java:239)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.as.ejb3.component.interceptors.CurrentInvocationContextInterceptor.processInvocation(CurrentInvocationContextInterceptor.java:41)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.as.ejb3.component.invocationmetrics.WaitTimeInterceptor.processInvocation(WaitTimeInterceptor.java:43)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.as.ejb3.security.SecurityContextInterceptor.processInvocation(SecurityContextInterceptor.java:95)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.as.ejb3.component.interceptors.ShutDownInterceptorFactory$1.processInvocation(ShutDownInterceptorFactory.java:64)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.as.ejb3.component.interceptors.LoggingInterceptor.processInvocation(LoggingInterceptor.java:59)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.as.ee.component.NamespaceContextInterceptor.processInvocation(NamespaceContextInterceptor.java:50)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.as.ejb3.component.interceptors.AdditionalSetupInterceptor.processInvocation(AdditionalSetupInterceptor.java:55)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.as.ejb3.component.messagedriven.MessageDrivenComponentDescription$5$1.processInvocation(MessageDrivenComponentDescription.java:211)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.invocation.ContextClassLoaderInterceptor.processInvocation(ContextClassLoaderInterceptor.java:64)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.invocation.InterceptorContext.run(InterceptorContext.java:326)
at org.wildfly.security.manager.WildFlySecurityManager.doChecked(WildFlySecurityManager.java:448)
at org.jboss.invocation.AccessCheckingInterceptor.processInvocation(AccessCheckingInterceptor.java:61)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.invocation.InterceptorContext.run(InterceptorContext.java:326)
at org.jboss.invocation.PrivilegedWithCombinerInterceptor.processInvocation(PrivilegedWithCombinerInterceptor.java:80)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.invocation.ChainedInterceptor.processInvocation(ChainedInterceptor.java:61)
at org.jboss.as.ee.component.ViewService$View.invoke(ViewService.java:185)
at org.jboss.as.ee.component.ViewDescription$1.processInvocation(ViewDescription.java:182)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.invocation.ChainedInterceptor.processInvocation(ChainedInterceptor.java:61)
at org.jboss.as.ee.component.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:73)
at edu.foo.incassionline.generatore.jms.QueueHandler$$$view3.onMessage(Unknown Source)
at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source) [:1.7.0_45]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) [rt.jar:1.7.0_45]
at java.lang.reflect.Method.invoke(Method.java:606) [rt.jar:1.7.0_45]
at org.jboss.as.ejb3.inflow.MessageEndpointInvocationHandler.doInvoke(MessageEndpointInvocationHandler.java:139)
at org.jboss.as.ejb3.inflow.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:73)
at edu.foo.incassionline.generatore.jms.QueueHandler$$$endpoint1.onMessage(Unknown Source)
at org.hornetq.ra.inflow.HornetQMessageHandler.onMessage(HornetQMessageHandler.java:319) [hornetq-ra-2.4.1.Final.jar:]
at org.hornetq.core.client.impl.ClientConsumerImpl.callOnMessage(ClientConsumerImpl.java:1116) [hornetq-core-client-2.4.1.Final.jar:]
at org.hornetq.core.client.impl.ClientConsumerImpl.access$500(ClientConsumerImpl.java:56) [hornetq-core-client-2.4.1.Final.jar:]
at org.hornetq.core.client.impl.ClientConsumerImpl$Runner.run(ClientConsumerImpl.java:1251) [hornetq-core-client-2.4.1.Final.jar:]
at org.hornetq.utils.OrderedExecutorFactory$OrderedExecutor$1.run(OrderedExecutorFactory.java:104) [hornetq-core-client-2.4.1.Final.jar:]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [rt.jar:1.7.0_45]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [rt.jar:1.7.0_45]
at java.lang.Thread.run(Thread.java:744) [rt.jar:1.7.0_45]
Caused by: java.lang.ClassCastException: org.hornetq.jms.client.HornetQBytesMessage cannot be cast to javax.jms.TextMessage
at edu.foo.incassionline.generatore.jms.QueueHandler.onMessage(QueueHandler.java:27)
at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source) [:1.7.0_45]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) [rt.jar:1.7.0_45]
at java.lang.reflect.Method.invoke(Method.java:606) [rt.jar:1.7.0_45]
at org.jboss.as.ee.component.ManagedReferenceMethodInterceptor.processInvocation(ManagedReferenceMethodInterceptor.java:52)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.invocation.WeavedInterceptor.processInvocation(WeavedInterceptor.java:53)
at org.jboss.as.ee.component.interceptors.UserInterceptorFactory$1.processInvocation(UserInterceptorFactory.java:63)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.invocation.InterceptorContext$Invocation.proceed(InterceptorContext.java:407)
at org.jboss.as.weld.ejb.Jsr299BindingsInterceptor.doMethodInterception(Jsr299BindingsInterceptor.java:82)
at org.jboss.as.weld.ejb.Jsr299BindingsInterceptor.processInvocation(Jsr299BindingsInterceptor.java:93)
at org.jboss.as.ee.component.interceptors.UserInterceptorFactory$1.processInvocation(UserInterceptorFactory.java:63)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.invocation.WeavedInterceptor.processInvocation(WeavedInterceptor.java:53)
at org.jboss.as.ee.component.interceptors.UserInterceptorFactory$1.processInvocation(UserInterceptorFactory.java:63)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.as.ejb3.component.invocationmetrics.ExecutionTimeInterceptor.processInvocation(ExecutionTimeInterceptor.java:43)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.invocation.InterceptorContext$Invocation.proceed(InterceptorContext.java:407)
at org.jboss.weld.ejb.AbstractEJBRequestScopeActivationInterceptor.aroundInvoke(AbstractEJBRequestScopeActivationInterceptor.java:55)
at org.jboss.as.weld.ejb.EjbRequestScopeActivationInterceptor.processInvocation(EjbRequestScopeActivationInterceptor.java:83)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.as.ee.concurrent.ConcurrentContextInterceptor.processInvocation(ConcurrentContextInterceptor.java:45)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.invocation.InitialInterceptor.processInvocation(InitialInterceptor.java:21)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.invocation.ChainedInterceptor.processInvocation(ChainedInterceptor.java:61)
at org.jboss.as.ee.component.interceptors.ComponentDispatcherInterceptor.processInvocation(ComponentDispatcherInterceptor.java:53)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.as.ejb3.component.pool.PooledInstanceInterceptor.processInvocation(PooledInstanceInterceptor.java:51)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.as.ejb3.tx.CMTTxInterceptor.invokeInCallerTx(CMTTxInterceptor.java:251)
... 49 more
I don't understand why the MDB receives the message as a HornetQ message instead of a plain JMS message.
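When sending a JMS message, Camel converts the message body to one of the following JMS message types, depending on the body's Java type (see the Camel JMS component documentation):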
╔══════════════════════╦═════════════════════════╗
║ Body Type ║ JMS Message ║
╠══════════════════════╬═════════════════════════╣
║ String ║ javax.jms.TextMessage ║
║ org.w3c.dom.Node ║ javax.jms.TextMessage ║
║ Map ║ javax.jms.MapMessage ║
║ java.io.Serializable ║ javax.jms.ObjectMessage ║
║ byte[] ║ javax.jms.BytesMessage ║
║ java.io.File ║ javax.jms.BytesMessage ║
║ java.io.Reader ║ javax.jms.BytesMessage ║
║ java.io.InputStream ║ javax.jms.BytesMessage ║
║ java.nio.ByteBuffer ║ javax.jms.BytesMessage ║
╚══════════════════════╩═════════════════════════╝
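To make the table concrete, here is a minimal sketch that mirrors its rows in plain Java. The class JmsTypeSketch and the method jmsTypeFor are hypothetical names invented for illustration; this is not Camel's actual implementation, only a restatement of the table. It also assumes that the marshal step in the routes above leaves a byte[] body, which would explain why the MDB sees a BytesMessage.

```java
import java.io.File;
import java.io.InputStream;
import java.io.Reader;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.w3c.dom.Node;

// Hypothetical helper that restates the table above; not part of Camel.
final class JmsTypeSketch {

    // Returns the JMS message type the table assigns to the given body.
    static String jmsTypeFor(Object body) {
        if (body instanceof String || body instanceof Node) {
            return "javax.jms.TextMessage";
        }
        // Checked before Serializable, because File and byte[] are Serializable too.
        if (body instanceof byte[] || body instanceof File || body instanceof Reader
                || body instanceof InputStream || body instanceof ByteBuffer) {
            return "javax.jms.BytesMessage";
        }
        // Checked before Serializable, because common Map implementations are Serializable.
        if (body instanceof Map) {
            return "javax.jms.MapMessage";
        }
        if (body instanceof Serializable) {
            return "javax.jms.ObjectMessage";
        }
        return "unsupported";
    }

    public static void main(String[] args) {
        String text = "42";
        // Assumption: this is roughly what the body looks like after <marshal ref="utf-8-string"/>.
        byte[] marshalled = text.getBytes(StandardCharsets.UTF_8);

        System.out.println(jmsTypeFor(text));       // javax.jms.TextMessage
        System.out.println(jmsTypeFor(marshalled)); // javax.jms.BytesMessage
    }
}
```

The point of the sketch is that the same text yields a different JMS message type depending on whether it reaches the producer as a String or as raw bytes.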
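Check the type of the message body in big.numbers.processor. There are two options: 1) produce a text-type body (for example a String) according to Camel's conversion rules, so that the MDB's cast to TextMessage succeeds, or 2) cast to BytesMessage in the MDB instead.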
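As a sketch of option 1, the otherwise branch of the toBigNumeri route could convert the body back to a String just before handing it to the WildFly endpoint. This assumes the body is a byte[] at that point (for example because of the earlier marshal step) and that done.processor does not change its type; neither is confirmed by the question.

```xml
<otherwise>
  <process ref="done.processor"/>
  <!-- Assumption: the body is a byte[] here; convert it to a String
       so that the JMS producer creates a TextMessage. -->
  <convertBodyTo type="java.lang.String" charset="UTF-8"/>
  <to uri="wildflycf:generatoreQueue?username=dummyusr&amp;password=dummy1234"/>
</otherwise>
```

With this change the MDB can keep its cast to TextMessage. If the body type cannot be controlled on the Camel side, option 2 (handling BytesMessage in the MDB) is the alternative.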