Spring Ingegration提供了基于spring的EIP(Enterprise Integration Patterns,企业集成模式)的实现。Spring Integration主要解决的问题是不同系统之间(不同语言系统之间)的交互问题,通过异步消息驱动来达到系统交互时系统之间的松耦合。
Spring Integration主要有Message、Channel、Message EndPoint组成。
1.Message
Message是用来在不同部分之间传递的数据。Message有两部分组成:消息体(playload)和消息头(header)。消息体可以是任何数据类型,消息头表示的元数据就是解释消息体的内容。
/**
* A generic message representation with headers and body.
*
* @author Mark Fisher
* @author Arjen Poutsma
* @since 4.0
* @see org.springframework.messaging.support.MessageBuilder
*/
public interface Message<T> {
/**
* Return the message payload.
*/
T getPayload();
/**
* Return message headers for the message (never {@code null} but may be empty).
*/
MessageHeaders getHeaders();
}
2.Channel
MessageChannel 是Spring Integration消息通道的顶级接口:
public interface MessageChannel {
/**
* Constant for sending a message without a prescribed timeout.
*/
long INDEFINITE_TIMEOUT = -1;
/**
* Send a {@link Message} to this channel. If the message is sent successfully,
* the method returns {@code true}. If the message cannot be sent due to a
* non-fatal reason, the method returns {@code false}. The method may also
* throw a RuntimeException in case of non-recoverable errors.
* <p>This method may block indefinitely, depending on the implementation.
* To provide a maximum wait time, use {@link #send(Message, long)}.
* @param message the message to send
* @return whether or not the message was sent
*/
boolean send(Message<?> message);
/**
* Send a message, blocking until either the message is accepted or the
* specified timeout period elapses.
* @param message the message to send
* @param timeout the timeout in milliseconds or {@link #INDEFINITE_TIMEOUT}
* @return {@code true} if the message is sent, {@code false} if not
* including a timeout of an interrupt of the send
*/
boolean send(Message<?> message, long timeout);
}
MessageChannel有两大子接口,分别是PollableChannel(可轮询)和SubscribableChannel(可订阅)。我们所有的消息通道类都是实现这两大接口。
2.1 PollableChannel
PollableChannel具备轮询获得消息的能力。
public interface PollableChannel extends MessageChannel {
/**
* Receive a message from this channel, blocking indefinitely if necessary.
* @return the next available {@link Message} or {@code null} if interrupted
*/
Message<?> receive();
/**
* Receive a message from this channel, blocking until either a message is available
* or the specified timeout period elapses.
* @param timeout the timeout in milliseconds or {@link MessageChannel#INDEFINITE_TIMEOUT}.
* @return the next available {@link Message} or {@code null} if the specified timeout
* period elapses or the message reception is interrupted
*/
Message<?> receive(long timeout);
}
2.2 SubscribableChannel
SubscribableChannel发送消息给订阅了MessageHandler的订阅者。
public interface SubscribableChannel extends MessageChannel {
/**
* Register a message handler.
* @return {@code true} if the handler was subscribed or {@code false} if it
* was already subscribed.
*/
boolean subscribe(MessageHandler handler);
/**
* Un-register a message handler.
* @return {@code true} if the handler was un-registered, or {@code false}
* if was not registered.
*/
boolean unsubscribe(MessageHandler handler);
}
2.3 常用消息通道
2.3.1 PublishSubscribeChannel
PublishSubscribeChannel允许广播消息给所有的订阅者,配置方式如下:
/**
* 允许广播消息给所有订阅者,当前消息通道的id为publishSubscribeChannel
* @return
*/
@Bean
public PublishSubscribeChannel publishSubscribeChannel(){
PublishSubscribeChannel channel = new PublishSubscribeChannel();
return channel;
}
其中,当前消息通道的id为publishSubscribeChannel。
2.3.2 QueueChannel
QueueChannel允许消息接收者轮询获得消息,用一个队列(queue)接收消息,队列的容量大小可配置,配置方式如下:
@Bean
public QueueChannel queueChannel(){
QueueChannel channel = new QueueChannel(10);
return channel;
}
其中,QueueChannel构造参数10即为队列的容量。
2.3.3 PriorityChannel
PriorityChannel可按照优先级将数据存储到队列,它依据于消息的消息头priority属性,配置方式如下:
@Bean
public PriorityChannel priorityChannel(){
PriorityChannel channel = new PriorityChannel(10);
return channel;
}
2.3.4 RendezvousChannel
RendezvousChannel确保每一个接收者都接收到消息后再发送消息,配置方式如下:
@Bean
public RendezvousChannel rendezvousChannel(){
RendezvousChannel channel = new RendezvousChannel();
return channel;
}
2.3.5 DirectChannel
DirectChannel是Spring Integration默认的消息通道,它允许将消息发送给为一个订阅者,然后阻碍发送直到消息被接 收,其配置方式如下:
@Bean
public DirectChannel directChannel(){
DirectChannel channel = new DirectChannel();
return channel;
}
2.3.6 ExecutorChannel
ExecutorChannel可绑定一个多线程的task executor,配置方式如下:
@Bean
public ExecutorChannel executorChannel(){
ExecutorChannel channel = new ExecutorChannel(executor());
return channel;
}
@Bean
public Executor executor(){
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(5);
taskExecutor.setMaxPoolSize(10);
taskExecutor.setQueueCapacity(25);
taskExecutor.initialize();
return taskExecutor;
}
4.通道拦截器
Spring Integration给消息通道提供了拦截器(ChannelInterceptor),用来拦截发送和接收消息的操作。
ChannelInterceptor接口定义如下,我们只需要实现这个接口即可:
public interface ChannelInterceptor {
Message<?> preSend(Message<?> message, MessageChannel channel);
void postSend(Message<?> message, MessageChannel channel, boolean sent);
void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex);
boolean preReceive(MessageChannel channel);
Message<?> postReceive(Message<?> message, MessageChannel channel);
void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex);
}
通过如下代码给所有的channel增加拦截器
channel.addInterceptor(someInterceptor);
5.Message EndPoint
消息端点(Message EndPoint)是真正处理消息的(Message)组件,它还可以控制通道的路由。我们可用的消息端点包含如下:
5.1 Channel Adapter
通道适配器(Channel Adapter)是一种连接外部系统或传输协议的端点(EndPoint),可以分为入站(inbound)和出战(outbound)。
通道适配器是单向的,入站通道适配器只支持接收消息,出站通道适配器只支持输出消息。
Spring Integration内置了如下的适配器:
5.2 Gateway
消息网关(Gateway)类似于Adapter,但是提供了双向的请求/返回集成方式,也分为入站(inbound)和出站(outbound)。
Spring Integration对响应的Adapter都提供了Gateway。
5.3 Service Activator
Service Activator可调用Spring的bean来处理消息,并将处理后的结果输出到指定的消息通道。
5.4 Router
路由(Router) 可根据消息体内容(Payload Type Router)、消息头的值(Header Value Router)以及定义好的接收表 (Recipient List Router)作为条件,来决定消息传递到的通道。
5.5 Filter
过滤器(Filter)类似于路由(router),不同的是过滤器不决定消息路由到哪里,而是决定消息是否可以传递给消息通道。
5.6 Splitter
拆分器(Splitter)将消息拆分为几个部分单独处理,拆分器处理的返回值是一个集合或者数组。
5.7 Aggregator
聚合器(Aggregator)与拆分器相反,它接收一个java.util.List作为参数,将多个消息合并为一个消息。
5.8 Enricher
当我们从外部获得消息后,需要增加额外的消息到已有的消息中,这时就需要使用消息增强器(Enricher)。消息增强器主要有 消息体增强器(Payload Enricher)和消息头增强器(Header Enricher)两种。
5.9 Transformer
转换器(Transformer)是对获得的消息进行一定的转换处理(如数据格式转换).
5.10 Bridge
使用连接桥(Bridge)可以简单的将两个消息通道连接起来。