如何在 Camel 中配置并使用 ActiveMQ
How to configure and use ActiveMQ in Camel
一、Configure ActiveMQ: create an XML file named "activemq_connect.xml" with the following contents:
<?xml version="1.0"?>
<!--
  Spring bean definitions that expose an "activemq" Camel component backed by a
  pooled connection to the ActiveMQ broker at tcp://localhost:61616.
  Bean wiring: jmsConnectionFactory -> pooledConnectionFactory2 -> jmsConfig -> activemq.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:util="http://www.springframework.org/schema/util"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:camel="http://camel.apache.org/schema/spring"
       xmlns:osgi="http://www.springframework.org/schema/osgi"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/util
         http://www.springframework.org/schema/util/spring-util.xsd
         http://www.springframework.org/schema/osgi
         http://www.springframework.org/schema/osgi/spring-osgi.xsd
         http://www.springframework.org/schema/osgi-compendium
         http://www.springframework.org/schema/osgi-compendium/spring-osgi-compendium.xsd
         http://camel.apache.org/schema/spring
         http://camel.apache.org/schema/spring/camel-spring.xsd">

  <!--
    Alternative kept for reference: point the component straight at the broker URL,
    without the pooled connection factory and JMS configuration defined below.
  <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="brokerURL" value="tcp://localhost:61616"/>
  </bean>
  -->

  <!-- Plain connection factory for the broker; only used as the delegate of the pool. -->
  <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616"/>
  </bean>

  <!--
    Connection pool wrapped around jmsConnectionFactory.
    destroy-method="stop" releases the pooled connections when this Spring context is
    closed (for example when the file is redeployed); without it they stay open.
    NOTE(review): maximumActive is the property name used by the ActiveMQ generation
    shipped with ServiceMix 4.3; confirm the name if you run a newer ActiveMQ.
  -->
  <bean id="pooledConnectionFactory2" class="org.apache.activemq.pool.PooledConnectionFactory"
        destroy-method="stop">
    <property name="maxConnections" value="800"/>
    <property name="maximumActive" value="500"/>
    <property name="connectionFactory" ref="jmsConnectionFactory"/>
  </bean>

  <!-- JMS settings shared by every endpoint of the "activemq" component. -->
  <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="pooledConnectionFactory2"/>
    <!-- No transactionManager is set here, so this presumably means local JMS transactions; verify. -->
    <property name="transacted" value="true"/>
    <property name="concurrentConsumers" value="100"/>
  </bean>

  <!-- The component behind "activemq:..." endpoint URIs. -->
  <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="configuration" ref="jmsConfig"/>
  </bean>
</beans>
二、Copy "activemq_connect.xml" to the "deploy" directory of apache-servicemix-4.3.0-fuse-03-00, then start ServiceMix(然后启动 ServiceMix)。
注意:apache-servicemix-4.2.0不行
三、配置osgi jar包中的META-INF/spring/camel-bean.xml文件:
<?xml version="1.0"?>
<!--
  META-INF/spring/camel-bean.xml of the OSGi bundle.
  Defines the "activemq" Camel component (pooled connection to tcp://localhost:61616)
  and a CamelContext that picks up the RouteBuilder classes in com.mycode.MyPackage.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:util="http://www.springframework.org/schema/util"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:camel="http://camel.apache.org/schema/spring"
       xmlns:osgi="http://www.springframework.org/schema/osgi"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/util
         http://www.springframework.org/schema/util/spring-util.xsd
         http://www.springframework.org/schema/osgi
         http://www.springframework.org/schema/osgi/spring-osgi.xsd
         http://www.springframework.org/schema/osgi-compendium
         http://www.springframework.org/schema/osgi-compendium/spring-osgi-compendium.xsd
         http://camel.apache.org/schema/spring
         http://camel.apache.org/schema/spring/camel-spring.xsd">

  <!--
    Alternative kept for reference: point the component straight at the broker URL,
    without the pooled connection factory and JMS configuration defined below.
  <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="brokerURL" value="tcp://localhost:61616"/>
  </bean>
  -->

  <!-- Plain connection factory for the broker; only used as the delegate of the pool. -->
  <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616"/>
  </bean>

  <!--
    Connection pool wrapped around jmsConnectionFactory.
    destroy-method="stop" releases the pooled connections when the bundle's Spring
    context is closed (bundle stop or update); without it they stay open.
    NOTE(review): maximumActive is the property name used by the ActiveMQ generation
    shipped with ServiceMix 4.3; confirm the name if you run a newer ActiveMQ.
  -->
  <bean id="pooledConnectionFactory2" class="org.apache.activemq.pool.PooledConnectionFactory"
        destroy-method="stop">
    <property name="maxConnections" value="8"/>
    <property name="maximumActive" value="500"/>
    <property name="connectionFactory" ref="jmsConnectionFactory"/>
  </bean>

  <!-- JMS settings shared by every endpoint of the "activemq" component. -->
  <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="pooledConnectionFactory2"/>
    <!-- No transactionManager is set here, so this presumably means local JMS transactions; verify. -->
    <property name="transacted" value="true"/>
    <property name="concurrentConsumers" value="10"/>
  </bean>

  <!-- The component behind "activemq:..." endpoint URIs. -->
  <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="configuration" ref="jmsConfig"/>
  </bean>

  <!--
    Sample routes in XML, kept for reference: a timer publishes "Hello World!" to the
    queue camel.timer every 4000 ms and a second route writes the queue to a file.
    The ampersand in the timer URI is written as an entity so the block is valid XML
    once uncommented.
  <camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
      <from uri="timer://MyTimer?fixedRate=true&amp;period=4000"/>
      <setBody><constant>Hello World!</constant></setBody>
      <to uri="activemq:camel.timer"/>
    </route>
    <route>
      <from uri="activemq:camel.timer"/>
      <to uri="file://timer"/>
    </route>
  </camelContext>
  -->

  <!-- Active context: routes come from the RouteBuilder classes found in this package. -->
  <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
    <package>com.mycode.MyPackage</package>
  </camelContext>

  <!-- lets configure the ActiveMQ JMS broker server to listen on TCP 51616
    NOTE(review): the broker: prefix is not declared in this file; add
    xmlns:broker="http://activemq.apache.org/schema/core" (and its schemaLocation)
    before enabling this block.
  <broker:broker id="broker" useJmx="false" persistent="false" brokerName="localhost">
    <broker:transportConnectors>
      <broker:transportConnector name="tcp" uri="tcp://localhost:51616"/>
    </broker:transportConnectors>
  </broker:broker>
  -->

  <!-- lets configure the Camel ActiveMQ to use the ActiveMQ broker declared above
  <bean id="test-jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="brokerURL" value="tcp://localhost:51616"/>
  </bean>
  -->
</beans>
四、在 com.mycode.MyPackage 中建立 RouteBuilder 子类,并运用 activemq:
package com.mycode.MyPackage;
import javax.jms.Message;
import javax.jms.MessageListener;
import static org.apache.camel.builder.xml.XPathBuilder.xpath;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.interceptor.*;
import org.apache.camel.builder.xml.*;
import org.apache.camel.component.file.*;
public class MyRouteBuilder extends RouteBuilder {//extends Routerbase {
private Tracer tracer = new Tracer();
private MyTraceProcess myTraceProcess = new MyTraceProcess();
MyTransform transform = new MyTransform();
//MyLog myLog = new MyLog();
Log myLog = new Log();
@Override
public void configure() throws Exception {
/
Tracer tracer = new Tracer();
tracer.setDestinationUri("direct:traced");
tracer.setLogLevel(LoggingLevel.INFO);
tracer.start();
tracer.setTraceOutExchanges(true);
DefaultTraceFormatter formatter = new DefaultTraceFormatter();
formatter.setShowOutBody(true);
formatter.setShowOutBodyType(true);
tracer.setFormatter(formatter);
getContext().addInterceptStrategy(tracer);
/**/
//
onException(EDINotFoundException.class).stop();
onException(EDI2SimpleXMLException.class)//.maximumRedeliveries(1).handled(true)
.bean(myLog, "logEdi2SimpleXmlException")
.bean(transform, "setBodyByInEdiError")///setBodyByInEdiError
.to("file:camel-out-inEdiError9")
.bean(transform, "delHandledFile")
.stop();
///
onException(SimpleXML2PlatformXMLException.class)
.bean(myLog,"logSimpleXml2PlatformXmlException")
.bean(transform, "setBodyByOriginBodyError")//setBodyBySimpleXmlError
.to("file:camel-out-simpleXmlError9")
.bean(transform, "delHandledFile")
.stop();
onException(PlatformXML2SimpleXMLException.class)
.bean(myLog,"logPlatformXml2SimpleXmlException")
.bean(transform, "setBodyByOriginBodyError")//setBodyByPlatformXml2Error
.to("file:camel-out-Error9")
.bean(transform, "delHandledFile")
.stop();
onException(SimpleXML2EDIException.class)
.bean(myLog,"logSimpleXml2EdiException")
.bean(transform, "setBodyByOriginBodyError")//setBodyBySimpleXml2Error
.to("file:camel-out-simpleXml2Error9")
.bean(transform, "delHandledFile")
.stop();
/**/
from("direct:traced").process(myTraceProcess);
///
Consumer consumeObj=new Consumer();
from("file:camel-in-edi9?include=.*//.txt&delay=300&delete=true")//from("file:camel-in-edi/A?include=.*//.txt&delay=300&delete=true")
.bean(myLog, "log")
.to("file:camel-out")
.bean(transform, "setBodyBySimpleXml")
.to("file:camel-out-simpleXmlAll9")
.bean(transform, "SimpleXml2PlatformXml") //.bean(transform, "setBodyByPlatformXml")
.bean(myLog, "logSimpleXml2PlatformXml")
.to("file:camel-out-Powere2eXmlAll9")
/*
.bean(consumeObj, "setContent")
.to("activemq:camel.timer");
//platformXml2edi//
from("activemq:camel.timer")//to do: needed to convert exchang type from jms type to string type
.bean(consumeObj, "consume")
.convertBodyTo(String.class)
*/
.choice()
.when(xpath("/Pip3A4PurchaseOrderRequest/toRole/PartnerRoleDescription/PartnerDescription/BusinessDescription/M2MMemberId = '8718'"))
.bean(transform, "platformXml2SimpleXml")
.bean(myLog,"logPlatformXml2SimpleXml")
.bean(transform, "simpleXml2Edi")
.bean(myLog,"logSimpleXml2Edi")
.to("file:camel-out-png-edi9")
.stop()
.otherwise().to("file:camel-out-notKnownToWhere")
//.to("log:ExampleRouter")
//.bean(transform, "delHandledFile")
.stop()
.end();
}
/*
public static class Consumer {
public void setContent(Exchange message) {
PlatformMessage platformmessage = new PlatformMessage(message.getIn().getBody().toString());
platformmessage.setLog((LogBean)message.getProperty("logBean"));
message.getIn().setBody(platformmessage);
message.getIn().setHeader("logId", message.getProperty("logId"));
//message.getOut().setHeader("logBean", message.getProperty("logBean"));
message.getIn().setHeader("CamelFileNameOnly", message.getIn().getHeader("CamelFileNameOnly"));
//System.out.println("consume setContent = " + message);
//System.out.println("consume setContent out = " + message.getOut());
}
//public void consume(@Body String message) {
public void consume(Exchange message) {
message.setProperty("logId", message.getIn().getHeader("logId"));
System.out.println("message.getIn().getBody()(inside class Consumer consume() ): " + message.getIn().getBody());
message.setProperty("logBean", ((PlatformMessage)message.getIn().getBody()).getLog());
message.getIn().setBody(((PlatformMessage)message.getIn().getBody()).getXml());
//System.out.println("consume message = " + message);
}
}//end class Consumer
*/
}