当前位置: 首页 > 知识库问答 >
问题:

Azure服务总线消息锁定未续订?

后烨煜
2023-03-14

我在Azure服务总线中构建了一个支持多队列订阅的服务,但是我正在得到一些奇怪的行为。

我的subscription singleton类有一个如下所示的方法:

    public void Subscribe<TMessage>(Func<TMessage, Task> execution, int maxDop = 1, int ttl = 60) where TMessage : IServiceBusMessage
    {
        try
        {
            var messageLifespan = TimeSpan.FromSeconds(ttl);
            var messageType = typeof(TMessage);
            if (!_activeSubscriptionClients.TryGetValue(messageType, out var subscriptionClient))
            {
                subscriptionClient = _subscriptionClientFactory.Create(typeof(TMessage)).GetAwaiter().GetResult();
                if (subscriptionClient.OperationTimeout < messageLifespan) subscriptionClient.OperationTimeout = messageLifespan;
                if (subscriptionClient.ServiceBusConnection.OperationTimeout < messageLifespan)
                    subscriptionClient.ServiceBusConnection.OperationTimeout = messageLifespan;
                _activeSubscriptionClients.AddOrUpdate(messageType, subscriptionClient, (key, value) => value);
            }

            var messageHandlerOptions = new MessageHandlerOptions(OnException)
            {
                MaxConcurrentCalls = maxDop,
                AutoComplete = false,
                MaxAutoRenewDuration = messageLifespan,
            };


            subscriptionClient.RegisterMessageHandler(
                async (azureMessage, cancellationToken) =>
                {
                    try
                    {
                        var textPayload = _encoding.GetString(azureMessage.Body);
                        var message = JsonConvert.DeserializeObject<TMessage>(textPayload);
                        if (message == null)
                            throw new FormatException($"Cannot deserialize the message payload to type '{typeof(TMessage).FullName}'.");
                        await execution.Invoke(message);
                        await subscriptionClient.CompleteAsync(azureMessage.SystemProperties.LockToken);
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "ProcessMessagesAsync(Message, CancellationToken)");
                        await subscriptionClient.AbandonAsync(azureMessage.SystemProperties.LockToken);
                    }
                }
                , messageHandlerOptions);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Subscribe(Action<TMessage>)");
            throw;
        }
    }

其思想是,您为特定类型的消息订阅Azure Service Bus,该消息直接对应于队列。在订阅中,传入一个委托以了解如何处理消息。

谁能告诉我,我需要做什么不同的,以确保消费者拥有的消息,直到完成?

共有1个答案

袁卓
2023-03-14

有几个选项,我能想到,你可能想看看。

>

  • 在SubscriptionClient中不使用默认的receiveMode=peeklock,而是将其设置为ReceiveAndDelete,这样一旦消息被使用,它就会从队列中删除,不会被任何其他客户端使用,这意味着您必须优雅地处理异常并执行重试;

    查看operationtimeout,根据doco,它的持续时间为持续时间,在此之后,单个操作将超时

  •  类似资料:
    • 我在Azure中托管了两个云服务辅助角色,一个使用NServiceBus(Azure服务总线传输)消耗消息,另一个生成消息。 昨天,我部署了一个新版本的生产者工作者角色,而队列中仍然有大量消息,因为我们正在处理早上遗留下来的大量消息。当生产者启动时,它似乎已经清空(或者可能重新创建)队列,许多重要的生产消息丢失。这似乎很奇怪,但日志显示,大约在生产者角色启动时,消费者没有处理进一步的消息,我们知道

    • 我正在开发一个Azure逻辑应用,其中有一个服务总线窥视锁触发器。服务总线的消息锁定持续时间为5分钟。我希望每5分钟有一个自动更新令牌机制,以防逻辑应用实例需要超过5分钟,以便该过程可以继续,而无需释放锁,让另一个消费者将消息带走。 有什么方法可以通过Logic应用程序实现这一点吗?

    • 我有一个windows服务,它侦听Azure服务总线队列消息,以便从我的WebApi应用程序分发处理。此外,我还需要处理重复性任务(每晚/每周),我认为最好使用相同的系统来处理这些任务。 例如,假设我有一个“CleanupDb”队列,每天午夜删除过时的DB节点: 理论上这应该行得通,但我觉得我错过了一个更明显的处理方法。有没有更好的办法?

    • 我正在尝试在Azure中构建一个简单的WebAPI REST服务,后端有一个服务总线队列工作器。我可以从Web API向工作人员发送一条消息。然而,我试图发送更多的信息,只是为了看看一切是如何运作的。因此,我创建了一个简单的控制器,如下所示: 当我呼叫控制器时,我只收到工作人员接收到的大约1/2的消息。其余的似乎都被放弃了。

    • 我有点困惑,试图确定如何窥视锁的工作在服务总线。特别是,我使用Microsoft.Azure.ServiceBus和Azure函数以及ServiceBustrigger。 据我所知,消息被锁定的时间设置在队列本身上,默认值为30秒,但也可以设置在任何地方,最多为5分钟。 当从队列中窥视到一条消息时,这个锁就会启动。 我还有的问题是 是否存在maxAutoRenewDuration可以设置的限制。我

    • 我在Azure Service Bus中使用代理消息传递(主题/订阅),我很好奇如何(或者是否)使用SSL保护通信。