Spring Integration 系统集成

扈沛
2023-12-01

          Spring Ingegration提供了基于spring的EIP(Enterprise Integration Patterns,企业集成模式)的实现。Spring Integration主要解决的问题是不同系统之间(不同语言系统之间)的交互问题,通过异步消息驱动来达到系统交互时系统之间的松耦合。

       Spring Integration主要有Message、Channel、Message EndPoint组成。

    

1.Message

    Message是用来在不同部分之间传递的数据。Message有两部分组成:消息体(playload)和消息头(header)。消息体可以是任何数据类型,消息头表示的元数据就是解释消息体的内容。

   /**
    * A generic message representation with headers and body.
    *
    * @author Mark Fisher
    * @author Arjen Poutsma
    * @since 4.0
    * @see org.springframework.messaging.support.MessageBuilder
    */
public interface Message<T> {


    /**
     * Return the message payload.
     */
    T getPayload();


    /**
     * Return message headers for the message (never {@code null} but may be empty).
     */
    MessageHeaders getHeaders();
}


2.Channel
  MessageChannel 是Spring Integration消息通道的顶级接口:


  public interface MessageChannel {

    /**
     * Constant for sending a message without a prescribed timeout.
     */
    long INDEFINITE_TIMEOUT = -1;


    /**
     * Send a {@link Message} to this channel. If the message is sent successfully,
     * the method returns {@code true}. If the message cannot be sent due to a
     * non-fatal reason, the method returns {@code false}. The method may also
     * throw a RuntimeException in case of non-recoverable errors.
     * <p>This method may block indefinitely, depending on the implementation.
     * To provide a maximum wait time, use {@link #send(Message, long)}.
     * @param message the message to send
     * @return whether or not the message was sent
     */
    boolean send(Message<?> message);


    /**
     * Send a message, blocking until either the message is accepted or the
     * specified timeout period elapses.
     * @param message the message to send
     * @param timeout the timeout in milliseconds or {@link #INDEFINITE_TIMEOUT}
     * @return {@code true} if the message is sent, {@code false} if not
     * including a timeout of an interrupt of the send
     */
    boolean send(Message<?> message, long timeout);
}
 

MessageChannel有两大子接口,分别是PollableChannel(可轮询)和SubscribableChannel(可订阅)。我们所有的消息通道类都是实现这两大接口。


2.1 PollableChannel

      PollableChannel具备轮询获得消息的能力。


      public interface PollableChannel extends MessageChannel {


    /**
     * Receive a message from this channel, blocking indefinitely if necessary.
     * @return the next available {@link Message} or {@code null} if interrupted
     */
    Message<?> receive();


    /**
     * Receive a message from this channel, blocking until either a message is available
     * or the specified timeout period elapses.
     * @param timeout the timeout in milliseconds or {@link MessageChannel#INDEFINITE_TIMEOUT}.
     * @return the next available {@link Message} or {@code null} if the specified timeout
     * period elapses or the message reception is interrupted
     */
    Message<?> receive(long timeout);
}


2.2 SubscribableChannel

      SubscribableChannel发送消息给订阅了MessageHandler的订阅者。


     public interface SubscribableChannel extends MessageChannel {


    /**
     * Register a message handler.
     * @return {@code true} if the handler was subscribed or {@code false} if it
     * was already subscribed.
     */
    boolean subscribe(MessageHandler handler);


    /**
     * Un-register a message handler.
     * @return {@code true} if the handler was un-registered, or {@code false}
     * if was not registered.
     */
    boolean unsubscribe(MessageHandler handler);
}


2.3 常用消息通道

        2.3.1 PublishSubscribeChannel

               PublishSubscribeChannel允许广播消息给所有的订阅者,配置方式如下:

               /**
                * 允许广播消息给所有订阅者,当前消息通道的id为publishSubscribeChannel
                * @return
                */
              @Bean
               public PublishSubscribeChannel publishSubscribeChannel(){
                      PublishSubscribeChannel channel = new PublishSubscribeChannel();
                      return channel;
               }

               其中,当前消息通道的id为publishSubscribeChannel。


     2.3.2 QueueChannel

              QueueChannel允许消息接收者轮询获得消息,用一个队列(queue)接收消息,队列的容量大小可配置,配置方式如下:

               @Bean
               public QueueChannel queueChannel(){
                     QueueChannel channel = new QueueChannel(10);
                     return channel;

               }

               其中,QueueChannel构造参数10即为队列的容量。


     2.3.3 PriorityChannel

             PriorityChannel可按照优先级将数据存储到队列,它依据于消息的消息头priority属性,配置方式如下:

             @Bean
             public PriorityChannel priorityChannel(){
                    PriorityChannel channel = new PriorityChannel(10);
                    return channel;
             }


      2.3.4 RendezvousChannel

              RendezvousChannel确保每一个接收者都接收到消息后再发送消息,配置方式如下:

              @Bean
              public RendezvousChannel rendezvousChannel(){
                    RendezvousChannel channel = new RendezvousChannel();
                    return channel;
               }


       2.3.5 DirectChannel

                DirectChannel是Spring Integration默认的消息通道,它允许将消息发送给为一个订阅者,然后阻碍发送直到消息被接                  收,其配置方式如下:

                @Bean
    public DirectChannel directChannel(){
        DirectChannel channel = new DirectChannel();
        return channel;
    }


       2.3.6 ExecutorChannel

                ExecutorChannel可绑定一个多线程的task executor,配置方式如下:

                @Bean
    public ExecutorChannel executorChannel(){
        ExecutorChannel channel = new ExecutorChannel(executor());
        return channel;
    }


    @Bean
    public Executor executor(){
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setMaxPoolSize(10);
        taskExecutor.setQueueCapacity(25);
        taskExecutor.initialize();
        return taskExecutor;
    }


4.通道拦截器

      Spring Integration给消息通道提供了拦截器(ChannelInterceptor),用来拦截发送和接收消息的操作。

      ChannelInterceptor接口定义如下,我们只需要实现这个接口即可:

      

        public interface ChannelInterceptor {

        Message<?> preSend(Message<?> message, MessageChannel channel);

        void postSend(Message<?> message, MessageChannel channel, boolean sent);

        void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex);

        boolean preReceive(MessageChannel channel);

        Message<?> postReceive(Message<?> message, MessageChannel channel);

        void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex);

    }

         通过如下代码给所有的channel增加拦截器

         channel.addInterceptor(someInterceptor);


5.Message EndPoint

   消息端点(Message EndPoint)是真正处理消息的(Message)组件,它还可以控制通道的路由。我们可用的消息端点包含如下:

   5.1 Channel Adapter

         通道适配器(Channel Adapter)是一种连接外部系统或传输协议的端点(EndPoint),可以分为入站(inbound)和出战(outbound)。

         通道适配器是单向的,入站通道适配器只支持接收消息,出站通道适配器只支持输出消息。

         Spring Integration内置了如下的适配器:

RabbitMQ、Feed、File、FTP/SFTP、Gemfire、HTTP、TCP/UDP、JDBC、JPA、JMS、Mail、MongoDB、Redis、RMI
Twitter、XMPP、WebServices(SOAP、REST)、WebSocket

   5.2 Gateway

          消息网关(Gateway)类似于Adapter,但是提供了双向的请求/返回集成方式,也分为入站(inbound)和出站(outbound)。

          Spring Integration对响应的Adapter都提供了Gateway。


   5.3 Service Activator

          Service Activator可调用Spring的bean来处理消息,并将处理后的结果输出到指定的消息通道。


   5.4 Router

             路由(Router) 可根据消息体内容(Payload Type Router)、消息头的值(Header Value Router)以及定义好的接收表 (Recipient List Router)作为条件,来决定消息传递到的通道。


     5.5 Filter

         过滤器(Filter)类似于路由(router),不同的是过滤器不决定消息路由到哪里,而是决定消息是否可以传递给消息通道。


    5.6 Splitter

          拆分器(Splitter)将消息拆分为几个部分单独处理,拆分器处理的返回值是一个集合或者数组。


    5.7 Aggregator

          聚合器(Aggregator)与拆分器相反,它接收一个java.util.List作为参数,将多个消息合并为一个消息。


    5.8 Enricher

          当我们从外部获得消息后,需要增加额外的消息到已有的消息中,这时就需要使用消息增强器(Enricher)。消息增强器主要有 消息体增强器(Payload Enricher)和消息头增强器(Header Enricher)两种。


     5.9 Transformer

           转换器(Transformer)是对获得的消息进行一定的转换处理(如数据格式转换).


     5.10 Bridge

             使用连接桥(Bridge)可以简单的将两个消息通道连接起来。



 类似资料: