当前位置: 首页 > 知识库问答 >
问题:

如何主动处理Azure服务总线队列消息

戎鹏云
2023-03-14

我们目前正在利用Azure服务总线来处理来自应用程序的各种消息。

我想知道实时处理这些消息的最佳方式是什么?

有没有一种方法可以在消息放入队列时自动执行脚本?

我只是在想,一定有比让一个单独的应用程序每分钟/30秒检查一次队列更好的方法。

谢谢各位

共有2个答案

缪升
2023-03-14

当涉及到基础架构代码时,我宁愿不写任何代码。毕竟,您最不希望看到的是基础架构代码中的一个bug,它会导致数据/消息丢失。

使用裸骨Azure服务总线的另一种选择是使用库为您提取所有代码。最终,您将声明您的消息—您的事件和命令—并且您拥有当出现消息时将被触发的处理程序。所有的消息泵送、队列创建、重试、错误处理、审计和事务(这些只是冰山一角)都有这样的框架。

至于使用哪种框架,有Nimbus和NServiceBus,可能还有其他。NServiceBus是一款商业产品,它附带了大量文档、devops、调试和可视化实用程序,以及您需要的额外付费支持。以下是如何使用Azure ServiceBus启动并运行NServiceBusendpoint:

var endpointConfiguration = new EndpointConfiguration("Endpoint1");
endpointConfiguration.SendFailedMessagesTo("error");

var transport = endpointConfiguration.UseTransport<AzureServiceBusTransport>();
var connectionString = Environment.GetEnvironmentVariable("AzureServiceBus.ConnectionString");

transport.ConnectionString(connectionString);
transport.UseTopology<ForwardingTopology>();

var endpointInstance = await Endpoint.Start(endpointConfiguration)
        .ConfigureAwait(false);

var message = new Message1
{
    Property = "Hello from Endpoint1"
};

await endpointInstance.Send(message).ConfigureAwait(false);

在接收方,您只需拥有一个处理程序类:

public class MyMessageHandler : IHandleMessages<Message1>
{
    public Task Handle(Message2 message, IMessageHandlerContext context)
    {
        //Do your task
        Console.WriteLine(message.Property);
        return Task.CompletedTask;
    }
}

另外,我为NServiceBus的特定软件制造商工作,但我使用了两种推荐的框架。你需要决定哪一个适合你。

相云
2023-03-14

您不需要根据计时器不断检查总线。

服务总线主题和订阅支持发布/订阅消息传递通信模型。

将消息发送到主题时,每个订阅都可以独立处理/处理该消息。

下面是一个C#示例,介绍如何从主题接收消息:

string connectionString =
    CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");

SubscriptionClient Client =
    SubscriptionClient.CreateFromConnectionString
            (connectionString, "TestTopic", "HighMessages");

// Configure the callback options.
OnMessageOptions options = new OnMessageOptions();
options.AutoComplete = false;
options.AutoRenewTimeout = TimeSpan.FromMinutes(1);

Client.OnMessage((message) =>
{
    try
    {
        // Process message from subscription.
        Console.WriteLine("\n**High Messages**");
        Console.WriteLine("Body: " + message.GetBody<string>());
        Console.WriteLine("MessageID: " + message.MessageId);
        Console.WriteLine("Message Number: " +
            message.Properties["MessageNumber"]);

        // Remove message from subscription.
        message.Complete();
    }
    catch (Exception)
    {
        // Indicates a problem, unlock message in subscription.
        message.Abandon();
    }
}, options);

以下是有关发布服务器订阅服务器型号的更多详细信息:

https://azure.microsoft.com/en-us/documentation/articles/service-bus-dotnet-how-to-use-topics-subscriptions/

 类似资料:
  • 我对Azure服务总线队列还比较陌生,正在构建一个项目,该项目需要按照消息到达的顺序(FIFO)处理队列中的消息。 使用微软的文档,我能够理解这一部分。据我所知,我需要为队列打开会话? 我所面临的困难是确定对队列执行以下一组有序任务的最佳方法/服务堆栈。 首先,让我们假设我们有一个基于先进先出的服务总线队列,它有n个消息。我怎么可能: 从队列中拾取第一条消息。 使用Azure函数处理消息。 将有效

  • 我有一个windows服务,它侦听Azure服务总线队列消息,以便从我的WebApi应用程序分发处理。此外,我还需要处理重复性任务(每晚/每周),我认为最好使用相同的系统来处理这些任务。 例如,假设我有一个“CleanupDb”队列,每天午夜删除过时的DB节点: 理论上这应该行得通,但我觉得我错过了一个更明显的处理方法。有没有更好的办法?

  • 我有一个WCF服务,它连接到一个服务总线队列,准备接收消息。这是工作很好,但我希望能够标记的消息作为一个死信,如果我有一个问题处理的消息。当前,如果我的代码抛出异常,消息仍然会从队列中删除,但我希望能够在配置中指定不从队列中删除,但将其标记为死信。我做了一些搜索,但我不知道怎么做。我当前正在将该服务作为windows服务运行 Uri baseAddress=ServiceBusEnvironmen

  • 我有一个应用程序,在这个应用程序中,我可以在进程的一部分中以JSON格式将消息写入Azure服务总线队列。我有一个下游进程,我想将该消息从队列中弹出,将json转换为一个对象,然后处理该对象。 我没有问题将消息推送到队列上,但我还没有找到任何将消息从队列中逐一或循环弹出的示例。我在微软或Github上看到的每一个例子都是一个控制台应用程序(在网络应用程序中毫无用处),它设置了某种侦听器,可以抓取队

  • 如何轮询azure服务总线以持续检查消息?下面是我从队列接收消息的方式。 我想不断地寻找信息,然后处理它。

  • 问题是一旦解决了异常的来源,如何处理错误。一旦消息出错并最终出现在_error队列中,我希望在消息和/或服务修复后将消息移回处理。我无法将消息从_error队列移动到主题,因为该主题上的每个服务都将再次获得该消息。 我试图使用ReceiveEndpoint方法创建第二个队列,该方法名为_errorRecovery,但这样做会导致队列订阅主题,这意味着_errorRecovery队列获取发布到该主题