当前位置: 首页 > 知识库问答 >
问题:

在Azure函数中按顺序处理服务总线消息(没有并发调用)

国胤
2023-03-14
{
  "serviceBus": {
    "maxConcurrentCalls": 1
  }
}

Azure功能:

using System;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;

[Singleton]
public static void Run(BrokeredMessage myQueueItem, TraceWriter log)
{
    Stream stream = myQueueItem.GetBody<Stream>();
    StreamReader reader = new StreamReader(stream);
    string messageContentStr = reader.ReadToEnd();

    log.Info($"New TEST message: {messageContentStr} on thread {System.Threading.Thread.CurrentThread.ManagedThreadId}");   

    System.Threading.Thread.Sleep(2000);     
}

以下是日志记录的摘录。如您所见,有不同的线程。例如,“消息19”位于“消息10”之前。是的,我确信我把消息放在队列中的顺序是正确的。

....
2018-05-09T09:09:33.686 [Info] New TEST message: Message 19 on thread 33
2018-05-09T09:09:35.702 [Info] Function completed (Success, Id=007eccd0-b5db-466a-91c1-4f53ec5a7b3a, Duration=2013ms)
2018-05-09T09:09:36.390 [Info] Function started (Id=b7160487-d10d-47a6-bab3-78da68a93498)
2018-05-09T09:09:36.420 [Info] New TEST message: Message 10 on thread 39
...

共有1个答案

督翰学
2023-03-14

查看并确保您的服务总线队列没有被分区。如果对其进行分区,则有多个消息代理为请求提供服务,并且不能保证消息传递的顺序。您可以在下面阅读更多关于它的信息:https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-partitioning#not-using-a-partition-key

具体来说:

在没有分区键的情况下,服务总线以循环方式将消息分发到已分区队列或主题的所有片段。如果所选的片段不可用,服务总线将消息分配给不同的片段。这样,尽管消息存储暂时不可用,但发送操作仍会成功。但是,您将无法实现分区键提供的有保证的排序。

 类似资料:
  • 我正在使用azure service bus主题,我已经为它订阅启用了会话。 在my logic应用程序中,我使用来自主题的sql事务插入数据,我使用主题订阅(peek-lock)并在订阅服务器级别将并发设置为默认,如下所示 根据我的理解,我的逻辑应用程序(订阅者)应该读取所有的消息,并且必须在FIFO中处理 我的逻辑应用程序像

  • 如果丢失了序列号,是否有办法恢复或删除Azure服务总线上的延迟消息? 场景是:我想使用来延迟消息。我计划记录序列号,并在以后使用它来检索消息。但是如果出了问题——假设部署了一些错误代码——并且序列号没有正确记录,那么这条消息似乎将以延迟状态保留在服务总线上,直到消息过期,这可能是永远的。 这主要是因为该消息将占用队列或订阅上的空间,除了完全删除队列/订阅之外,我还没有找到任何方法来恢复该空间。

  • 我正在尝试使用服务总线绑定输出为JavaScript Azure函数中的服务总线消息设置元数据。不幸的是,绑定似乎只支持主体。 查看文档,我发现您可以通过上下文访问service bus触发器中的此信息。bindingData,但我没有看到服务总线输出的任何对应接口。

  • 我在Azure Service Bus中使用代理消息传递(主题/订阅),我很好奇如何(或者是否)使用SSL保护通信。

  • 我每个websocket接收几十条消息,这些消息可能只差几毫秒就能到达。我需要用操作来处理这些数据,这些操作有时会花费一些时间(例如,在DB中的插入)。为了处理接收到的新消息,必须完成对前一个消息的处理。 我的第一个想法是用Node.js Bull(用Redis)准备一个队列,但恐怕太长了,无法运行。这些消息的处理必须保持快速。 我尝试使用JS迭代器/生成器(直到现在我还从未使用过),我测试了如下