我有一个简单的Spring Boot服务,它使用JMSTemplate监听AWS SQS队列。当消息得到正确处理时,一切都按预期进行。
我使用的是CLIENT_ACKNOWLEDGE,因此当处理过程中抛出异常时,会再次接收消息。但是,将忽略SQS队列上的默认可见性超时设置,并立即再次接收消息。
SQS队列配置了30秒的默认可见性超时和20次接收的重新驱动策略,然后再将消息放入DLQ。
我已禁用该服务,并使用SQS控制台验证是否正确设置了默认可见性超时。我还尝试将JMS消息添加到方法签名中,并执行手动验证。
以下是JMS配置的代码:
@Configuration
@EnableJms
class JmsConfig
{
@Bean
@Conditional(AWSEnvironmentCondition.class)
public SQSConnectionFactory connectionFactory(@Value("${AWS_REGION}") String awsRegion)
{
return new SQSConnectionFactory(
new ProviderConfiguration(),
AmazonSQSClientBuilder.standard()
.withRegion(Regions.fromName(awsRegion))
.withCredentials(new DefaultAWSCredentialsProviderChain())
);
}
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory)
{
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setDestinationResolver(new DynamicDestinationResolver());
factory.setConcurrency("3-10");
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
factory.setErrorHandler(defaultErrorHandler());
return factory;
}
@Bean
public ErrorHandler defaultErrorHandler()
{
return new ErrorHandler()
{
@Override
public void handleError(Throwable throwable)
{
LOG.error("JMS message listener error: {}", throwable.getMessage());
}
};
}
@Bean
public JmsTemplate defaultJmsTemplate(ConnectionFactory connectionFactory)
{
return new JmsTemplate(connectionFactory);
}
}
下面是听众的代码:
@Component
public class MessagingListener
{
@Autowired
private MessageService _messageService;
@Autowired
private Validator _validator;
@JmsListener(destination = "myqueue")
public void receiveMessage(String messageJson)
{
try
{
LOG.info("Received message");
// The following line throws an IOException is the message is not JSON.
MyMessage myMessage = MAPPER.readvalue(messageJson, MyMessage.class);
Set<ConstraintViolation<MyMessage>> _validator.validate(myMessage);
if (CollectionUtils.isNotEmpty(violations))
{
String errorMessage = violations.stream()
.map(v -> String.join(" : ", v.getPropertyPath().iterator().next().getName(),
v.getMessage()))
LOG.error("Exception occurred while validating the model, details: {}", errorMessage)
throw new ValidationException(errorMessage);
}
}
catch (IOException e)
{
LOG.error("Error parsing message", e);
throw new ValidationException("Error parsing message, details: " + e.getMessage());
}
}
}
当消息被放置在SQS队列中,带有无效的JSON或未通过验证的JSON时,消息会被非常快地接收20次,然后在DLQ上结束。需要做些什么来尊重SQS中的默认可见性超时设置?
在异常的情况下,失败消息的可见性超时通过ChangeMessageViality设置为0,因此SQS将立即发送此消息,即使队列具有不同的visibilityTimeout
设置。
这是怎么发生的?
您可以在这里看到,Spring JMS的AbstractMessageListenerContainer简要地执行以下操作:
try {
invokeListener(session, message); // This is your @JMSListener method
}
catch (JMSException | RuntimeException | Error ex) {
rollbackOnExceptionIfNecessary(session, ex);
throw ex;
}
commitIfNecessary(session, message);
在rollbackonexception如果需要
方法上,会话。将调用recover(),因为:
session.get事务处理()
将始终为false,因为SQS不支持事务处理。请参阅此处。 最后,SQSSession negative的recover()会确认消息,这意味着将特定消息的visibilityTimeout
设置为0,会导致SQS立即尝试发送该消息。
覆盖此行为的最简单方法是实现CustomJmsListenerContainerFactory
public class CustomMessageListenerContainer extends DefaultMessageListenerContainer {
public CustomMessageListenerContainer() {
super();
}
@Override
protected void rollbackOnExceptionIfNecessary() {
// do nothing, so that "visibilityTimeout" will stay same
}
}
public class CustomJmsListenerContainerFactory {
@Override
protected DefaultMessageListenerContainer createContainerInstance() {
return new CustomMesageListenerContainer();
}
}
用@Component
或者像你在JmsConfig
中做的那样,把它变成Springbean:
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new CustomJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// and set other stuff on factory
return factory;
}
注意:
如果您的应用程序在SQS和JMS中使用其他类型的数据源,请确保为它们使用不同的容器和容器工厂,以便rollback OnExceptionIfNecessary
的行为符合预期。
我是AWS的新手。我在这里试图理解SQS。我也看了一些培训,但我仍然不能得到一些答案那里的讨论论坛。我在这里重复我的问题。注意,我知道下面的几个问题有明显的答案,因此更多的是一种修辞。我的困惑源于这样一个事实,即我目前对这个主题的理解导致我对在明显已知的问题之后出现在我脑海中的后续问题给出了相互矛盾的答案,并且夺走了我认为我理解得很好的任何东西的信心。 如果我有一个名为MyQueue的标准队列,并
我对SQS非常陌生,如果我忽略了一些显而易见的事情,我很抱歉,但是有没有办法获取SQS中消息的当前可见性超时?我可以在这里看到如何更新超时可见性。但是我没有看到任何关于获取消息当前可见性超时的信息(也许你可以在收到消息时查看)。 我的用例是根据给定消息的当前可见性超时更改可见性超时。这可能吗? (注意:我知道我可以使用近似的接收时间来达到类似的效果,如果不可能获得当前可见性时间,我会走那条路)
在使用AWS SQS时,消息的“SentTimestamp”属性是否在从队列接收后发生变化,但在可见性超时到期后没有删除并返回队列?
如何配置可见性超时,以便可以再次读取SQS中的消息? 我将Amazon SQS作为消息队列。消息由多个应用程序发送。我现在使用Spring listener读取队列中的消息,如下所示: 类实现了进一步使用了方法。 我还配置了一个调度器,在一段时间后再次读取队列。它使用
但是,如果我的Lambda不期望任何输入,它将自己转到SQS并拉出消息,有输入有意义吗?我是否可以让它无效,或者甚至完全使用其他方法签名(当然,在本例中不实现那个接口)?
编辑:在我写的时候解决了这个问题:P--我喜欢这样的解决方案。我想无论如何我都要把它贴出来,也许别人也会有同样的问题,找到我的解决办法。不关心点数/因果报应等等。我只是把整个事情写了出来,所以我想我应该把它和解决方案贴出来。 我有一个SQS FIFO队列。它使用的是一纸空文队列。以下是它的配置方式: 我有一个单一的生产者微服务,我有10个ECS映像运行作为消费者。 由于业务原因,我们在接近消息在队