当前位置: 首页 > 知识库问答 >
问题:

在多个进程中使用来自Azure服务总线的消息

巩子实
2023-03-14

我正在开发一个基于Azure服务总线的系统,通过一个API和后台服务通过一个主题对大量消息进行异步处理,用于快速启动和忘记。在这个问题的上下文中,主题有一个单一的订阅,为什么它本来可以是一个队列。出于其他原因,我想把这作为一个话题保留下来。

我最近使用WindowsAzure.ServiceBus包将代码从.NET framework应用程序迁移到使用Microsoft.Azure.ServiceBus包的.NET核心包。为了处理大量消息,我使用了MessageReceiver类,如下所示:

var connString = "...";
var subscriptionPath = EntityNameHelper.FormatSubscriptionPath("topic", "subscription");
var messageReceiver = new MessageReceiver(connString, subscriptionPath);
while (...)
{
    var messages = await messageReceiver.ReceiveAsync(10, TimeSpan.FromSeconds(5));
    ...
}

为了简单起见,我隐藏了一系列细节。比如我的应用程序启动5个线程,并使用相同的MessageReceiver实例在每个线程中处理消息。

我通常有一个以上的应用程序实例运行,以分散在线程和进程中。我相信我们终于明白了我的问题。在迁移到.NET核心和新的NuGet包之后,我注意到其中只有一个应用程序在同时处理消息。当打开两个控制台窗口并在每个窗口中启动一个进程时,我可以看到窗口1中的app开始处理。Windows2中的应用程序不处理任何内容。若干秒后,windows 1中的应用程序停止处理,windows 2中的应用程序开始处理。过了一段时间,它又换回来了。交换机中没有真正的模式,但我的所有消息都被成功处理了。

messageReceiver中是否存在某种限制,允许最大的线程总数处理来自同一订阅的消息或类似的内容?

共有1个答案

慕容念
2023-03-14

我不知道messageReceiver在线程数方面有什么限制。不过,新的库经过优化,利用了并发性,不需要线程(异步代码)。因此,从技术上讲,您可以使用一个线程运行,并具有多个并发接收任务。另一种方法是使用queueclientsubscriptionclient提供的消息处理程序,它们允许为轻松地处理多个消息指定并发,但它们允许每个并发回调接收一个消息(不批处理)。

代理在一次调用中向第一个竞争的消费者提供尽可能多的消息。如果没有足够的消息,那么所有的消息都将被提供给单个(或前几个)消费者。没有循环往复和公平分配。它确实如预期的那样工作。

 类似资料:
  • 我已经创建了一个简单的窗口服务来使用来自Azure服务总线队列的消息。我使用TopShelch创建windows服务。下面的代码从这里剪切如下示例:https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues 高频。Run(); ServiceBusHe

  • 我在Azure中托管了两个云服务辅助角色,一个使用NServiceBus(Azure服务总线传输)消耗消息,另一个生成消息。 昨天,我部署了一个新版本的生产者工作者角色,而队列中仍然有大量消息,因为我们正在处理早上遗留下来的大量消息。当生产者启动时,它似乎已经清空(或者可能重新创建)队列,许多重要的生产消息丢失。这似乎很奇怪,但日志显示,大约在生产者角色启动时,消费者没有处理进一步的消息,我们知道

  • 我正在尝试在Azure中构建一个简单的WebAPI REST服务,后端有一个服务总线队列工作器。我可以从Web API向工作人员发送一条消息。然而,我试图发送更多的信息,只是为了看看一切是如何运作的。因此,我创建了一个简单的控制器,如下所示: 当我呼叫控制器时,我只收到工作人员接收到的大约1/2的消息。其余的似乎都被放弃了。

  • 我有一个服务总线主题与50个订阅者有他们自己的过滤器。如果有人猜到订户的名字,我如何从49个订户中获得消息?我可以验证订阅者凭据吗? 现在我的B计划是创建一个50个队列,以便每个队列都有自己的安全连接字符串。有谁能提出正确的方法吗?

  • 要求:我需要从队列中获取所有会话中的所有消息应该在消息出现在队列中时立即使用它们。 问题:代码接受与会话id(xyz)之一相关的消息,然后等待。即使使用该会话id(xyz)推送更多消息,它也无法使用它。 任何建议-我在这里错过了什么明显的东西。 我有,用于从队列连续接收会话消息。每当我开始新会话时,我只收到一个的消息。 测试队列:在这个队列中,有20条消息可用,有4个不同的会话ID。每当我运行Ja

  • 我正在编写一个实用程序来监视我们的Azure服务总线主题和订阅。 我可以获得主题详细信息,如名称、队列消息计数和死信消息计数,但我希望获得已处理的消息数。 看来不包含任何属性来获取已处理的消息数。 以前有人试过这么做吗?