当前位置: 首页 > 知识库问答 >
问题:

@多个类中同一队列的RabbitListener

赫连宏伯
2023-03-14

我想知道,在Spring AMQP中,是否可以根据负载类型在多个类中接收来自同一队列的消息。

我知道在类中使用@RabbitListener注释,然后将@RabbitHandler放在方法上,但我希望在保持单个队列的同时将消息处理的复杂性拆分为多个类。

当前使用的版本:Spring AMQP v2.0.3以及RabbitMQ。

共有3个答案

钱季
2023-03-14

或者,我发现,如果您有足够高版本的AQMP和Rabbit(mine=spring-rabit 2.1.3, spring-starter-aqmp 2.1.2),您可以执行以下多态:

@RabbitListener(id="multi", queues = "somequeuename")
public class SomeService 
{
    @RabbitHandler
    public void handleADTO(@Payload ADTO adto) {
         System.out.print(adto.url);
    }

    @RabbitHandler
    public void handleADTO2(@Payload ADTO2 adto) {
         System.out.print(adto.url);
    }
}

但是,开箱即用,这不起作用。您需要配置另一个“袋豆”:

@Bean
public SimpleRabbitListenerContainerFactory myFactory(
        SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory =
            new SimpleRabbitListenerContainerFactory();
    configurer.configure(factory, connectionFactory);
    return factory;
}

@Bean
Jackson2JsonMessageConverter jsonMessageConverter() {
    return new Jackson2JsonMessageConverter();
}

@Bean
RabbitTemplate rabbitTemplate(Jackson2JsonMessageConverter converter, ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    template.setMessageConverter(new Jackson2JsonMessageConverter());
    return template;
}

只需将其放入你的应用程序或其他类中。

另请参见

https://docs.spring.io/spring-amqp/docs/current/reference/html/

岳华灿
2023-03-14

是的,您可以,但它采用了一种稍微不同的方法:您需要监听通用消息类型,进行一些切换,并进行自己的反序列化。当然,您可以将代码完全隐藏在某个地方(基类、注释…)

下面的例子可以扩展到任何其他类型。上面的示例是对A和B DTO类型进行过滤。

void receive(ADTO dto)
{
    System.out.print(dto.url);
}

void receive(BDTO dto)
{
    System.out.print(dto.url);
}
@RabbitListener(queues = "your.queue.name")
public void listenMesssage(Message message) 
 {
    try 
    {
        String typeId = message.getMessageProperties().getHeaders().get("__TypeId__").toString();
        String contentType = message.getMessageProperties().getContentType();
        if (contentType != "application/json" || typeId == null || !typeId.contains(ADTO.class.toString()))
        {
            //TODO log warning
            System.out.print("type not supported by this service");
            return;
        }
        Object receivedObject = new Jackson2JsonMessageConverter().fromMessage(message);

        if (receivedObject instanceof ADTO)
        {
            receive((ADTO)receivedObject);
            System.out.print("ADTO");
        }
        //else

或者,您也可以这样进行序列化:

....
String typeId = message.getMessageProperties().getHeaders().get("__TypeId__").toString();
byte[] binMsg =  message.getBody();
String strMsg  = new String(binMsg, StandardCharsets.UTF_8);
ObjectMapper mapper = new ObjectMapper();
if (typeId.contains("ADTO"))
{
   receive(mapper.readValue(strMsg, ADTO.class ));
}
else
...            
那铭
2023-03-14

嗯,这是不可能的。那么你想要的方式就不会是排队了。设计一个监听器并根据负载类型分配给它的方法,这实际上是一个架构决策。

作为一种解决方法,我可以建议您将单个@RabbitListener类中的逻辑委托给这些业务服务:

    @RabbitListener(queues = "foo")
    public class MyListener {

        private final ServiceA serviceA;

        private final ServiceB serviceB;

        public MyListener(ServiceA serviceA, ServiceB serviceB) {
              this.serviceA = serviceA;
              this.serviceB = serviceB;
        }


        @RabbitHandler
        public void handleA(A a) {
             this.serviceA.handle(a);
        }

        @RabbitHandler
        public void handleB(B b) {
             this.serviceB.handle(b);
        }
    }
 类似资料:
  • 是否可以使用topic将消息发送到队列,并有2个消费者接收和处理相同的消息?目前,我已经创建了两个消费者,他们正在观察与一个exchage主题绑定的队列,但是第一个消费者使用了该消息并删除了该队列,第二个消费者没有接收到该消息。

  • 我刚刚开始使用RabbitMQ和AMQP。 我有一个消息队列 我有多个消费者,我想用相同的消息做不同的事情。 RabbitMQ的大部分文档似乎都集中在循环(round-robin)上,即单个消息由单个消费者使用,负载在每个消费者之间分散。这的确是我目击的行为。 例如:生产者只有一个队列,每2秒发送一次消息: 这里有一个消费者: 如果我启动消费者两次,我可以看到每个消费者都在以循环行为消费交替消息。

  • 问题内容: 我一般只是开始使用RabbitMQ和AMQP。 我有一条消息队列 我有多个消费者,我想用 同一条消息 做不同的事情。 RabbitMQ的大多数文档似乎都集中在循环上,即单个消息由单个使用者使用,而负载则分散在每个使用者之间。我确实是这种行为。 例如:生产者只有一个队列,每2秒发送一次消息: 这是一个消费者: 如果我启动使用者两次,则 可以看到每个使用者都以循环方式使用替代消息。 例如,

  • 我最近开始学习Spring和spring-amqp,所以这个问题可能看起来很基本,所以请原谅我。 null 或者有一种方法可以Spring加载所有我的队列配置类,然后只使用如下所示的对象: 那么,如何在不执行每次的情况下获取确切队列的amqpTemplate呢? 每次请求到达我的服务时都做新的AnnotationConfigApplicationContext有什么害处?[我想为每个请求创建一个新

  • 问题内容: 我有这个SQL查询: 列“前缀”是字母数字值(0-9a-zA-z)。我想要做的是,如果prefix的值是一个数字,则使该数字等于0。如果它是一个字母,它将保持其值。我试图在group by子句下添加以下行: 但是我收到一个错误“将varchar值’J’转换为数据类型int时转换失败。”。 是什么导致此错误?如何获得“前缀”列以显示0或字母? 问题答案: 您显然也需要此作为表达式:

  • 到目前为止,对于RabbitMQ中的一个队列,我使用了一个通道,但现在我动态创建了多个队列,所以我必须为每个队列创建一个新通道,还是一个通道可以从不同的队列接收/发送消息?