将数据发送到AWS-SQS 然后通过Apache Camel接收SQS消息然后路由分发到不同的子系统中
一、AWS SQS简介
Amazon Simple Queue Service (SQS) 是一项快速可靠、可扩展且完全托管的消息队列服务。SQS 使得云应用程序的组件去耦合大大简化,并且具有较高的成本效益。您可以使用 SQS 来传输任何容量的数据,使用任意的吞吐量,而不会丢失消息或要求其他服务永久可用。
AWS SQS的主要作用是把生产消息的应用程序和消费消息的应用程序解耦,起到中间缓存的作用,避免直接依赖。应用场景如:消息的生产速度大于消息的消费速度;消息的生产者和消息的消费者之间的网络通信不能保证一直非常可靠。
AWS SQS的特性约束:
消息内容不超过256KB。
消息在队列中最多保留14天
不保证消息的顺序,即不保证先进先出。
Apache Camel简介
camel首先是一个规则引擎。其次才是一个开源项目。
Apache Camel是Apache基金会下的一个开源项目,它是一个基于规则路由和中介引擎,提供企业集成模式的Java对象的实现,通过应用程序接口(或称为陈述式的Java领域特定语言(DSL))来配置路由和中介的规则。领域特定语言意味着Apache Camel支持你在的集成开发工具中使用平常的,类型安全的,可自动补全的Java代码来编写路由规则,而不需要大量的XML配置文件。同时,也支持在Spring中使用XML配置定义路由和中介规则。
Camel提供的基于规则的路由(Routing)引擎,可以使用Camel定义的DSL语言,方便的定义出各种Routing。
如下例:
from(“file://xxxx").to("activemq://xxxx") 将某文件,读入并写入到ActiveMQ的JMS中
1、先在pom中引入SQS的依赖jar包和 camel-aws和camel-spring 配置
<camel.version>2.18.0</camel.version>
<aws-java-sdk.version>1.11.18</aws-java-sdk.version>
<jackson.version>2.8.4</jackson.version>
<!-- camel -->
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
<version>${camel.version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-stream</artifactId>
<version>${camel.version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-aws</artifactId>
<version>${camel.version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-spring</artifactId>
<version>${camel.version}</version>
</dependency>
<!-- AWS SQS -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sqs</artifactId>
<version>${aws-java-sdk.version}</version>
</dependency>
<!-- AWS SQS end -->
<!-- JackSon -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- JackSon end -->
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:camel="http://camel.apache.org/schema/spring" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">
<bean id="myBuilder" class="com.test.server.camel.MyBuilder"/>
<bean id="credentials" class="com.amazonaws.auth.BasicAWSCredentials">
<!-- AWS 秘钥 -->
<constructor-arg index="0" value="xxxxxx" />
<constructor-arg index="1" value="xxxxxx" />
</bean>
<!-- AWS SQS 客户端 -->
<bean id="client" class="com.amazonaws.services.sqs.AmazonSQSClient">
<constructor-arg index="0" ref="credentials" />
</bean>
<!-- SQS队列消费者实例 -->
<bean id="myconsumer" class="com.test.server.consumer.Myconsumer" />
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<camel:routeBuilder ref="myBuilder"/>
</camelContext>
</beans>
3、
MyBuilder接口实现类
package com.test.server.camel;
import org.apache.camel.builder.RouteBuilder;
public class MyBuilder extends RouteBuilder {
@Override
public void configure() throws Exception {
from("aws-sqs://MyQueue?amazonSQSClient=#client&maxMessagesPerPoll=5")
.to("bean:myconsumer?method=receiveMessage")
.end();
}
}
package com.test.server.consumer;
public class Myconsumer {
public void receiveMessage(String message) {
System.out.pritln("receiveMsg=" + message);
// TODO
}
}