我正在用SQS和JavaSDK发送和接收消息。几乎所有的消息都工作正常,但是其中一些丢失了,我不明白为什么。这是发送消息的代码:
final SendMessageRequest msg = new SendMessageRequest(
this.queueUrl, data.toString()
).withMessageGroupId(projectId);
final Map<String, MessageAttributeValue> attrs = new HashMap<>(1);
// message signature - HMAC calculated from message body
final String signature = // calculating signature
attrs.put(
"signature",
new MessageAttributeValue()
.withDataType("String")
.withStringValue(signature)
);
attrs.put(
"project",
new MessageAttributeValue()
.withDataType("String")
.withStringValue(projectId)
);
attrs.put(
"priority",
new MessageAttributeValue()
.withDataType("String")
.withStringValue(priority)
);
msg.setMessageDeduplicationId(
String.format("%s:%s", projectId, signature)
);
msg.setMessageAttributes(attrs);
Logger.debug(this, "sending message: %s", msg);
final SendMessageResult res = this.sqs.sendMessage(msg);
Logger.info(
this,
"message '%s' (%s) was sent: %s",
data.id(), data.type(), res
);
以及接收代码(在循环中运行):
final List<Message> messages = this.sqs.receiveMessage(
new ReceiveMessageRequest(url)
.withMessageAttributeNames(
"project", "signature", "expires", "priority"
)
.withVisibilityTimeout(
(int) Duration.ofMinutes(2L).getSeconds()
)
.withMaxNumberOfMessages(8)
).getMessages();
Logger.info(this, "received %d messages", messages.size());
for (final Message message : messages) {
Logger.debug(this, "received message: %s", message);
// actual logic here
this.sqs.deleteMessage(
new DeleteMessageRequest()
.withQueueUrl(this.queue)
.withReceiptHandle(message.getReceiptHandle())
);
}
问题是,我能够接收到一些消息,但有些消息不是(总是相同类型的数据data.type()
)。发送和接收的代码对于所有消息都是相同的。应用程序日志:
正在发送消息:{QueueUrl:https://sqs.us-east-1.amazonaws.com/0000000/my-queue.fifo,MessageBody:some unique body,MessageAttributes:{priority={StringValue:HIGH,StringListValues:[],BinaryListValues:[],DataType:String},signature={StringValue:EXTvx7WWrZ7uTU63szJ2C4VN/6ZOiw/wKL83qW7V3i0=,StringListValues:[],BinaryListValues:[],DataType:String},project={StringValue:PMO,StringListValues:[],BinaryListValues:[],数据类型:String}},MessageDeduplicationId:PMO:EXTvx7WWrZ7uTU63szJ2C4VN/6ZOiw/wKL83qW7V3i0=,MessageGroupId:PMO}
消息'e29baf85-7be4-449b-824f-e405c59cf7c4'([test])已发送:{MD5OfMessageBody:9d6e98e0e85c8f5ca7cc4c23378dc14b,MD5OfMessageAttributes:fe94ccb1b405588e0691c91392d2c8ea,消息ID:c8ce957a-93c8-49ef-9a08-9E12CB592B52B4,序列号:48188975999057872}
接收消息限制;timout=2m
收到0条消息
收到消息:限制=8条;timout=2m
收到0条消息
收到消息:限制=8条;timout=2m
收到0条消息
收到消息:限制=8条;timout=2m
收到0条消息
我检查了这些邮件的重复数据消除id是否总是不同的,并且其内容也不同。如何更深入地调试这个问题?
更新:我的队列似乎满是消息,但我无法接收它们:
$ aws sqs get-queue-attributes --queue-url="$QURL" --attribute-names=ApproximateNumberOfMessages
{
"Attributes": {
"ApproximateNumberOfMessages": "1490"
}
}
这很奇怪,因为有些消息即使在满队列的情况下也会在几秒钟内传递。
更新2:我尝试使用长轮询,但没用。
检查AWS控制台中是否有针对该队列的机上请求,因为我可以看到您对所有消息使用相同的messagegroupid,因此,如果任何消费者收到任何一条消息,消息将按顺序处理,直到消息确认或可见性超时过期,其他消费者将不会收到任何消息。
我已经建立了AWS架构体系,以便每次对发电机数据库条目的更新都以启用重复数据删除的SQS先进先出队列结束。我还有一个测试来覆盖这个场景,在那里我清除了队列(队列可以从套装中的其他测试中获得更新。为了避免在收到正确的消息之前必须轮询大量的消息,我在运行测试之前清除队列),更新Dynamo Db,并在轮询队列时检查这些条目是否收到。这个测试是不稳定的,有时会失败,因为我发送的所有更新都没有从队列中收到
我使用SQS作为视频编码的队列,并希望确保每个视频只执行一次编码。 SQS工作得很好,因为当消息排队时,它只能由单个线程接收。然而,对于相同的视频/编码,可能会向队列发送多条消息,这意味着对于特定的“编码”队列,消息内容将是相同的。 是否需要消除重复以确保对于特定队列,队列中的消息或从队列接收的消息是唯一的? 我想的一个选择是在消息发送时为每种编码类型创建一个新队列。所以队列可以命名为,它只有一条
我的Sring引导应用程序监听Amazon SQS队列。现在,我需要实现正确的消息确认--我需要接收一条消息,做一些业务逻辑,只有在此之后,如果成功,我需要ack消息(从队列中删除消息)。例如,如果我的业务逻辑出现错误,消息必须重新排队。 现在我不知道如何从我的听众那里获取信息。
我正在尝试从 kafka 主题中获取消息,并看到如果我将 auto.commit.reset 策略设置为“最早”,则所有消息都会得到正确处理。但是,如果设置为“最新”,则第一条消息将丢失,其余消息将得到正确处理。如果我在这里错过了什么,任何人都可以帮忙吗?
我有一个连接到lambda的队列(fifo)。我想在lambda中向标准队列发送一条消息。但没有发送/接收任何消息。然而,如果我尝试从非SQS连接的lambda(通过AppSync)发送它,它会工作。 我查过: lambda有权发送SQS消息(您可以在那里看到) 由于我已成功地从另一个lambda(非SQS)向标准队列发送消息,因此正确配置了标准队列 SQS URL是否正确 控制台中不会显示任何错
我需要实现向带有属性的SQS发送消息。消息的正文上传良好,但我有问题的属性。消息属性需要具有属性名称、数据类型和值的关联数组。我有这样的错误: $Attributes数组应该是什么样子?