当前位置: 首页 > 知识库问答 >
问题:

Spring BootrabbitMQ DLE不接受任何消息

郜驰
2023-03-14

我正在开发spring-boot RabbitMQ。我创建了一个死信队列,我可以在RabbitMQ管理中看到它为“d,DLE”,但没有DLK我可能错过了设置“x-dial-letter-routing-key”,问题是我不想要路由key。我的消费者很少绑定到特定的交换,我在每个交换中创建DLE,如果该交换的消费者有任何问题,那么连接到该交换的DLE接收该消息并执行依赖于用户的逻辑。但不幸的是,这不起作用,DLE没有接收任何消息。

请查找下面的代码

package com.sample.rabbit;

import org.slf4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Argument;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import    org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
import org.springframework.amqp.support.converter.DefaultClassMapper;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.util.ErrorHandler;

@SpringBootApplication
public class SampleRabbitApplication {

public static void main(String[] args) throws Exception {
    ConfigurableApplicationContext context = SpringApplication.run(SampleRabbitApplication.class, args);
    context.getBean(SampleRabbitApplication.class).runDemo();
    context.close();
}

@Autowired
private RabbitTemplate template;

private void runDemo() throws Exception {
    this.template.convertAndSend("sample-queue", new Foo("bar"),m -> {
        m.getMessageProperties().setHeader("__TypeId__","foo");
        return m;
    });

    this.template.convertAndSend("sample-queue", new Foo("throw"),m -> {
        m.getMessageProperties().setHeader("__TypeId__","foo");
        return m;
    });
    this.template.convertAndSend("sample-queue", new Foo("bar"), m -> {
        return new Message("some bad json".getBytes(), m.getMessageProperties());
    });
    Thread.sleep(5000);
}

@RabbitListener(
        id = "sample-queue",
        bindings = @QueueBinding(
                value = @org.springframework.amqp.rabbit.annotation.Queue(value = "sample-queue", durable = "true"),
                exchange = @org.springframework.amqp.rabbit.annotation.Exchange(value = "sample.exchange", durable = "true")
        )
)
public void handle(Foo in) {
    System.out.println("Received: " + in);
if("throw".equalsIgnoreCase(in.getFoo())){
        throw new BadRequestException("Foo contains throw so it throwed the exception.");
    }
}

@RabbitListener(
        id = "sample-dead-letter-queue",
        bindings = @QueueBinding(
                value = @org.springframework.amqp.rabbit.annotation.Queue(value = "sample-dead-letter-queue", durable = "true", arguments = {@Argument(name = "x-dead-letter-exchange",value = "sample.exchange"),@Argument(name = "x-dead-letter-routing-key",value = "#")}),
                exchange = @org.springframework.amqp.rabbit.annotation.Exchange(value = "critical.exchange", durable = "true",type = "topic")
        )
)
public void handleDLE(Message in) {
    System.out.println("Received in DLE: " + in.getBody());
}

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setMessageConverter(jsonConverter());
    factory.setErrorHandler(errorHandler());
    return factory;
}

@Bean
public ErrorHandler errorHandler() {
    return new ConditionalRejectingErrorHandler(new MyFatalExceptionStrategy());
}

@Bean
public MessageConverter jsonConverter() {
    Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
    DefaultClassMapper mapper = new DefaultClassMapper();
    mapper.setDefaultType(Foo.class);
    converter.setClassMapper(mapper);
    return new Jackson2JsonMessageConverter();
}

public static class MyFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {

    private final Logger LOG = org.slf4j.LoggerFactory.getLogger(getClass());

    public boolean isFatal(Throwable t) {
        if (t instanceof ListenerExecutionFailedException && isCauseFatal(t.getCause())) {
            //To do : Here we have to configure DLE(Critical queue) and put all the messages in the critical queue.
            ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;
            if(lefe.getFailedMessage() != null) {
                LOG.info("Failed to process inbound message from queue "
                        + lefe.getFailedMessage().getMessageProperties().getConsumerQueue()
                        + "; failed message: " + lefe.getFailedMessage(), t);
            } else {
                LOG.info("Failed to process inbound message from queue "
                        + lefe.getMessage(), t);
            }
        }
        return super.isFatal(t);
    }

    private boolean isCauseFatal(Throwable cause) {
        return cause instanceof MessageConversionException
                || cause instanceof org.springframework.messaging.converter.MessageConversionException
                || cause instanceof MethodArgumentNotValidException
                || cause instanceof MethodArgumentTypeMismatchException
                || cause instanceof NoSuchMethodException
                || cause instanceof ClassCastException
                || isUserCauseFatal(cause);
    }

    /**
     * Subclasses can override this to add custom exceptions.
     * @param cause the cause
     * @return true if the cause is fatal.
     */
    protected boolean isUserCauseFatal(Throwable cause) {
        return true;
    }


}

public static class Foo {

    private String foo;

    public Foo() {
        super();
    }

    public Foo(String foo) {
        this.foo = foo;
    }

    public String getFoo() {
        return this.foo;
    }

    public void setFoo(String foo) {
        this.foo = foo;
    }

    @Override
    public String toString() {
        return "Foo [foo=" + this.foo + "]";
    }

}
} 

我的交换和队列是直接的,每个用户使用不同的路由密钥,但属于同一个交换,所以我如何编写一个DLE来有效地使用所有的失败消息。在上面的代码示例中,一个消息是成功的,另一个是失败的,但我看不到DLE中的失败消息。

如有任何帮助,不胜感激。

共有1个答案

易元青
2023-03-14

如果使用死信交换(DLX)配置队列,但没有死信路由密钥,则消息将使用原始路由密钥路由到DLX。

处理用例的最简单方法是使DLX成为主题交换,并使用路由键#(所有消息的通配符)将队列绑定到它,所有错误都将转到该队列。

如果要将错误隔离到各个队列中,则使用原始路由密钥为每个队列绑定一个DLQ。

@RabbitListener(id = "sample-queue",
        bindings = @QueueBinding(value = @Queue(value = "sample-queue", durable = "true", arguments =
                        @Argument(name = "x-dead-letter-exchange", value = "critical.exchange")),
                    exchange = @Exchange(value = "sample.exchange", durable = "true")))
public void handle(Foo in) {
    System.out.println("Received: " + in);
}

@RabbitListener(id = "sample-dead-letter-queue", containerFactory = "noJsonContainerFactory",
        bindings = @QueueBinding(value = @Queue(value = "sample-dead-letter-queue", durable = "true"),
            exchange = @Exchange(value = "critical.exchange", durable = "true", type = "topic"),
            key = "#"))
public void handleDLE(Message in) {
    System.out.println("Received in DLE: " + new String(in.getBody()));
}

@Bean
public SimpleRabbitListenerContainerFactory noJsonContainerFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setErrorHandler(errorHandler());
    return factory;
}
 类似资料:
  • 我有一个关于使用无效证书通过https测试网站的问题。你能帮忙吗?我正在临时服务器上测试一个网站。它需要https,并且使用了无效的证书,该证书属于生产服务器。因此,当我访问该网站时,FireFox会显示“此连接不受信任页面”。我已经设法让firefox跳过页面;但是,如果我不使用Selenium(Python绑定)运行它,它将再次显示“Untrusted”页面。所以,我做了更多的研究,发现: h

  • 嵌套异常是com.fasterxml.jackson.databind.jsonMappingException:无法从START_OBJECT令牌反序列化java.lang.String实例

  • 问题内容: 我确实不明白这小段代码的错误所在: 它给了我错误: 问题答案: 在Python 3.3和更高版本中,如果您同时覆盖和,则需要避免将任何其他参数传递给您要覆盖的方法。如果您仅覆盖这些方法之一,则可以将额外的参数传递给另一个方法(因为通常在没有您帮助的情况下会发生这种情况)。 因此,要修复您的类,请按以下方式更改方法:

  • 我开发了一个Web应用程序并将我的证书用于SSL。我不需要CA证书,因为客户端正在连接到内部网中的服务器。所以客户端每次都会收到不受信任的连接错误。你知道我们如何在每个浏览器中摆脱这个错误吗?我试图将我生成的证书添加到浏览器,但它不起作用,也无法识别我的证书类型。 谢谢 --法力

  • 我试图用JUnit实现一些Selenium2WebDriver测试。有关硒的文件。org和web让我感到困惑,因为它似乎在seleniumrc和Webdriver之间来回跳跃。另外,我的Java不是很强。几年前我上过几门课,但没怎么用。我想让JUnit测试在无头CI服务器上运行,并让Firefox通过使用Webdriver在远程客户端系统上运行。 根据我收集的信息,我可以使用以下代码在本地系统上打

  • 我有一个springboot消费者应用程序。当我第一次运行它时,它消耗了来自Kafka主题的信息。但当我再次运行它时,它停止了消耗。在日志中,我看到以下消息。 我知道消费者无法获得偏移量。在这种情况下,消费者将引用自动偏移重置属性。如您所见,我已将其设置为,希望消费者从头开始阅读。但它没有。 应用程序. yml 在我的Java课上 我尝试了一些东西。 我将值设置为。不出所料,它抛出了一个异常,抱怨