我有一个处于RPC模式的消费者(RabbitListner),我想知道是否有可能引发发布者可以处理的异常。
为了更清楚地说明我的情况如下:
我尝试使用AmqpRejectAndDontRequeueException,但我的发布者没有收到异常,只收到一个空响应。
这样做是可能的,还是不是一个好的实践?
编辑1:
在@GaryRussel回复之后,我的问题就解决了:
>
对于RabbitListner,我创建了一个错误处理程序:
@Configuration
public class RabbitErrorHandler implements RabbitListenerErrorHandler {
@Override public Object handleError(Message message, org.springframework.messaging.Message<?> message1, ListenerExecutionFailedException e) {
throw e;
}
}
将bean定义为配置文件:
@配置公共类RabbitConfig扩展了RabbitConfiguration{
@Bean
public RabbitTemplate getRabbitTemplate() {
Message.addWhiteListPatterns(RabbitConstants.CLASSES_TO_SEND_OVER_RABBITMQ);
return new RabbitTemplate(this.connectionFactory());
}
/**
* Define the RabbitErrorHandle
* @return Initialize RabbitErrorHandle bean
*/
@Bean
public RabbitErrorHandler rabbitErrorHandler() {
return new RabbitErrorHandler();
}
}
使用参数创建@RabbitListner,其中rabbitErrorHandler是我之前定义的bean:
@Override
@RabbitListener(queues = "${rabbit.queue}"
, errorHandler = "rabbitErrorHandler"
, returnExceptions = "true")
public ReturnObject receiveMessage(Message message) {
对于RabbitTemplate,我设置了以下属性:
rabbitTemplate.setMessageConverter(new RemoteInvocationAwareMessageConverterAdapter());
当消息受到消费者的威胁,但它发送了一个错误,我获得一个远程调用结果,其中包含原始异常到e.get原因()。
对我起作用的是:
在“服务”方面:
>
服务
@RabbitListener(id = "test1", containerFactory ="BEAN CONTAINER FACTORY",
queues = "TEST QUEUE", returnExceptions = "true")
DataList getData() {
// this exception will be transformed by rabbit error handler to a RemoteInvocationResult
throw new IllegalStateException("mon expecion");
//return dataHelper.loadAllData();
}
在“请求”方面:
>
服务
public void fetchData() throws AmqpRemoteException {
var response = (DataList) amqpTemplate.convertSendAndReceive("TEST EXCHANGE", "ROUTING NAME", new Object());
Optional.ofNullable(response)
.ifPresentOrElse(this::setDataContent, this::handleNoData);
}
@Bean
AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
var rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter);
return rabbitTemplate;
}
@Bean
MessageConverter jsonMessageConverter() {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
objectMapper.registerModule(new JavaTimeModule());
var jsonConverter = new Jackson2JsonMessageConverter(objectMapper);
DefaultClassMapper classMapper = new DefaultClassMapper();
Map<String, Class<?>> idClassMapping = Map.of(
DataList.class.getName(), DataList.class,
RemoteInvocationResult.class.getName(), RemoteInvocationResult.class
);
classMapper.setIdClassMapping(idClassMapping);
jsonConverter.setClassMapper(classMapper);
// json converter with returned exception awareness
// this will transform RemoteInvocationResult into a AmqpRemoteException
return new RemoteInvocationAwareMessageConverterAdapter(jsonConverter);
}
您必须将消息作为错误返回,消费应用程序可以选择将其视为异常。然而,我认为正常的异常处理流不适用于消息传递。您的发布应用程序(RPC服务的使用者)需要知道可能出现的问题,并通过编程来处理这些可能性。
请参阅@RabbitListener
(自2.0以来)上的returExceptions
属性。文档在这里。
returnExceptions
属性,当true
时,将导致异常返回给发送方。异常被包装在RemoteInvocationResult
对象中。
在发送方端,有一个可用的RemoteInvocationAwareMessageConverterAdapter
,如果配置到rabbitmplate
,它将重新抛出服务器端异常,并封装在amqpmoteexception
中。服务器异常的堆栈跟踪将通过合并服务器和客户端堆栈跟踪来合成。
重要的
这种机制通常只适用于默认的SimpleMessageConverter,它使用Java序列化;异常通常不“Jackson友好”,因此无法序列化为JSON。如果您正在使用JSON,请考虑使用Error Healthor在抛出异常时返回其他杰克逊友好的错误对象。
在使用Spring Kafka Consumer时,我有时会收到以下错误消息。如代码片段所示,我至少实现了一次语义 1)我的疑问是,我是否错过了来自消费者的任何信息? 2) 我需要处理这个错误吗。由于 org.apache.kafka.clients.consumer.提交失败异常:无法完成偏移提交,因为消费者不是自动分区分配的活动组的一部分;消费者很可能被踢出组。 我的SpringKafka消费
我是Kafka的新手,我对消费者的理解是,基本上有两种类型的实现 1)高级消费者/消费者群体 2)简单消费者 高级抽象最重要的部分是当Kafka不关心处理偏移量,而Simple消费者对偏移量管理提供了更好的控制时使用它。让我困惑的是,如果我想在多线程环境中运行consumer,并且还想控制偏移量,该怎么办。如果我使用消费者组,这是否意味着我必须读取存储在zookeeper中的最后一个偏移量?这是我
是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?
本文向大家介绍消费者和消费者组有什么关系?相关面试题,主要包含被问及消费者和消费者组有什么关系?时的应答技巧和注意事项,需要的朋友参考一下 每个消费者从属于消费组。具体关系如下:
Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka
null 当侦听器处理记录后返回时提交偏移量。 如果侦听器方法抛出异常,我会认为偏移量不会增加。但是,当我使用下面的code/config/command组合对其进行测试时,情况并非如此。偏移量仍然会得到更新,并且继续处理下一条消息。 我的配置: 验证偏移量的命令: 我使用的是kafka2.12-0.10.2.0和org.springframework.kafka:spring-kafka:1.1