我想知道,在Spring AMQP中,是否可以根据负载类型在多个类中接收来自同一队列的消息。
我知道在类中使用@RabbitListener注释,然后将@RabbitHandler放在方法上,但我希望在保持单个队列的同时将消息处理的复杂性拆分为多个类。
当前使用的版本:Spring AMQP v2.0.3以及RabbitMQ。
或者,我发现,如果您有足够高版本的AQMP和Rabbit(mine=spring-rabit 2.1.3, spring-starter-aqmp 2.1.2),您可以执行以下多态:
@RabbitListener(id="multi", queues = "somequeuename")
public class SomeService
{
@RabbitHandler
public void handleADTO(@Payload ADTO adto) {
System.out.print(adto.url);
}
@RabbitHandler
public void handleADTO2(@Payload ADTO2 adto) {
System.out.print(adto.url);
}
}
但是,开箱即用,这不起作用。您需要配置另一个“袋豆”:
@Bean
public SimpleRabbitListenerContainerFactory myFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory =
new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
return factory;
}
@Bean
Jackson2JsonMessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
RabbitTemplate rabbitTemplate(Jackson2JsonMessageConverter converter, ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}
只需将其放入你的应用程序或其他类中。
另请参见
https://docs.spring.io/spring-amqp/docs/current/reference/html/
是的,您可以,但它采用了一种稍微不同的方法:您需要监听通用消息类型,进行一些切换,并进行自己的反序列化。当然,您可以将代码完全隐藏在某个地方(基类、注释…)
下面的例子可以扩展到任何其他类型。上面的示例是对A和B DTO类型进行过滤。
void receive(ADTO dto)
{
System.out.print(dto.url);
}
void receive(BDTO dto)
{
System.out.print(dto.url);
}
@RabbitListener(queues = "your.queue.name")
public void listenMesssage(Message message)
{
try
{
String typeId = message.getMessageProperties().getHeaders().get("__TypeId__").toString();
String contentType = message.getMessageProperties().getContentType();
if (contentType != "application/json" || typeId == null || !typeId.contains(ADTO.class.toString()))
{
//TODO log warning
System.out.print("type not supported by this service");
return;
}
Object receivedObject = new Jackson2JsonMessageConverter().fromMessage(message);
if (receivedObject instanceof ADTO)
{
receive((ADTO)receivedObject);
System.out.print("ADTO");
}
//else
或者,您也可以这样进行序列化:
....
String typeId = message.getMessageProperties().getHeaders().get("__TypeId__").toString();
byte[] binMsg = message.getBody();
String strMsg = new String(binMsg, StandardCharsets.UTF_8);
ObjectMapper mapper = new ObjectMapper();
if (typeId.contains("ADTO"))
{
receive(mapper.readValue(strMsg, ADTO.class ));
}
else
...
嗯,这是不可能的。那么你想要的方式就不会是排队了。设计一个监听器并根据负载类型分配给它的方法,这实际上是一个架构决策。
作为一种解决方法,我可以建议您将单个@RabbitListener
类中的逻辑委托给这些业务服务:
@RabbitListener(queues = "foo")
public class MyListener {
private final ServiceA serviceA;
private final ServiceB serviceB;
public MyListener(ServiceA serviceA, ServiceB serviceB) {
this.serviceA = serviceA;
this.serviceB = serviceB;
}
@RabbitHandler
public void handleA(A a) {
this.serviceA.handle(a);
}
@RabbitHandler
public void handleB(B b) {
this.serviceB.handle(b);
}
}
是否可以使用topic将消息发送到队列,并有2个消费者接收和处理相同的消息?目前,我已经创建了两个消费者,他们正在观察与一个exchage主题绑定的队列,但是第一个消费者使用了该消息并删除了该队列,第二个消费者没有接收到该消息。
我刚刚开始使用RabbitMQ和AMQP。 我有一个消息队列 我有多个消费者,我想用相同的消息做不同的事情。 RabbitMQ的大部分文档似乎都集中在循环(round-robin)上,即单个消息由单个消费者使用,负载在每个消费者之间分散。这的确是我目击的行为。 例如:生产者只有一个队列,每2秒发送一次消息: 这里有一个消费者: 如果我启动消费者两次,我可以看到每个消费者都在以循环行为消费交替消息。
问题内容: 我一般只是开始使用RabbitMQ和AMQP。 我有一条消息队列 我有多个消费者,我想用 同一条消息 做不同的事情。 RabbitMQ的大多数文档似乎都集中在循环上,即单个消息由单个使用者使用,而负载则分散在每个使用者之间。我确实是这种行为。 例如:生产者只有一个队列,每2秒发送一次消息: 这是一个消费者: 如果我启动使用者两次,则 可以看到每个使用者都以循环方式使用替代消息。 例如,
我最近开始学习Spring和spring-amqp,所以这个问题可能看起来很基本,所以请原谅我。 null 或者有一种方法可以Spring加载所有我的队列配置类,然后只使用如下所示的对象: 那么,如何在不执行每次的情况下获取确切队列的amqpTemplate呢? 每次请求到达我的服务时都做新的AnnotationConfigApplicationContext有什么害处?[我想为每个请求创建一个新
问题内容: 我有这个SQL查询: 列“前缀”是字母数字值(0-9a-zA-z)。我想要做的是,如果prefix的值是一个数字,则使该数字等于0。如果它是一个字母,它将保持其值。我试图在group by子句下添加以下行: 但是我收到一个错误“将varchar值’J’转换为数据类型int时转换失败。”。 是什么导致此错误?如何获得“前缀”列以显示0或字母? 问题答案: 您显然也需要此作为表达式:
到目前为止,对于RabbitMQ中的一个队列,我使用了一个通道,但现在我动态创建了多个队列,所以我必须为每个队列创建一个新通道,还是一个通道可以从不同的队列接收/发送消息?