当前位置: 首页 > 知识库问答 >
问题:

一些消息在SQS中丢失

麹高远
2023-03-14

我正在用SQS和JavaSDK发送和接收消息。几乎所有的消息都工作正常,但是其中一些丢失了,我不明白为什么。这是发送消息的代码:

final SendMessageRequest msg = new SendMessageRequest(
    this.queueUrl, data.toString()
).withMessageGroupId(projectId);
final Map<String, MessageAttributeValue> attrs = new HashMap<>(1);
// message signature - HMAC calculated from message body
final String signature = // calculating signature
attrs.put(
    "signature",
    new MessageAttributeValue()
        .withDataType("String")
        .withStringValue(signature)
);
attrs.put(
    "project",
    new MessageAttributeValue()
        .withDataType("String")
        .withStringValue(projectId)
);
attrs.put(
    "priority",
     new MessageAttributeValue()
         .withDataType("String")
         .withStringValue(priority)
);

msg.setMessageDeduplicationId(
    String.format("%s:%s", projectId, signature)
);
msg.setMessageAttributes(attrs);
Logger.debug(this, "sending message: %s", msg);
final SendMessageResult res = this.sqs.sendMessage(msg);
Logger.info(
    this,
    "message '%s' (%s) was sent: %s",
    data.id(), data.type(), res
);

以及接收代码(在循环中运行):

final List<Message> messages = this.sqs.receiveMessage(
    new ReceiveMessageRequest(url)
        .withMessageAttributeNames(
            "project", "signature", "expires", "priority"
        )
        .withVisibilityTimeout(
            (int) Duration.ofMinutes(2L).getSeconds()
        )
        .withMaxNumberOfMessages(8)
).getMessages();
Logger.info(this, "received %d messages", messages.size());
for (final Message message : messages) {
    Logger.debug(this, "received message: %s", message);
    // actual logic here
    this.sqs.deleteMessage(
         new DeleteMessageRequest()
             .withQueueUrl(this.queue)
             .withReceiptHandle(message.getReceiptHandle())
    );
}

问题是,我能够接收到一些消息,但有些消息不是(总是相同类型的数据data.type())。发送和接收的代码对于所有消息都是相同的。应用程序日志:

正在发送消息:{QueueUrl:https://sqs.us-east-1.amazonaws.com/0000000/my-queue.fifo,MessageBody:some unique body,MessageAttributes:{priority={StringValue:HIGH,StringListValues:[],BinaryListValues:[],DataType:String},signature={StringValue:EXTvx7WWrZ7uTU63szJ2C4VN/6ZOiw/wKL83qW7V3i0=,StringListValues:[],BinaryListValues:[],DataType:String},project={StringValue:PMO,StringListValues:[],BinaryListValues:[],数据类型:String}},MessageDeduplicationId:PMO:EXTvx7WWrZ7uTU63szJ2C4VN/6ZOiw/wKL83qW7V3i0=,MessageGroupId:PMO}
消息'e29baf85-7be4-449b-824f-e405c59cf7c4'([test])已发送:{MD5OfMessageBody:9d6e98e0e85c8f5ca7cc4c23378dc14b,MD5OfMessageAttributes:fe94ccb1b405588e0691c91392d2c8ea,消息ID:c8ce957a-93c8-49ef-9a08-9E12CB592B52B4,序列号:48188975999057872}
接收消息限制;timout=2m
收到0条消息
收到消息:限制=8条;timout=2m
收到0条消息
收到消息:限制=8条;timout=2m
收到0条消息
收到消息:限制=8条;timout=2m
收到0条消息

我检查了这些邮件的重复数据消除id是否总是不同的,并且其内容也不同。如何更深入地调试这个问题?

更新:我的队列似乎满是消息,但我无法接收它们:

$ aws sqs get-queue-attributes --queue-url="$QURL" --attribute-names=ApproximateNumberOfMessages
{
    "Attributes": {
        "ApproximateNumberOfMessages": "1490"
    }
}

这很奇怪,因为有些消息即使在满队列的情况下也会在几秒钟内传递。

更新2:我尝试使用长轮询,但没用。

共有1个答案

索锐藻
2023-03-14

检查AWS控制台中是否有针对该队列的机上请求,因为我可以看到您对所有消息使用相同的messagegroupid,因此,如果任何消费者收到任何一条消息,消息将按顺序处理,直到消息确认或可见性超时过期,其他消费者将不会收到任何消息。

 类似资料:
  • 我已经建立了AWS架构体系,以便每次对发电机数据库条目的更新都以启用重复数据删除的SQS先进先出队列结束。我还有一个测试来覆盖这个场景,在那里我清除了队列(队列可以从套装中的其他测试中获得更新。为了避免在收到正确的消息之前必须轮询大量的消息,我在运行测试之前清除队列),更新Dynamo Db,并在轮询队列时检查这些条目是否收到。这个测试是不稳定的,有时会失败,因为我发送的所有更新都没有从队列中收到

  • 我使用SQS作为视频编码的队列,并希望确保每个视频只执行一次编码。 SQS工作得很好,因为当消息排队时,它只能由单个线程接收。然而,对于相同的视频/编码,可能会向队列发送多条消息,这意味着对于特定的“编码”队列,消息内容将是相同的。 是否需要消除重复以确保对于特定队列,队列中的消息或从队列接收的消息是唯一的? 我想的一个选择是在消息发送时为每种编码类型创建一个新队列。所以队列可以命名为,它只有一条

  • 我的Sring引导应用程序监听Amazon SQS队列。现在,我需要实现正确的消息确认--我需要接收一条消息,做一些业务逻辑,只有在此之后,如果成功,我需要ack消息(从队列中删除消息)。例如,如果我的业务逻辑出现错误,消息必须重新排队。 现在我不知道如何从我的听众那里获取信息。

  • 我正在尝试从 kafka 主题中获取消息,并看到如果我将 auto.commit.reset 策略设置为“最早”,则所有消息都会得到正确处理。但是,如果设置为“最新”,则第一条消息将丢失,其余消息将得到正确处理。如果我在这里错过了什么,任何人都可以帮忙吗?

  • 我有一个连接到lambda的队列(fifo)。我想在lambda中向标准队列发送一条消息。但没有发送/接收任何消息。然而,如果我尝试从非SQS连接的lambda(通过AppSync)发送它,它会工作。 我查过: lambda有权发送SQS消息(您可以在那里看到) 由于我已成功地从另一个lambda(非SQS)向标准队列发送消息,因此正确配置了标准队列 SQS URL是否正确 控制台中不会显示任何错

  • 我需要实现向带有属性的SQS发送消息。消息的正文上传良好,但我有问题的属性。消息属性需要具有属性名称、数据类型和值的关联数组。我有这样的错误: $Attributes数组应该是什么样子?