我目前正在从事一个项目,涉及使用来自RabbitMQ Brocker的消息。不过,我对Spring集成、AMQP和RabbitMQ还是个新手。我有一个问题是使用错误的消息格式。当我的使用者接收到一个格式错误的消息时,它将其返回队列,然后RabbitMQ将其返回,这就产生了一个无休止的循环。在Spring Integration文档中,有一些配置可以实现,使得这种消息不会返回到队列中。
class ExceptionHandler {
public void handle(Throwable e ) {
Logger.log("Some log ... we don't give a Sh** ... ") ;
}
}
是的,这是正确的解决方案之一--当您确定消息不应被requeued
时,抛出AMQPrejectandDontRequeueException
。
在SimpleMessageListenerContainer
上还有DefaultRequeuerEjected
,默认情况下为True
。
您也许应该看看DLX/DLQ解决方案,以免丢失那些格式错误的消息。
请分享困扰你的StackTrace。
SimpleMessageListenerContainer
中有这样的代码:
catch (AmqpRejectAndDontRequeueException rejectEx) {
/*
* These will normally be wrapped by an LEFE if thrown by the
* listener, but we will also honor it if thrown by an
* error handler.
*/
}
当RabbitMq消息到达队列时,我目前正在使用IntegrationFlow来触发作业执行。IntegrationFlow的AmqpInFronChannelAdapter和作业的第一步的ItemReader都配置为从同一队列中读取消息。 我遇到的问题是IntegrationFlow的AmqpInboundChannelAdapter读取RabbitMQ消息,然后ItemReader再也找不到该
我对RabbitMQ很陌生,所以如果我的问题听起来很琐碎,请原谅。我想在RabbitMQ上发布消息,它将由RabbitMQ消费者处理。 我的消费者机器是一个多核机器(最好是azure上的工作者角色)。但QueueBasicConsumer一次推送一条消息。我如何编程来利用我可以同时处理多个消息的所有核心。 一种解决方案是在多个线程中打开多个通道,然后在那里处理消息。但在这种情况下,我将如何决定线程
我试图做一个简单的poc与Spring启动与版本(2.3.7发布)的SpringKafka,以实现消费者批处理的工作原理,以及如何再平衡工作,如果消费者需要更多的流转时长,因为我是全新的这个消息系统。 现在我看到kafka重新平衡单个消费者(不允许并发)的问题。 这些是我设置的max.poll.interval属性。ms=50000和factory.getContanerProperties。se
我为RabbitMQ制作了一个消费者,作为一个用C#.NET编写的控制台应用程序。它被编程为永久监听队列,每当它在队列中发现消息时,它就处理它。使用者平均每秒处理35条消息。使用者被安排在系统启动时在任务计划程序中运行。消费者运行良好的3-4天。但是,它们继续运行,但不处理任何消息,尽管队列中有消息。当使用者停止并再次启动时,它再次开始正确处理消息。但是,当您手动重新启动时,数以百万计的消息排在队
我尝试在使用邮件时进行以下错误处理: 如果出现序列化错误:在DLT中发送消息 我拥有的(2.5.1Kafka客户端的Spring kafka 2.5.5版本)如下: 现在,如果我发送不可序列化的消息,我的消息将不重试地发送到DLT- 在我的中,我有一个,捕获并重新捕获。 我应该没有重试,但我得到了2个重试,每个20秒(而不是10秒?),并在2次重试后向DLT发送了一条消息。 如果我删除errorH
我看了一个教程,它解释了如何将RabbitMQ集成到Spring Boot应用程序中。在本教程中,(使用者)和(生产者)类位于同一个项目中。我想在两个不同的Spring Boot应用程序中实现它们。但是,由于类的原因,我无法将教程项目拆分为两个使用者项目和生产者项目。因为它耦合了(使用者)和(生产者)类。 如何实现和配置两个不同的cosnumer和producer Spring Boot应用程序?