当前位置: 首页 > 知识库问答 >
问题:

Azure服务总线队列:如何从队列中读取单个消息

杜经艺
2023-03-14

我有一个应用程序,在这个应用程序中,我可以在进程的一部分中以JSON格式将消息写入Azure服务总线队列。我有一个下游进程,我想将该消息从队列中弹出,将json转换为一个对象,然后处理该对象。

我没有问题将消息推送到队列上,但我还没有找到任何将消息从队列中逐一或循环弹出的示例。我在微软或Github上看到的每一个例子都是一个控制台应用程序(在网络应用程序中毫无用处),它设置了某种侦听器,可以抓取队列中的所有消息并写入控制台消息。我没有找到弹出消息的示例,然后对数据进行一些处理。有人有关于如何从队列中弹出消息,然后处理它或调用另一个方法来处理消息中的数据的示例吗?

更新:我使用了WindowsAzure。下面的ServiceBus示例由Guru Pasupathy提供,并使用Azure Service Bus队列中的以下代码段结束:如何从队列读取单个消息以从BrokeredMessage对象获取消息文本:

Stream stream = message.GetBody<Stream>();
StreamReader reader = new StreamReader(stream);
string messageBody =  reader.ReadToEnd();

然后,我可以把MessageBody和反序列化嵌入JSON到一个POCO对象,我就在路上了!现在,我可以在应用程序中更有效地使用队列来执行各种任务。

共有1个答案

潘星阑
2023-03-14

您可以使用Peek Lock接收模式从队列中获取消息,对其进行处理,然后根据业务逻辑选择放弃消息或完成消息。

如果您使用的是WindowsAzure。ServiceBus nuget package发送/接收消息下面的方法可用于基于循环或单个调用逐个使用队列中的消息,而无需使用侦听器。

        public void Receive()
        {
            QueueClient myQueueClient = QueueClient.CreateFromConnectionString("<connectionString>;<queueName>", ReceiveMode.PeekLock);

            int someCount = 2; //some random value for testing
            try
            {
                for (int i = 0; i < someCount; i++)
                {
                    BrokeredMessage message = myQueueClient.Receive();
                    Console.WriteLine("The message is " + message);
                    message.Complete();
                }
            }
            catch(Exception e)
            {
                //Handle your expection
            }

        }

如果您使用的是Microsoft。蔚蓝色的然后,我找不到一种不使用侦听器直接读取单个消息的方法。我看到侦听器将继续轮询和处理队列,直到不再有消息。

如果您要求停止轮询并继续获取所有可用消息,那么作为一种解决方法,您可以在读取一条消息后关闭QueueClient实例,并在流程准备执行下一条消息时打开qc和register处理程序

        public async Task ProcessMessagesAsync(Message message, CancellationToken token)
        {
            Console.WriteLine($"Received message: {Encoding.UTF8.GetString(message.Body)}");

            BinaryFormatter bf = new BinaryFormatter();
            using (MemoryStream ms = new MemoryStream(message.Body))
            {
                Payload payload = (Payload) bf.Deserialize(ms);

                //Based on your needs you may have a condition here based on which you could Abandon or Complete the mesage

                await qc.CompleteAsync(message.SystemProperties.LockToken);
                Console.WriteLine("Completed the message --> " + payload.Message + " -- Id --> " + payload.Id);

                //await qc.AbandonAsync(message.SystemProperties.LockToken);
                //Console.WriteLine("Abandon the message --> " + payload.Message + " -- Id --> " + payload.Id);

                qc.CloseAsync(); //If you close the QueueClient instance here, no more messages will be picked up from queue.
            }
        }

上面的示例基于https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues#receive-来自队列的消息,我刚刚添加了qc。在结束时调用CloseAsync()。如果没有此行,侦听器将继续处理,直到队列中不再有消息。我不确定是否有更好的方法来实现这一点,但我想到了分享。

希望这能有所帮助

编辑-

发送邮件时,如果您使用的是自定义类型,则可以使用下面的

                BrokeredMessage message = new BrokeredMessage(new Payload() { Id = 4332, Message = "WindowsAzure package" });
                myQueueClient.Send(message);

在接收时,您应该使用GetBody,如下所示

                BrokeredMessage message = myQueueClient.Receive();
                var incoming = message.GetBody<Payload>();
                Console.WriteLine("The message is " + incoming.Id + " and " + incoming.Message);
                message.Complete();

下面是自定义对象供您参考

    [Serializable]
    public class Payload
    {
        public int Id { get; set; }
        public string Message { get; set; }
    }

这同样适用于字符串

在接收时,您可以使用

var incoming = message.GetBody<string>();

发送时您可以发送为

BrokeredMessage message = new BrokeredMessage("WindowsAzure package" );

您可以在下面的链接中获得有关不同内容格式的更多详细信息https://abhishekrlal.com/2012/03/30/formatting-the-content-for-service-bus-messages/

 类似资料:
  • 我想从azure服务总线(队列)读取所有消息。 我已经按照下面的指示链接https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-php-how-to-use-queues 目前它只能获取一条消息。。 我想从服务总线(队列)获取所有消息。 提前谢谢。。

  • 我有一个windows服务,它侦听Azure服务总线队列消息,以便从我的WebApi应用程序分发处理。此外,我还需要处理重复性任务(每晚/每周),我认为最好使用相同的系统来处理这些任务。 例如,假设我有一个“CleanupDb”队列,每天午夜删除过时的DB节点: 理论上这应该行得通,但我觉得我错过了一个更明显的处理方法。有没有更好的办法?

  • 我正在使用Azure服务总线队列。但是我不能使用“获取所有队列消息(peek Lock):微软内置于api”从队列中获取所有消息。 有没有办法获取所有队列消息? {"$连接":{"值":{"servicebus_1":{"连接ID":"/订阅/c776fex3-6aec-4722-b099-b054c267b240/资源组/Plugin-Resources/提供者/Microsoft.网络/连接/

  • 我是Azure服务总线的新手,我应该将消息推送到队列中,然后有一个单独的计划任务,该任务将读取该队列中的所有活动消息,并将它们批量导入到sql我以前尝试过这个代码,当我调用时它正在工作它在发送消息后立即工作,但现在它在单独的计划任务中不工作。任何帮助为什么或什么我可以用来批量阅读信息或这是不可能的

  • 如何轮询azure服务总线以持续检查消息?下面是我从队列接收消息的方式。 我想不断地寻找信息,然后处理它。

  • 我正在使用azure服务总线主题和订阅机制,并希望处理所有在死信队列中的消息。 此外,我想通过C#中的Azure Web作业处理消息,并将其发送回队列。所以我想知道如何通过我的应用程序处理死信队列上的消息?