当前位置: 首页 > 知识库问答 >
问题:

使用JMS为带有SQS的nack定制接收超时

施琦
2023-03-14

否定确认的行为是将接收消息的可见性超时更改为0。其中,在为JMS创建SQS工厂时,NACK_TIMEOUT的值是不可配置的。

https://github.com/awslabs/amazon-sqs-java-messaging-lib/blob/master/src/main/java/com/amazon/sqs/javamessaging/acknowledge/NegativeAcknowledger.java#L99

当收到一条消息,并且处理失败(Listener方法抛出错误)时,该消息立即再次被接收。在大多数情况下,可以在一定的延迟时间内处理消息。

是否可以将其配置为不更改可见性超时,从而遵守队列的默认接收超时配置

共有2个答案

钮安歌
2023-03-14

那些根本不想发送ChangeMessageViality请求(重置当前可见性计时器)并且只想依赖默认队列可见性超时的人可以使用以下在lombok帮助下编写的Amazon SQS包装器:

java prettyprint-override">public AmazonSQS amazonSqs(AWSCredentialsProvider credentialsProvider) {
    AmazonSQS amazonSQS = AmazonSQSClientBuilder.standard()
            .withRegion(region)
            (...)
            .build();

    return new IgnoringChangeVisibilityAmazonSqs(amazonSQS);
}

@Slf4j
@RequiredArgsConstructor
public class IgnoringChangeVisibilityAmazonSqs implements AmazonSQS {

    private interface ChangeVisibilityOperations {
        ChangeMessageVisibilityResult changeMessageVisibility(ChangeMessageVisibilityRequest changeMessageVisibilityRequest);
        ChangeMessageVisibilityResult changeMessageVisibility(String queueUrl, String receiptHandle, Integer visibilityTimeout);
        ChangeMessageVisibilityBatchResult changeMessageVisibilityBatch(ChangeMessageVisibilityBatchRequest request);
        ChangeMessageVisibilityBatchResult changeMessageVisibilityBatch(String queueUrl, List<ChangeMessageVisibilityBatchRequestEntry> entries);
    }

    @Delegate(excludes = ChangeVisibilityOperations.class)
    private final AmazonSQS amazonSqs;

    @Override
    public ChangeMessageVisibilityResult changeMessageVisibility(ChangeMessageVisibilityRequest changeMessageVisibilityRequest) {
        log.info("Ignoring ChangeMessageVisibilityRequest");
        return new ChangeMessageVisibilityResult();
    }

    @Override
    public ChangeMessageVisibilityResult changeMessageVisibility(String queueUrl, String receiptHandle, Integer visibilityTimeout) {
        log.info("Ignoring ChangeMessageVisibilityRequest");
        return new ChangeMessageVisibilityResult();
    }

    @Override
    public ChangeMessageVisibilityBatchResult changeMessageVisibilityBatch(ChangeMessageVisibilityBatchRequest request) {
        log.info("Ignoring ChangeMessageVisibilityBatchRequest");
        return changeMessageVisibilityBatch(request.getQueueUrl(), request.getEntries());
    }

    @Override
    public ChangeMessageVisibilityBatchResult changeMessageVisibilityBatch(String queueUrl, List<ChangeMessageVisibilityBatchRequestEntry> entries) {
        log.info("Ignoring ChangeMessageVisibilityBatchRequest");
        List<ChangeMessageVisibilityBatchResultEntry> results = entries.stream().map(request ->
                new ChangeMessageVisibilityBatchResultEntry().withId(request.getId()))
                .collect(Collectors.toList());
        return new ChangeMessageVisibilityBatchResult().withSuccessful(results);
    }
}

沈飞翔
2023-03-14

显然有一种方法有点笨拙,但我没有找到更好的解决方法,它的想法是,在创建客户端时,可以附加自己的请求处理程序:

    return AmazonSQSAsyncClientBuilder.standard()
        .withRequestHandlers(new RequestHandler2() {
            @Override
            public AmazonWebServiceRequest beforeExecution(AmazonWebServiceRequest request) {
                if (request instanceof ChangeMessageVisibilityBatchRequest) {
                    ((ChangeMessageVisibilityBatchRequest) request).getEntries().forEach(changeMessageVisibilityBatchRequestEntry -> {
                        changeMessageVisibilityBatchRequestEntry.setVisibilityTimeout(15);
                    });
                    log.debug("Changed visibility timeout for request {}", request);
                }
                return request;
            }
        })
    ;
 类似资料:
  • 我正在使用flutter socket io与运行Node/Express的服务器通信。 服务器代码: 我的扑动代码: 当我尝试连接时,我会得到一个超时错误,该错误在OnConnectRetror中捕获。 节点服务器正在运行debian,我已经检查了防火墙状态: 当我通过chrome打开url时,我得到的是“你好”信息。当我尝试netcat“NC-VZ MyServerIP8080时,我已经成功地

  • 我有一个在这里描述的代理 代理的传输是JMS。代理从jms提供消息,然后通过HTTP将消息发送到后端。在后端应答之后,代理将一个应答发送回JMS。 一切正常。 有时后端处理一条消息的时间超过30秒。在这种情况下,代理无法发回响应。 WSO2Carbon.log包含 当处理一条消息的时间超过30秒时,为什么会发生这种情况?如何扩大超时时间?

  • 我目前正在使用亚马逊的SQS,在尝试删除当前“正在运行”的队列消息时遇到问题。 下面是一些示例代码: 现在,在接收到句柄和消息体之后,我将接收句柄字符串存储到云存储中(例如DynamoDB)。随后,我从存储服务中加载该句柄,并使用类似于以下内容的方式调用delete: 但是,当运行该行时,我收到一条“输入收据句柄无效”的错误消息。 注意,我知道这条消息没有被重新接收,所以记录的接收句柄应该是最新的

  • 我们正在寻找移动我们的解决方案,开始使用AWS简单队列服务,而不是兔子MQ。 现在我们使用JMS连接到我们的队列,使用一个. bindings文件配置jndi上下文进行连接。 是否可以使用. bindings文件来配置到AWS简单队列服务的连接配置区域,使用概要文件作为参数,所有这些都在。绑定文件? 如果是,你有如何做到这一点的例子吗?

  • 我试图写一个请求从我的服务器获取消息在使用签名,消息看起来像这样, POST/134148934511/Localhost/?Action=ReceiveMessage 但我收到了一条错误消息(尽管我拥有所需的权限)。(我确信aws版本4的签名过程是正确的(通过S3检查))我做错了什么?