当前位置: 首页 > 知识库问答 >
问题:

Spring-boot-starter RabbitMQ全局错误处理

满耀
2023-03-14

我使用spring-boot-starter-amqp1.4.2.Producer和消费者工作正常,但有时传入的JSON消息语法不正确。这会导致以下(正确)异常:

org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException: Listener threw exception
Caused by:  org.springframework.amqp.support.converter.MessageConversionException: Failed to convert Message content
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of java.lang.String out of START_ARRAY token...

未来我可能会面临更多的例外。因此,我想配置一个全局错误处理程序,这样,如果任何一个消费者中存在任何异常,我就可以全局处理它。

注意:在这种情况下,消息根本没有到达消费者。我想在消费者中全局处理这类异常。

请找到以下代码:

RabbitConfiguration.java

@Configuration
@EnableRabbit
public class RabbitMqConfiguration {

    @Autowired
    private CachingConnectionFactory cachingConnectionFactory;

    @Bean
    public MessageConverter jsonMessageConverter()
    {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    @Primary
    public RabbitTemplate rabbitTemplate()
    {
        RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }

}

消费者

@RabbitListener(
        id = "book_queue",
        bindings = @QueueBinding(
                value = @Queue(value = "book.queue", durable = "true"),
                exchange = @Exchange(value = "book.exchange", durable = "true", delayed = "true"),
                key = "book.queue"
        )
    )
public void handle(Message message) {
//Business Logic
}

有人能帮我处理错误处理程序globally.Your帮助应该是可观的。

根据加里评论更新了问题

我可以运行您的示例并获得您所说的预期输出,我只是想根据您的示例尝试更多的负面案例,但我无法理解一些事情,

this.template.convertAndSend(queue().getName(), new Foo("bar"));

输出

接收:傅 [foo=酒吧]

上面的代码运行良好。现在,代替“Foo ”,我发送一些其他的bean

this.template.convertAndSend(queue().getName(), new Differ("snack","Hihi","how are you"));

输出

收到:Foo[foo=null]

消费者不应该接受这个消息,因为它是完全不同的bean(different.class而不是Foo.class),所以我希望它应该转到“ConditionalRejectingErrorHandler”。为什么它接受错误的有效负载并打印为空?如果我错了,请纠正我。

编辑1:

Gary,正如你所说,我在发送消息时设置了标头“TypeId”,但消费者仍然可以转换错误的消息,并且不会抛出任何错误...请在下面找到代码,我使用了您的代码示例,只是做了以下修改,

1)发送消息时添加了“__TypeId__ ”,

this.template.convertAndSend(queue().getName(), new Differ("snack","hihi","how are you"),m -> {
        m.getMessageProperties().setHeader("__TypeId__","foo");
        return m;
    }); 

2) 在“Jackson2JsonMessageConverter”中添加“DefaultClassMapper”

@Bean
public MessageConverter jsonConverter() {
    Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
    DefaultClassMapper mapper = new DefaultClassMapper();
    mapper.setDefaultType(Foo.class);
    converter.setClassMapper(mapper);
    return new Jackson2JsonMessageConverter();
}    

共有1个答案

阎宝
2023-03-14

重写引导的侦听器容器工厂bean,如启用侦听器endpoint注释中所述。

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setErrorHandler(myErrorHandler());
    ...
    return factory;
}

您可以注入<code>ErrorHandler

void handleError(Throwable t);

可丢弃的将是ListenerExecutionFailedException,它从版本1.6.7(启动1.4.4)开始,在其failedMessage属性中包含原始入站消息。

默认错误处理程序将<code>MessageConversionException</code>等原因视为致命原因(不会重新排序)。

如果您希望保留该行为(此类问题正常),您应该在处理错误后抛出AmqpRejectAndDontRequest eueException

顺便说一下,您不需要RabbitTemplatebean;如果应用程序中只有一个MessageConverterbean,boot将自动将其连接到容器和模板中。

因为您将覆盖boot的工厂,所以您必须在那里连接转换器。

编辑

您可以使用默认的< code > ConditionalRejectingErrorHandler ,但要注入< code > FatalExceptionStrategy 的自定义实现。事实上,您可以子类化它的< code > DefaultExceptionStrategy 并覆盖< code>isFatal(Throwable t),然后,在处理错误后,返回< code>super.isFatal(t)。

第二版

完整示例;发送 1 条好消息和 1 条坏消息:

package com.example;

import org.slf4j.Logger;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.util.ErrorHandler;

@SpringBootApplication
public class So42215050Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So42215050Application.class, args);
        context.getBean(So42215050Application.class).runDemo();
        context.close();
    }

    @Autowired
    private RabbitTemplate template;

    private void runDemo() throws Exception {
        this.template.convertAndSend(queue().getName(), new Foo("bar"));
        this.template.convertAndSend(queue().getName(), new Foo("bar"), m -> {
            return new Message("some bad json".getBytes(), m.getMessageProperties());
        });
        Thread.sleep(5000);
    }

    @RabbitListener(queues = "So42215050")
    public void handle(Foo in) {
        System.out.println("Received: " + in);
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(jsonConverter());
        factory.setErrorHandler(errorHandler());
        return factory;
    }

    @Bean
    public ErrorHandler errorHandler() {
        return new ConditionalRejectingErrorHandler(new MyFatalExceptionStrategy());
    }

    @Bean
    public Queue queue() {
        return new Queue("So42215050", false, false, true);
    }

    @Bean
    public MessageConverter jsonConverter() {
        return new Jackson2JsonMessageConverter();
    }

    public static class MyFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {

        private final Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());

        @Override
        public boolean isFatal(Throwable t) {
            if (t instanceof ListenerExecutionFailedException) {
                ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;
                logger.error("Failed to process inbound message from queue "
                        + lefe.getFailedMessage().getMessageProperties().getConsumerQueue()
                        + "; failed message: " + lefe.getFailedMessage(), t);
            }
            return super.isFatal(t);
        }

    }

    public static class Foo {

        private String foo;

        public Foo() {
            super();
        }

        public Foo(String foo) {
            this.foo = foo;
        }

        public String getFoo() {
            return this.foo;
        }

        public void setFoo(String foo) {
            this.foo = foo;
        }

        @Override
        public String toString() {
            return "Foo [foo=" + this.foo + "]";
        }

    }
}

结果:

Received: Foo [foo=bar]

2017-02-14 09:42:50.972错误44868-[cTaskExecutor-1]5050应用程序$ MyFatalExceptionStrategy:无法处理队列So42215050的入站消息;失败的消息:(Body:' some bad JSON ' message properties[headers = { TypeId = com . example . so 42215050 application $ Foo },timestamp=null,messageId=null,userId=null,receivedUserId=null,appId=null,clusterId=null,type=null,correlationId=null,replyTo=null,contentType=application/json,contentEncoding=UTF-8,contentLength=0,deliveryMode=null,receivedDeliveryMode =持久,过期

编辑3

JSON 不传达任何类型信息。默认情况下,将从方法参数类型推断出要转换为的类型。如果要拒绝无法转换为该类型的任何内容,则需要相应地配置消息转换器。

例如:

@Bean
public MessageConverter jsonConverter() {
    Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
    DefaultClassMapper mapper = new DefaultClassMapper();
    mapper.setDefaultType(Foo.class);
    converter.setClassMapper(mapper);
    return converter;
}

现在,当我将示例更改为发送 Bar 而不是 Foo 时...

public static class Bar {

   ...

}

this.template.convertAndSend(queue().getName(), new Bar("baz"));

我明白了...

Caused by: org.springframework.amqp.support.converter.MessageConversionException: Cannot handle message
... 13 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.example.So42215050Application$Bar] to [com.example.So42215050Application$Foo] for GenericMessage [payload=Bar [foo=baz], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=So42215050, amqp_contentEncoding=UTF-8, amqp_deliveryTag=3, amqp_consumerQueue=So42215050, amqp_redelivered=false, id=6d7e23a3-c2a7-2417-49c9-69e3335aa485, amqp_consumerTag=amq.ctag-6JIGkpmkrTKaG32KVpf8HQ, contentType=application/json, __TypeId__=com.example.So42215050Application$Bar, timestamp=1488489538017}]

但这仅在发送方设置<code>__TypeId__

第四版

@SpringBootApplication
public class So42215050Application {

    private final Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So42215050Application.class, args);
        context.getBean(So42215050Application.class).runDemo();
        context.close();
    }

    @Autowired
    private RabbitTemplate template;

    private void runDemo() throws Exception {
        this.template.convertAndSend(queue().getName(), new Foo("bar")); // good - converter sets up type
        this.template.convertAndSend(queue().getName(), new Foo("bar"), m -> {
            return new Message("some bad json".getBytes(), m.getMessageProperties()); // fail bad json
        });
        Message message = MessageBuilder
                .withBody("{\"foo\":\"bar\"}".getBytes())
                .andProperties(
                        MessagePropertiesBuilder
                            .newInstance()
                            .setContentType("application/json")
                            .build())
                .build();
        this.template.send(queue().getName(), message); // Success - default Foo class when no header
        message.getMessageProperties().setHeader("__TypeId__", "foo");
        this.template.send(queue().getName(), message); // Success - foo is mapped to Foo
        message.getMessageProperties().setHeader("__TypeId__", "bar");
        this.template.send(queue().getName(), message); // fail - mapped to a Map
        Thread.sleep(5000);
    }

    @RabbitListener(queues = "So42215050")
    public void handle(Foo in) {
        logger.info("Received: " + in);
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(jsonConverter());
        factory.setErrorHandler(errorHandler());
        return factory;
    }

    @Bean
    public ErrorHandler errorHandler() {
        return new ConditionalRejectingErrorHandler(new MyFatalExceptionStrategy());
    }

    @Bean
    public Queue queue() {
        return new Queue("So42215050", false, false, true);
    }

    @Bean
    public MessageConverter jsonConverter() {
        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        DefaultClassMapper mapper = new DefaultClassMapper();
        mapper.setDefaultType(Foo.class);
        Map<String, Class<?>> mappings = new HashMap<>();
        mappings.put("foo", Foo.class);
        mappings.put("bar", Object.class);
        mapper.setIdClassMapping(mappings);
        converter.setClassMapper(mapper);
        return converter;
    }

    public static class MyFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {

        private final Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());

        @Override
        public boolean isFatal(Throwable t) {
            if (t instanceof ListenerExecutionFailedException) {
                ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;
                logger.error("Failed to process inbound message from queue "
                        + lefe.getFailedMessage().getMessageProperties().getConsumerQueue()
                        + "; failed message: " + lefe.getFailedMessage(), t);
            }
            return super.isFatal(t);
        }

    }

    public static class Foo {

        private String foo;

        public Foo() {
            super();
        }

        public Foo(String foo) {
            this.foo = foo;
        }

        public String getFoo() {
            return this.foo;
        }

        public void setFoo(String foo) {
            this.foo = foo;
        }

        @Override
        public String toString() {
            return "Foo [foo=" + this.foo + "]";
        }

    }

    public static class Bar {

        private String foo;

        public Bar() {
            super();
        }

        public Bar(String foo) {
            this.foo = foo;
        }

        public String getFoo() {
            return this.foo;
        }

        public void setFoo(String foo) {
            this.foo = foo;
        }

        @Override
        public String toString() {
            return "Bar [foo=" + this.foo + "]";
        }

    }

}
 类似资料:
  • 我正在使用Apollo与用React编写的Web应用程序中的GraphQL服务器进行交互。我试图在应用程序中实现错误处理,并依赖于apollo-link-error。 现在,我需要处理两类错误: 可以在执行Apollo查询或变异的组件中本地处理的错误,即需要在其上显示上下文错误信息的无效表单字段 可以全局处理的错误,例如,通过在页面的某个位置显示toast通知来显示错误详细信息 显然,一旦错误在本

  • 我正在用Spring4.0.4和Spring Boot1.0.2编写一个web应用程序,使用Tomcat作为嵌入式web容器,我想实现一个全局异常处理,以截取所有异常并以特定的方式记录它们。我的简单要求是: 我想全局处理所有尚未在其他地方处理的异常(例如,在控制器异常处理程序中)。我要记录该消息,并向用户显示自定义错误消息。 我不希望Spring或web容器自己记录任何错误,因为我希望自己记录错误

  • 使用Spring Boot有几种方法可以解决这个问题。并且我选择将@ControllerAdvice与@ExceptionHandler方法一起使用。 因此,当抛出异常时,新的处理程序会捕获异常并返回一个包含消息的漂亮的json,如: 实施-没那么难。最难的部分是测试。 欢迎有任何想法。谢谢你!

  • 2020-05-09 17:28:38.521信息21308---[restartedMain]O.A.C.C.C.[Tomcat].[localhost].[/]:初始化Spring embedded WebApplicationContext 2020-05-09 17:28:38.527信息21308--[restartedMain]O.s.Web.context.ContextLoader

  • 问题内容: 我想全局拦截某些错误情况,以防止控制器自己处理错误。我认为我需要HTTP拦截器,但是我不确定如何从我的控制器中处理错误。 我有一个像这样的控制器: 和这样的HTTP拦截器: 这与浏览器重定向到“ /错误”路径一样有效。但在承诺抓的 也 执行了,我不希望这样。 我知道我可以编写代码,使其忽略404错误,但这是无法维护的。假设我进行修改以同时处理500个错误,那么我将不得不再次进行修改(以

  • 问题内容: 当我的网站是100%jQuery时,我曾经这样做: 为401错误设置全局处理程序。现在,我将angularjs与和一起使用,以向服务器发送(REST)请求。有什么办法可以类似地用角度设置全局错误处理程序吗? 问题答案: 我还在建立一个带有angular的网站,并且遇到了同样的障碍来处理全局401。当我遇到此博客文章时,我最终使用了HTTP拦截器。也许您会发现它和我一样有用。 “基于An