我知道Azure Service Bus有一个重复消息检测功能,可以删除它认为与其他消息重复的消息。我想使用此功能来帮助防止重复传递。
我好奇的是服务如何确定两条消息实际上是重复的:
处理Azure Service Bus实体的“RequiresDuplicateDetection”属性时要考虑的另一个重要属性是“DuplicateDetectionHistoryTimeWindow”,在该时间范围内,具有重复消息id的消息将被拒绝。
重复检测时间历史记录的默认值现在是30秒,该值可以在20秒到7天之间。
启用重复检测有助于跟踪在指定时间窗口内发送到队列或主题的所有消息的应用程序控制的MessageId。如果发送的任何新消息中包含一个已在时间窗口内记录的MessageId,则该消息将报告为已接受(发送操作成功),但新发送的消息将立即被忽略并丢弃。除MessageId之外,不考虑消息的其他部分。
在我的情况下,我必须在MessageId之上应用ScheduledEnqueueTimeUtc。因为大多数情况下,在子序列重复消息到达队列之前,第一条消息已经被工作者拾取。通过添加ScheduledEnqueueTimeUtc。我们告诉服务总线将消息保留一段时间,然后再将其释放。
var message = new BrokeredMessage(json)
{
MessageId = GetMessageId(input, extra)
};
// Delay 30 seconds for Message to process
// So that Duplication Detection Engine has enought time to reject duplicated message
message.ScheduledEnqueueTimeUtc = DateTime.UtcNow.AddSeconds(30);
重复检测正在查看代理消息的MessageId属性。因此,如果您将消息Id设置为每个在重复检测中出现的消息都应该是唯一的,则可以捕获它。据我所知,只有消息Id用于检测。不会查看消息的内容,因此,如果发送的两条消息具有相同的实际内容,但具有不同的消息ID,则不会将其检测为重复。
参考文献:
MSDN文档:https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-queues-topics-subscriptions
如果场景不能容忍重复处理,那么应用程序中需要额外的逻辑来检测重复,这可以基于消息的MessageId属性来实现,该属性将在传递尝试中保持不变。这就是所谓的一次处理。
WindowsA上还有一个代理消息重复检测代码示例zure.com这应该正是您正在寻找的,只要证明它。
我还快速测试了这一点,并将5条消息发送到一个队列中,将requiresdeplicatedetection设置为true,所有消息的内容都完全相同,但消息ID不同。然后我检索了所有五条消息。然后我做了相反的操作,在那里我有匹配的messageid,但有不同的有效载荷,只检索到一条消息。
我在Azure中托管了两个云服务辅助角色,一个使用NServiceBus(Azure服务总线传输)消耗消息,另一个生成消息。 昨天,我部署了一个新版本的生产者工作者角色,而队列中仍然有大量消息,因为我们正在处理早上遗留下来的大量消息。当生产者启动时,它似乎已经清空(或者可能重新创建)队列,许多重要的生产消息丢失。这似乎很奇怪,但日志显示,大约在生产者角色启动时,消费者没有处理进一步的消息,我们知道
我正在尝试在Azure中构建一个简单的WebAPI REST服务,后端有一个服务总线队列工作器。我可以从Web API向工作人员发送一条消息。然而,我试图发送更多的信息,只是为了看看一切是如何运作的。因此,我创建了一个简单的控制器,如下所示: 当我呼叫控制器时,我只收到工作人员接收到的大约1/2的消息。其余的似乎都被放弃了。
编辑:我的发布逻辑非常简单,类似于以下内容:
我正在为Azure服务总线使用最新的Java绑定(V3.1.3):https://github.com/Azure/azure-sdk-for-java/tree/master/sdk/servicebus 当我创建一个新的队列客户端,计划一条消息,然后取消它... ...代码似乎按预期工作:活动消息计数变为0。但是一旦被调度的消息到达它应该被调度的时间(我测试了10秒和100秒以后),消息有时会
我正在使用Microsoft Azure ServiceBus对队列消息进行排队,并使用WCF对订阅进行排队。我正在尝试实现重试逻辑。我使用Peak/Lock查看消息,然后必须对消息进行一些本地处理。如果处理失败,我将解锁消息,以便再次尝试处理它。问题是我需要在处理尝试之间建立一个延迟。当前,它被弹出回队列,然后几乎立即被处理。两次尝试之间需要大约2分钟。
我们目前正在利用Azure服务总线来处理来自应用程序的各种消息。 我想知道实时处理这些消息的最佳方式是什么? 有没有一种方法可以在消息放入队列时自动执行脚本? 我只是在想,一定有比让一个单独的应用程序每分钟/30秒检查一次队列更好的方法。 谢谢各位