当前位置: 首页 > 知识库问答 >
问题:

如何在azure服务总线主题接收器中处理取消令牌?

钱京
2023-03-14

我的问题是如何处理RegisterMessageHandler方法中的取消令牌,每当收到新消息时,该方法就会被调用?此外,我还想“优雅地”处理订阅客户端的关闭,也就是说,如果一条消息已经被处理,那么我希望该消息得到完全处理,然后我想关闭连接。下面是我正在使用的代码。

目前我们正按照以下方法进行:1。使用信号量锁锁定消息的进程,并在finally块中释放锁。2.调用CancellationToken.Register方法处理取消令牌。释放Register方法中的锁。

public class AzureServiceBusReceiver
{
  private SubscriptionClient subscriptionClient;
  private static Semaphore semaphoreLock;

public AzureServiceBusReceiver(ServiceBusReceiverSettings settings)
{
    semaphoreLock = new Semaphore(1, 1);
    subscriptionClient = new SubscriptionClient(
        settings.ConnectionString, settings.TopicName, settings.SubscriptionName, ReceiveMode.PeekLock);
}

public void Receive(
    CancellationToken cancellationToken)
{
    var options = new MessageHandlerOptions(e =>
    {

        return Task.CompletedTask;
    })
    {
        AutoComplete = false,

    };

    subscriptionClient.RegisterMessageHandler(
        async (message, token) =>
        {
            semaphoreLock.WaitOne();
            if (subscriptionClient.IsClosedOrClosing)
                return;
            CancellationToken combinedToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, token).Token;
            try
            {
                // message processing logic
            }
            catch (Exception ex)
            {
                await subscriptionClient.DeadLetterAsync(message.SystemProperties.LockToken);
            }
            finally
            {
                semaphoreLock.Release();
            }
        }, options);


    cancellationToken.Register(() =>
    {
        semaphoreLock.WaitOne();
        if (!subscriptionClient.IsClosedOrClosing)
            subscriptionClient.CloseAsync().GetAwaiter().GetResult();
        semaphoreLock.Release();
        return;
    });
} 
}

共有1个答案

孟晋
2023-03-14

将消息客户端实现为ICommunicationListener,因此当服务关闭时,您可以阻止调用,直到消息处理完成。不要使用静态信号量,这样您可以在项目中安全地重用代码。

下面是一个示例,说明如何做到这一点。

下面是由该代码创建的Nuget包。

 类似资料:
  • 参考https://github.com/Azure/azure-service-bus/tree/master/samples/dotnet/gettingstart/microsoft.Azure.servicebus/basicsendreceiveusingtopicsubscriptionclient,我了解Azure服务总线主题的一般工作方式,我的问题更多地是关于它实际上是如何工作的。

  • 我正在使用Azure服务总线主题机制。此外,我已经将消息发送到主题,并希望通过编程方式检查是否将消息发送到主题。 代码: 有没有办法获得响应或状态代码?

  • 我们目前正在利用Azure服务总线来处理来自应用程序的各种消息。 我想知道实时处理这些消息的最佳方式是什么? 有没有一种方法可以在消息放入队列时自动执行脚本? 我只是在想,一定有比让一个单独的应用程序每分钟/30秒检查一次队列更好的方法。 谢谢各位

  • null 输出 谁能解释一下为什么会这样吗?这对我来说有点迷惑?

  • 我正在使用azure service bus主题,我已经为它订阅启用了会话。 在my logic应用程序中,我使用来自主题的sql事务插入数据,我使用主题订阅(peek-lock)并在订阅服务器级别将并发设置为默认,如下所示 根据我的理解,我的逻辑应用程序(订阅者)应该读取所有的消息,并且必须在FIFO中处理 我的逻辑应用程序像