How to Configure and Use ActiveMQ in Camel

艾英范
2023-12-01

如何在 Camel 中配置和使用 ActiveMQ

How to Configure and Use ActiveMQ in Camel

 

一、Configure ActiveMQ: create an XML file named "activemq_connect.xml" with the following contents:

<?xml version="1.0"?>
<!--
  activemq_connect.xml
  Spring bean definitions that expose an ActiveMQ-backed Camel component under the
  bean id "activemq", connected to the broker at tcp://localhost:61616.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:util="http://www.springframework.org/schema/util"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:camel="http://camel.apache.org/schema/spring"
  xmlns:osgi="http://www.springframework.org/schema/osgi"
  xsi:schemaLocation="
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/util
         http://www.springframework.org/schema/util/spring-util.xsd
         http://www.springframework.org/schema/osgi
         http://www.springframework.org/schema/osgi/spring-osgi.xsd
         http://www.springframework.org/schema/osgi-compendium
         http://www.springframework.org/schema/osgi-compendium/spring-osgi-compendium.xsd
         http://camel.apache.org/schema/spring
         http://camel.apache.org/schema/spring/camel-spring.xsd">

  <!--
  Simpler alternative (disabled): let the component open its own connections
  straight from the broker URL, without pooling or a shared JmsConfiguration.
  <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="brokerURL" value="tcp://localhost:61616"/>
  </bean>
  -->

    <!-- Plain ActiveMQ connection factory for the broker at tcp://localhost:61616. -->
    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616" />
    </bean>

    <!-- Pooled factory that delegates to jmsConnectionFactory; the pool limits are set here. -->
    <bean id="pooledConnectionFactory2" class="org.apache.activemq.pool.PooledConnectionFactory">
        <property name="maxConnections" value="800" />
        <property name="maximumActive" value="500" />
        <property name="connectionFactory" ref="jmsConnectionFactory" />
    </bean>

    <!-- Shared JMS settings: pooled connections, transacted sessions, 100 concurrent consumers. -->
    <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
        <property name="connectionFactory" ref="pooledConnectionFactory2"/>
        <property name="transacted" value="true"/>
        <property name="concurrentConsumers" value="100"/>
    </bean>

    <!-- The Camel component itself; endpoint URIs such as "activemq:camel.timer" refer to this bean id. -->
    <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="configuration" ref="jmsConfig"/>
    </bean>

</beans>

 

二、将 "activemq_connect.xml" 复制到 apache-servicemix-4.3.0-fuse-03-00 的 "deploy" 目录下,然后启动 ServiceMix。

注意:该配置在 apache-servicemix-4.2.0 上不可用。

 

三、配置 OSGi bundle(jar 包)中的 META-INF/spring/camel-bean.xml 文件:

<?xml version="1.0"?>
<!--
  META-INF/spring/camel-bean.xml
  Spring context of the OSGi bundle: declares the "activemq" Camel component and a
  CamelContext that loads its routes from the package com.mycode.MyPackage.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:util="http://www.springframework.org/schema/util"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:camel="http://camel.apache.org/schema/spring"
  xmlns:osgi="http://www.springframework.org/schema/osgi"
  xsi:schemaLocation="
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/util
         http://www.springframework.org/schema/util/spring-util.xsd
         http://www.springframework.org/schema/osgi
         http://www.springframework.org/schema/osgi/spring-osgi.xsd
         http://www.springframework.org/schema/osgi-compendium
         http://www.springframework.org/schema/osgi-compendium/spring-osgi-compendium.xsd
         http://camel.apache.org/schema/spring
         http://camel.apache.org/schema/spring/camel-spring.xsd">

  <!--
  Simpler alternative (disabled): let the component open its own connections
  straight from the broker URL, without pooling or a shared JmsConfiguration.
  <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="brokerURL" value="tcp://localhost:61616"/>
  </bean>
  -->

    <!-- Plain ActiveMQ connection factory for the broker at tcp://localhost:61616. -->
    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616" />
    </bean>

    <!-- Pooled factory that delegates to jmsConnectionFactory; the pool limits are set here. -->
    <bean id="pooledConnectionFactory2" class="org.apache.activemq.pool.PooledConnectionFactory">
        <property name="maxConnections" value="8" />
        <property name="maximumActive" value="500" />
        <property name="connectionFactory" ref="jmsConnectionFactory" />
    </bean>

    <!-- Shared JMS settings: pooled connections, transacted sessions, 10 concurrent consumers. -->
    <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
        <property name="connectionFactory" ref="pooledConnectionFactory2"/>
        <property name="transacted" value="true"/>
        <property name="concurrentConsumers" value="10"/>
    </bean>

    <!-- The Camel component itself; endpoint URIs such as "activemq:camel.timer" refer to this bean id. -->
    <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="configuration" ref="jmsConfig"/>
    </bean>

  <!--
  Inline smoke-test routes (disabled): a timer sends "Hello World!" to the queue
  camel.timer every 4000 ms and a second route writes what it receives to a file.
  <camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
      <from uri="timer://MyTimer?fixedRate=true&amp;period=4000"/>
      <setBody><constant>Hello World!</constant></setBody>
      <to uri="activemq:camel.timer"/>
    </route>
    <route>
      <from uri="activemq:camel.timer"/>
      <to uri="file://timer"/>
    </route>
  </camelContext>
  -->

    <!-- Active context: routes come from the RouteBuilder classes found in com.mycode.MyPackage. -->
    <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
        <package>com.mycode.MyPackage</package>
    </camelContext>

    <!-- Embedded-broker alternative (disabled): an ActiveMQ broker listening on TCP 51616.
    <broker:broker id="broker" useJmx="false" persistent="false" brokerName="localhost">
        <broker:transportConnectors>
            <broker:transportConnector name="tcp" uri="tcp://localhost:51616"/>
        </broker:transportConnectors>
    </broker:broker>
    -->
    <!-- Companion to the embedded broker (disabled): a component bound to tcp://localhost:51616.
    <bean id="test-jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="brokerURL" value="tcp://localhost:51616"/>
    </bean>
     -->

</beans>

 

四、在 com.mycode.MyPackage 中建立 RouteBuilder 的子类,并在路由中使用 activemq(下面示例中经过 "activemq:camel.timer" 的那一段路由目前处于注释状态,去掉注释即可启用):

package com.mycode.MyPackage;

import javax.jms.Message;
import javax.jms.MessageListener;

import static org.apache.camel.builder.xml.XPathBuilder.xpath;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.interceptor.*;
import org.apache.camel.builder.xml.*;
import org.apache.camel.component.file.*;


public class MyRouteBuilder extends RouteBuilder {//extends Routerbase {
    private Tracer tracer = new Tracer();
    private MyTraceProcess myTraceProcess = new MyTraceProcess();
   
    MyTransform transform = new MyTransform();
    //MyLog myLog = new MyLog();
    Log myLog = new Log();
   
    @Override
    public void configure() throws Exception {
       
        /       
        Tracer tracer = new Tracer();
        tracer.setDestinationUri("direct:traced");
        tracer.setLogLevel(LoggingLevel.INFO);
        tracer.start();
        tracer.setTraceOutExchanges(true);
        DefaultTraceFormatter formatter = new DefaultTraceFormatter();
        formatter.setShowOutBody(true);
        formatter.setShowOutBodyType(true);
        tracer.setFormatter(formatter);
        getContext().addInterceptStrategy(tracer);
        /**/
        //
        onException(EDINotFoundException.class).stop();
        onException(EDI2SimpleXMLException.class)//.maximumRedeliveries(1).handled(true)
            .bean(myLog, "logEdi2SimpleXmlException")
            .bean(transform, "setBodyByInEdiError")///setBodyByInEdiError
            .to("file:camel-out-inEdiError9")
            .bean(transform, "delHandledFile")
            .stop();
        ///
        onException(SimpleXML2PlatformXMLException.class)
            .bean(myLog,"logSimpleXml2PlatformXmlException")
            .bean(transform, "setBodyByOriginBodyError")//setBodyBySimpleXmlError
            .to("file:camel-out-simpleXmlError9")
            .bean(transform, "delHandledFile")
            .stop();       
        onException(PlatformXML2SimpleXMLException.class)
            .bean(myLog,"logPlatformXml2SimpleXmlException")
            .bean(transform, "setBodyByOriginBodyError")//setBodyByPlatformXml2Error
            .to("file:camel-out-Error9")
            .bean(transform, "delHandledFile")
            .stop();       
        onException(SimpleXML2EDIException.class)
            .bean(myLog,"logSimpleXml2EdiException")
            .bean(transform, "setBodyByOriginBodyError")//setBodyBySimpleXml2Error
            .to("file:camel-out-simpleXml2Error9")
            .bean(transform, "delHandledFile")
            .stop();
        /**/
       
        from("direct:traced").process(myTraceProcess);      
        ///
        Consumer consumeObj=new Consumer();       
        from("file:camel-in-edi9?include=.*//.txt&delay=300&delete=true")//from("file:camel-in-edi/A?include=.*//.txt&delay=300&delete=true")


            .bean(myLog, "log")       
            .to("file:camel-out")
            .bean(transform, "setBodyBySimpleXml") 
            .to("file:camel-out-simpleXmlAll9")
            .bean(transform, "SimpleXml2PlatformXml")  //.bean(transform, "setBodyByPlatformXml")           
            .bean(myLog, "logSimpleXml2PlatformXml")
            .to("file:camel-out-Powere2eXmlAll9")
            /*
            .bean(consumeObj, "setContent")
            .to("activemq:camel.timer");
           
            //platformXml2edi//
        from("activemq:camel.timer")//to do: needed to convert exchang type from jms type to string type
          .bean(consumeObj, "consume")
          .convertBodyTo(String.class) 
          */
          .choice()
          .when(xpath("/Pip3A4PurchaseOrderRequest/toRole/PartnerRoleDescription/PartnerDescription/BusinessDescription/M2MMemberId = '8718'"))
            .bean(transform, "platformXml2SimpleXml")
            .bean(myLog,"logPlatformXml2SimpleXml")
            .bean(transform, "simpleXml2Edi")
            .bean(myLog,"logSimpleXml2Edi")
            .to("file:camel-out-png-edi9")
            .stop()
          .otherwise().to("file:camel-out-notKnownToWhere")
            //.to("log:ExampleRouter")
            //.bean(transform, "delHandledFile")
            .stop()
          .end();     
    }
    /*
    public static class Consumer {
        public void setContent(Exchange message) {
            PlatformMessage platformmessage = new PlatformMessage(message.getIn().getBody().toString());
            platformmessage.setLog((LogBean)message.getProperty("logBean"));
            message.getIn().setBody(platformmessage);
            message.getIn().setHeader("logId", message.getProperty("logId"));
            //message.getOut().setHeader("logBean", message.getProperty("logBean"));
            message.getIn().setHeader("CamelFileNameOnly", message.getIn().getHeader("CamelFileNameOnly"));
            //System.out.println("consume setContent = " + message);
            //System.out.println("consume setContent out = " + message.getOut());
        }
         //public void consume(@Body String message) {
         public void consume(Exchange message) {
             message.setProperty("logId", message.getIn().getHeader("logId"));
             System.out.println("message.getIn().getBody()(inside class Consumer consume() ): " + message.getIn().getBody());
             message.setProperty("logBean", ((PlatformMessage)message.getIn().getBody()).getLog());
             message.getIn().setBody(((PlatformMessage)message.getIn().getBody()).getXml());
             //System.out.println("consume message = " + message);
        }
    }//end class Consumer
    */

}

 

 

 

 

 

 

 

 

 

 

 

 

 类似资料:

相关阅读

相关文章

相关问答