Question:

Integrating ActiveMQ and WildFly with Apache Camel

郭志
2023-03-14

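I am trying to get WildFly and ActiveMQ to work together through Apache Camel. Let me explain the scenario. Every hour a Camel batch polls an FTP server, fetches the files and sends them to an ActiveMQ broker. The broker implements two routes: numbers and big.numbers.

Messages are enqueued in numbers; if they are not yet ready to be sent, they are routed to big.numbers. Messages in big.numbers are dequeued and translated by a Camel processor and, once ready to be sent, enqueued in numbers. Ready-to-send messages in numbers are sent to WildFly, where an MDB performs some operations on them.

Everything works except the last step. When the MDB receives a message, an exception is raised: org.hornetq.jms.client.HornetQBytesMessage cannot be cast to javax.jms.TextMessage. Below is the camel.xml I use to implement the routes on the ActiveMQ broker: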

...
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
    <package>edu.foo.amq.camel</package>
    <dataFormats>
        <string id="utf-8-string" charset="UTF-8"/>
    </dataFormats>


    <route id="toBigNumeri">
        <from uri="activemq:numbers.queue"/>
        <marshal ref="utf-8-string"/>
        <choice>
            <when>
                <simple>
                    ${in.header.readyToGo} != true
                </simple>
                <process ref="big.numbers.processor"/>
                <to uri="activemq:big.numbers.queue"/>
            </when>
            <otherwise>
                <process ref="done.processor"/>
                <to uri="wildflycf:generatoreQueue?username=dummyusr&amp;password=dummy1234"/>
            </otherwise>
        </choice>           
    </route>

    <route id="toNumeri">
        <from uri="activemq:big.numbers.queue"/>
        <marshal ref="utf-8-string"/>
        <split>
            <tokenize token="\n"/>
            <process ref="numbers.processor"/>
            <setHeader headerName="readyToGo">
                <constant>true</constant>
            </setHeader>
            <to uri="activemq:numbers.queue"/>
        </split>
        <process ref="dump.processor"/>
        <to uri="activemq:dump.queue"/>
    </route>
</camelContext>

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="connectionFactory">
      <bean class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
        <property name="userName" value="${activemq.username}"/>
        <property name="password" value="${activemq.password}"/>
      </bean>
    </property>
</bean>

<bean id="jndiTmp" class="org.springframework.jndi.JndiTemplate">
    <property name="environment">
        <props>
            <prop key="java.naming.provider.url">http-remoting://localhost:8080</prop>
            <prop key="java.naming.factory.initial">org.jboss.naming.remote.client.InitialContextFactory</prop>
            <prop key="java.naming.security.principal">jhon</prop>
            <prop key="java.naming.security.credentials">doe</prop>
        </props>
    </property>
</bean>

<bean id="wfcf" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiTemplate" ref="jndiTmp"/>
    <property name="jndiName" value="jms/RemoteConnectionFactory"/>
</bean>

<bean id="wildflycf" class="org.apache.camel.component.jms.JmsComponent">
    <property name="connectionFactory" ref="wfcf"/>
</bean>
...

And here is the MDB:

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.apache.log4j.Logger;

@MessageDriven(name = "GeneratoreMDB", activationConfig = {
    @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
    @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"),
    @ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = QueueHandler.jmsQueue)
})
public class QueueHandler implements MessageListener {

    public static final String jmsQueue = "/export/jms/generatoreQueue";

    private static final Logger log = Logger.getLogger(QueueHandler.class);

    @Override
    public void onMessage(Message arg0) {
        try {
            log.info(((TextMessage) arg0).getText());
            ...
        } catch (JMSException e) {
            log.error(e);
        }
    }
}

Here is the exception thrown by my MDB:

2014-12-05 09:12:04,864 ERROR [org.hornetq.ra] (Thread-35 (HornetQ-client-global-threads-1727074612)) HQ154004: Failed to deliver message: javax.ejb.EJBTransactionRolledbackException: org.hornetq.jms.client.HornetQBytesMessage cannot be cast to javax.jms.TextMessage
at org.jboss.as.ejb3.tx.CMTTxInterceptor.handleInCallerTx(CMTTxInterceptor.java:163)
at org.jboss.as.ejb3.tx.CMTTxInterceptor.invokeInCallerTx(CMTTxInterceptor.java:253)
at org.jboss.as.ejb3.tx.CMTTxInterceptor.required(CMTTxInterceptor.java:342)
at org.jboss.as.ejb3.tx.CMTTxInterceptor.processInvocation(CMTTxInterceptor.java:239)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.as.ejb3.component.interceptors.CurrentInvocationContextInterceptor.processInvocation(CurrentInvocationContextInterceptor.java:41)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.as.ejb3.component.invocationmetrics.WaitTimeInterceptor.processInvocation(WaitTimeInterceptor.java:43)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.as.ejb3.security.SecurityContextInterceptor.processInvocation(SecurityContextInterceptor.java:95)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.as.ejb3.component.interceptors.ShutDownInterceptorFactory$1.processInvocation(ShutDownInterceptorFactory.java:64)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.as.ejb3.component.interceptors.LoggingInterceptor.processInvocation(LoggingInterceptor.java:59)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.as.ee.component.NamespaceContextInterceptor.processInvocation(NamespaceContextInterceptor.java:50)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.as.ejb3.component.interceptors.AdditionalSetupInterceptor.processInvocation(AdditionalSetupInterceptor.java:55)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.as.ejb3.component.messagedriven.MessageDrivenComponentDescription$5$1.processInvocation(MessageDrivenComponentDescription.java:211)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.invocation.ContextClassLoaderInterceptor.processInvocation(ContextClassLoaderInterceptor.java:64)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.invocation.InterceptorContext.run(InterceptorContext.java:326)
at org.wildfly.security.manager.WildFlySecurityManager.doChecked(WildFlySecurityManager.java:448)
at org.jboss.invocation.AccessCheckingInterceptor.processInvocation(AccessCheckingInterceptor.java:61)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.invocation.InterceptorContext.run(InterceptorContext.java:326)
at org.jboss.invocation.PrivilegedWithCombinerInterceptor.processInvocation(PrivilegedWithCombinerInterceptor.java:80)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.invocation.ChainedInterceptor.processInvocation(ChainedInterceptor.java:61)
at org.jboss.as.ee.component.ViewService$View.invoke(ViewService.java:185)
at org.jboss.as.ee.component.ViewDescription$1.processInvocation(ViewDescription.java:182)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.invocation.ChainedInterceptor.processInvocation(ChainedInterceptor.java:61)
at org.jboss.as.ee.component.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:73)
at edu.foo.incassionline.generatore.jms.QueueHandler$$$view3.onMessage(Unknown Source)
at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source) [:1.7.0_45]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) [rt.jar:1.7.0_45]
at java.lang.reflect.Method.invoke(Method.java:606) [rt.jar:1.7.0_45]
at org.jboss.as.ejb3.inflow.MessageEndpointInvocationHandler.doInvoke(MessageEndpointInvocationHandler.java:139)
at org.jboss.as.ejb3.inflow.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:73)
at edu.foo.incassionline.generatore.jms.QueueHandler$$$endpoint1.onMessage(Unknown Source)
at org.hornetq.ra.inflow.HornetQMessageHandler.onMessage(HornetQMessageHandler.java:319) [hornetq-ra-2.4.1.Final.jar:]
at org.hornetq.core.client.impl.ClientConsumerImpl.callOnMessage(ClientConsumerImpl.java:1116) [hornetq-core-client-2.4.1.Final.jar:]
at org.hornetq.core.client.impl.ClientConsumerImpl.access$500(ClientConsumerImpl.java:56) [hornetq-core-client-2.4.1.Final.jar:]
at org.hornetq.core.client.impl.ClientConsumerImpl$Runner.run(ClientConsumerImpl.java:1251) [hornetq-core-client-2.4.1.Final.jar:]
at org.hornetq.utils.OrderedExecutorFactory$OrderedExecutor$1.run(OrderedExecutorFactory.java:104) [hornetq-core-client-2.4.1.Final.jar:]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [rt.jar:1.7.0_45]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [rt.jar:1.7.0_45]
at java.lang.Thread.run(Thread.java:744) [rt.jar:1.7.0_45]
Caused by: java.lang.ClassCastException: org.hornetq.jms.client.HornetQBytesMessage cannot be cast to javax.jms.TextMessage
at edu.foo.incassionline.generatore.jms.QueueHandler.onMessage(QueueHandler.java:27)
at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source) [:1.7.0_45]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) [rt.jar:1.7.0_45]
at java.lang.reflect.Method.invoke(Method.java:606) [rt.jar:1.7.0_45]
at org.jboss.as.ee.component.ManagedReferenceMethodInterceptor.processInvocation(ManagedReferenceMethodInterceptor.java:52)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.invocation.WeavedInterceptor.processInvocation(WeavedInterceptor.java:53)
at org.jboss.as.ee.component.interceptors.UserInterceptorFactory$1.processInvocation(UserInterceptorFactory.java:63)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.invocation.InterceptorContext$Invocation.proceed(InterceptorContext.java:407)
at org.jboss.as.weld.ejb.Jsr299BindingsInterceptor.doMethodInterception(Jsr299BindingsInterceptor.java:82)
at org.jboss.as.weld.ejb.Jsr299BindingsInterceptor.processInvocation(Jsr299BindingsInterceptor.java:93)
at org.jboss.as.ee.component.interceptors.UserInterceptorFactory$1.processInvocation(UserInterceptorFactory.java:63)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.invocation.WeavedInterceptor.processInvocation(WeavedInterceptor.java:53)
at org.jboss.as.ee.component.interceptors.UserInterceptorFactory$1.processInvocation(UserInterceptorFactory.java:63)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.as.ejb3.component.invocationmetrics.ExecutionTimeInterceptor.processInvocation(ExecutionTimeInterceptor.java:43)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.invocation.InterceptorContext$Invocation.proceed(InterceptorContext.java:407)
at org.jboss.weld.ejb.AbstractEJBRequestScopeActivationInterceptor.aroundInvoke(AbstractEJBRequestScopeActivationInterceptor.java:55)
at org.jboss.as.weld.ejb.EjbRequestScopeActivationInterceptor.processInvocation(EjbRequestScopeActivationInterceptor.java:83)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.as.ee.concurrent.ConcurrentContextInterceptor.processInvocation(ConcurrentContextInterceptor.java:45)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.invocation.InitialInterceptor.processInvocation(InitialInterceptor.java:21)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.invocation.ChainedInterceptor.processInvocation(ChainedInterceptor.java:61)
at org.jboss.as.ee.component.interceptors.ComponentDispatcherInterceptor.processInvocation(ComponentDispatcherInterceptor.java:53)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.as.ejb3.component.pool.PooledInstanceInterceptor.processInvocation(PooledInstanceInterceptor.java:51)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.as.ejb3.tx.CMTTxInterceptor.invokeInCallerTx(CMTTxInterceptor.java:251)
... 49 more

I don't understand why the MDB receives the message as a HornetQ message rather than as a plain JMS message.

1 answer

易琛
2023-03-14

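When sending a JMS message, Camel converts the message body to one of the following JMS message types, as described in the Camel JMS component documentation: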

╔══════════════════════╦═════════════════════════╗
║ Body Type            ║ JMS Message             ║
╠══════════════════════╬═════════════════════════╣
║ String               ║ javax.jms.TextMessage   ║
║ org.w3c.dom.Node     ║ javax.jms.TextMessage   ║
║ Map                  ║ javax.jms.MapMessage    ║
║ java.io.Serializable ║ javax.jms.ObjectMessage ║
║ byte[]               ║ javax.jms.BytesMessage  ║
║ java.io.File         ║ javax.jms.BytesMessage  ║
║ java.io.Reader       ║ javax.jms.BytesMessage  ║
║ java.io.InputStream  ║ javax.jms.BytesMessage  ║
║ java.nio.ByteBuffer  ║ javax.jms.BytesMessage  ║
╚══════════════════════╩═════════════════════════╝
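
The table can be read as a simple lookup on the runtime type of the body. The following is a minimal sketch of that lookup in plain Java; it is an illustration of the table only, not Camel's actual implementation, and the class name `JmsTypeSketch`, the method `jmsTypeFor` and the `"unmapped"` fallback are hypothetical names introduced for this example.

```java
import java.io.File;
import java.io.InputStream;
import java.io.Reader;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Illustrative only: mirrors the table above, not Camel's real code.
class JmsTypeSketch {

    // Returns the name of the JMS message type the table lists for this body.
    static String jmsTypeFor(Object body) {
        // Strings and DOM nodes travel as text.
        if (body instanceof String || body instanceof org.w3c.dom.Node) {
            return "javax.jms.TextMessage";
        }
        // Raw bytes and file/stream-like bodies travel as bytes.
        if (body instanceof byte[] || body instanceof File || body instanceof Reader
                || body instanceof InputStream || body instanceof ByteBuffer) {
            return "javax.jms.BytesMessage";
        }
        // Checked before Serializable because common Map classes are Serializable too.
        if (body instanceof Map) {
            return "javax.jms.MapMessage";
        }
        if (body instanceof Serializable) {
            return "javax.jms.ObjectMessage";
        }
        // Hypothetical fallback for body types the table does not list.
        return "unmapped";
    }

    public static void main(String[] args) {
        // The same content yields different JMS types depending on the body type.
        System.out.println(jmsTypeFor("42"));
        System.out.println(jmsTypeFor("42".getBytes(StandardCharsets.UTF_8)));
    }
}
```

The point for this question is that the same payload produces a TextMessage when the body is a String and a BytesMessage when the body is a byte[], which matches the HornetQBytesMessage seen in the stack trace.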

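Check the type of the message body in big.numbers.processor. There are two options: 1) produce a body of a type that Camel's conversion rules map to a text message, and cast to TextMessage in the MDB, or 2) cast to BytesMessage in the MDB instead.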
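
For the second option, the MDB has to turn the received bytes back into text. The sketch below shows only that decoding step in plain Java so that it runs without a JMS provider. In a real MDB the payload would come from `TextMessage.getText()` or from `BytesMessage.readBytes(byte[])` after sizing the array with `getBodyLength()`. The class `PayloadText` and the method `asText` are hypothetical names, and UTF-8 is an assumption based on the charset declared in camel.xml.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper; the names are not from the original post.
class PayloadText {

    // Accepts the payload as it would be extracted from the JMS message:
    // a String (from a TextMessage) or a byte[] (from a BytesMessage).
    static String asText(Object payload) {
        if (payload instanceof String) {
            return (String) payload;
        }
        if (payload instanceof byte[]) {
            // Assumes UTF-8, matching the charset declared in camel.xml.
            return new String((byte[]) payload, StandardCharsets.UTF_8);
        }
        throw new IllegalArgumentException("Unsupported payload type: "
                + (payload == null ? "null" : payload.getClass().getName()));
    }

    public static void main(String[] args) {
        byte[] wire = "12345".getBytes(StandardCharsets.UTF_8);
        System.out.println(asText(wire));    // prints 12345
        System.out.println(asText("12345")); // prints 12345
    }
}
```

Checking the message type before extracting the payload, rather than casting unconditionally, avoids the ClassCastException whichever type the producer ends up sending.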
