当前位置: 首页 > 知识库问答 >
问题:

使用Azure服务总线主题时从_error队列中移动消息

高增
2023-03-14

问题是一旦解决了异常的来源,如何处理错误。一旦消息出错并最终出现在_error队列中,我希望在消息和/或服务修复后将消息移回处理。我无法将消息从_error队列移动到主题,因为该主题上的每个服务都将再次获得该消息。

我试图使用ReceiveEndpoint方法创建第二个队列,该方法名为_errorRecovery,但这样做会导致队列订阅主题,这意味着_errorRecovery队列获取发布到该主题的每个消息。

我想知道是否有一种方法可以使用MassTransit设置一个队列,该队列将只处理该队列中的消息,而不添加额外的订阅。

TEvent是消息类型,TConsumer是该消息类型的关联IConsumer实现。

  public void ConfigureType<TEvent, TConsumer>(IServiceBusBusFactoryConfigurator busConfig, Container container, MessageHandlingOptions options) where TConsumer : class, IConsumer
        {
            string subName = NameHelper.GetSubscriptionName(@namespace, _serviceName);
            var topicName = NameHelper.GetTopicName(@namespace, typeof(TEvent));

            busConfig.SubscriptionEndpoint(subName, topicName, configurator =>
            {
                configurator.ConfigureConsumer(container, typeof(TConsumer));
                if (!(options is null))
                {
                    ConfigureRetry(configurator, options);
                }
            });
        }

并构建_errorrecovery队列。每个事件还有一个相关的IConsumer,专门用于处理失败的事件。

 var subName = NameHelper.GetSubscriptionName(@namespace, _serviceName);

                        busConfig.ReceiveEndpoint(subName + "_errorrecovery", config =>
                        {
                            config.ConfigureConsumer(_simpleContainer, faultConsumers.Select(i => i.GenericType).ToArray());
                        });

这将产生一个名为subname_errorrecovery的队列和一个以事件命名的主题。服务在主题中有订阅,但_errorrecovery也有订阅。因此,每次向主题发送消息时,事件的使用者和错误的使用者都会收到消息。

共有1个答案

东门涵育
2023-03-14

Azure Service Bus允许您为您为主题创建的每个订阅设置订阅筛选器规则(请参见https://docs.microsoft.com/en-us/Azure/service-bus-messaging/topic-filters)。

这样,您就可以为订阅定义筛选规则,以便只有对应于特定规则的消息才会放在该订阅服务器的队列中进行处理。

这些筛选规则可以通过ARM模板、Azure门户或Azure CLI从代码中设置。

因此,让我们考虑一下,当您的失败消息(可能是因为临时无法访问使用者服务的数据库而出现异常)被移动到_error队列中时,会使用一些消息属性指示它的处理位置。让我们将此属性称为“ErrorOrigin”,并给它一个标识某个订阅的唯一值。

因此,在您的情况下,可以为每个订阅定义一个筛选规则,以便只有没有称为“ErrorOrigin”属性的消息,或者如果设置了属性,则其值必须与对应于该订阅的标识符名称匹配(例如,“SubscriptionXForEventZ”)

如果正确设置订阅筛选规则,订阅使用者现在将处理最初发布到该主题的所有消息,以及来自_error队列的也已发布到该主题的消息。但是使用筛选规则,Azure Service Bus只有在消息的“errorOrigin”属性与订阅对应时才会将那些失败的消息放入相应的订阅队列中。

我已经在实践中成功地在微服务体系结构中使用了这个特性。它允许非常容易地向主题添加额外的订阅者,但允许这些订阅者只从主题接收感兴趣的消息。

在这里,您可以看到Azure Service Bus上的订阅过滤示例,以便更好地了解它的工作原理:https://github.com/Azure/azure-service-bus/tree/master/samples/dotnet/microsoft.servicebus.messaging/topicfilters

我希望这个想法能帮助你解决你的具体问题。

 类似资料:
  • 我正在构建Windows Phone应用程序,无法使用Microsoft。服务总线。信息。QueueClient类。 然后,我尝试使用Azure Service Bus REST API进行发送,但这需要我构建一个SAS令牌。但要构建SAS令牌,我需要使用Windows。安全密码学。果心MacAlgorithmNames。HmacSha256。此类显示在前面的类型中,但在编译时它不存在。 如何使用

  • 我们目前正在利用Azure服务总线来处理来自应用程序的各种消息。 我想知道实时处理这些消息的最佳方式是什么? 有没有一种方法可以在消息放入队列时自动执行脚本? 我只是在想,一定有比让一个单独的应用程序每分钟/30秒检查一次队列更好的方法。 谢谢各位

  • 我有一个windows服务,它侦听Azure服务总线队列消息,以便从我的WebApi应用程序分发处理。此外,我还需要处理重复性任务(每晚/每周),我认为最好使用相同的系统来处理这些任务。 例如,假设我有一个“CleanupDb”队列,每天午夜删除过时的DB节点: 理论上这应该行得通,但我觉得我错过了一个更明显的处理方法。有没有更好的办法?

  • 我已经创建了一个简单的窗口服务来使用来自Azure服务总线队列的消息。我使用TopShelch创建windows服务。下面的代码从这里剪切如下示例:https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues 高频。Run(); ServiceBusHe

  • 我有一个应用程序,在这个应用程序中,我可以在进程的一部分中以JSON格式将消息写入Azure服务总线队列。我有一个下游进程,我想将该消息从队列中弹出,将json转换为一个对象,然后处理该对象。 我没有问题将消息推送到队列上,但我还没有找到任何将消息从队列中逐一或循环弹出的示例。我在微软或Github上看到的每一个例子都是一个控制台应用程序(在网络应用程序中毫无用处),它设置了某种侦听器,可以抓取队