26.6 注解驱动的监听端点

优质
小牛编辑
125浏览
2023-12-01

26.6 注解驱动的监听端点

异步接收消息的最简单的方法是使用注解监听端点的基础架构。简而言之,它允许你暴露托管一个 bean 的方法作为一个 JMS 的监听端点。

@Component
public class MyService {

	@JmsListener(destination = "myDestination")
	public void processOrder(String data) { ... }
}

上述示例的想法是,每当javax.jms.Destination “myDestination” 上有消息可用时,就调用相应地processOrder方法(在这种情况下,JMS消息的内容类似于MessageListenerAdapter提供的内容)。

注解端点的基础架构使用JmsListenerContainerFactory为每个注解方法创建一个消息监听容器。这样的容器没有针对应用上下文进行注册,但是可以使用JmsListenerEndpointRegistry bean 进行简单的管理。

@JmsListener是 Java 8上的可重复注解,因此可以通过向其添加额外的@JmsListener声明将多个JMS目的地关联到同一个方法。 在 Java 6和7上,你可以使用@JmsListeners注解。

26.6.1 启用监听端点的注解

要启用对@JmsListener注解的支持,请将@EnableJms添加到你的一个@Configuration类上。

@Configuration
@EnableJms
public class AppConfig {

	@Bean
	public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
		DefaultJmsListenerContainerFactory factory =
				new DefaultJmsListenerContainerFactory();
		factory.setConnectionFactory(connectionFactory());
		factory.setDestinationResolver(destinationResolver());
		factory.setConcurrency("3-10");
		return factory;
	}
}

默认情况下,基础架构将查找名为jmsListenerContainerFactory的 bean 作为用于创建消息监听容器的工厂源。 在这种情况下,忽略 JMS 基础架构的设置,processOrder方法可以在一个线程池中被调用,线程池核心数为3,最大线程数为10。

对于使用的每个注解都可以自定义监听容器工厂,或通过实现JmsListenerConfigurer接口来显示的配置默认值。仅在存在没有指定容器工厂的端点时,默认值才是必须的。有关详细信息和示例,请参阅 javadoc。

如果您喜欢XML配置,请使用<jms:annotation-driven>元素。

<jms:annotation-driven/>

   <bean
           class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
       <property name="connectionFactory" ref="connectionFactory"/>
       <property name="destinationResolver" ref="destinationResolver"/>
       <property name="concurrency" value="3-10"/>
   </bean>

26.6.2 编程式端点注册

JmsListenerEndpoint提供了 JMS 端点的模型,并负责为该模型配置容器。除了使用注解之外,基础架构也允许你用编程的方式来配置端点。

@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {

	@Override
	public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
		SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
		endpoint.setId("myJmsEndpoint");
		endpoint.setDestination("anotherQueue");
		endpoint.setMessageListener(message -> {
			// processing
		});
		registrar.registerEndpoint(endpoint);
	}
}

本例中我们使用SimpleJmsListenerEndpoint来提供MessageListener,你也可以建立自己的端点变体并自定义调用机制。

应该注意的是,你完全可以不使用@JmsListener,而仅通过JmsListenerConfigurer来注册所有端点。

26.6.3 注解式端点方式签名

到目前为止,我们已经在我们的端点注入了一个简单的String,但实际上它可以有一个非常灵活的方法签名。现在让我们重写它并注入一个带有自定义头部的Order

@Component
public class MyService {

   	@JmsListener(destination = "myDestination")
   	public void processOrder(Order order, @Header("order_type") String orderType) {
       	...
   	}
}

可以向 JMS 监听端点中注入的主要元素包括:

  • 原始的javax.jms.Message或任意子类(当然,它与传入的消息类型相匹配)。
  • 可选的javax.jms.Session来操作 JMS 原生 API,来发送自定义回复。
  • 代表着接收消息的org.springframework.messaging.Message。注意此消息同时包含自定义和JmsHeaders定义的标准头。
  • @Header注解的方法参数被用来提取一个特定的头部值,包括标准 JMS 头。
  • @Headers注解参数必须指定给一个java.util.Map,用来获取所有头。
  • 不被支持的(如MessageSession等)、且无注解的元素被视为有效载荷。可以明确的给它们添加@Payload注解。也可以通过添加@Valid注解来开启校验。

注入 Spring 的Message抽象可以获取特定消息的所有信息,而无需依赖特定传输 API。

@JmsListener(destination = "myDestination")
public void processOrder(Message<Order> order) { ... }

可以自己扩展DefaultMessageHandlerMethodFactory来处理额外的方法参数。同时你也可以自定义转换和校验规则。

例如,如果我们需要在处理之前确保我们的Order有效,我们可以使用@Valid对有效负载进行注解,并配置必要的验证器,如下所示:

@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {

   	@Override
   	public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
       	registrar.setMessageHandlerMethodFactory(myJmsHandlerMethodFactory());
   	}

   	@Bean
   	public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
       	DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
       	factory.setValidator(myValidator());
       	return factory;
   	}
}

26.6.4 响应管理

MessageListenerAdapter允许你的方法有非空返回值。此时返回值将被封装在一个 javax.jms.Message中,发往原始消息的JMSReplyTo头中定义的目的地或者监听自己默认的目的地。监听默认的目的地可以通过@SendTo注解来设置。

假定现在我们的processOrder方法将返回一个OrderStatus,下面将展示如何自动地发送响应:

@JmsListener(destination = "myDestination")
@SendTo("status")
public OrderStatus processOrder(Order order) {
   	// order processing
   	return status;
}

如果你有多个@JmsListener注解方法,您还可以将@SendTo注解放在 class 上以共享默认响应目的地。

如果您需要以独立传输的方式设置额外的头信息,则可以返回一个Message,如下所示:

@JmsListener(destination = "myDestination")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
   	// order processing
   	return MessageBuilder
       	    .withPayload(status)
           	.setHeader("code", 1234)
           	.build();
}

如果响应的目的地是运行时实时计算的,可以将响应结果封装在一个JmsResponse中,直接指定一个目的地。前面的例子可以重写如下:

@JmsListener(destination = "myDestination")
public JmsResponse<Message<OrderStatus>> processOrder(Order order) {
   	// order processing
   	Message<OrderStatus> response = MessageBuilder
       	    .withPayload(status)
           	.setHeader("code", 1234)
           	.build();
	return JmsResponse.forQueue(response, "status");
}