当前位置: 首页 > 知识库问答 >
问题:

大众交通-如何使用Azure服务总线安排消息

李开宇
2023-03-14

我已经查看了有关Azure Service Bus调度的文档,但我不清楚如何确切地从“断开连接的”总线发送消息。

以下是我如何配置在服务器上处理消息的服务:

builder.AddMassTransit(mt =>
{
    mt.AddConsumers(cqrsAssembly);

    mt.AddBus(context => Bus.Factory.CreateUsingAzureServiceBus(x =>
    {
        x.RequiresSession = true;
        x.MaxConcurrentCalls = 500;
        x.MessageWaitTimeout = TimeSpan.FromMinutes(5);
        x.UseRenewLock(TimeSpan.FromMinutes(4));
        x.UseServiceBusMessageScheduler();

        var host = x.Host(serviceUri, h =>
        {
            h.SharedAccessSignature(s =>
            {
                s.KeyName = "key-name";
                s.SharedAccessKey = "access-key";
                s.TokenTimeToLive = TimeSpan.FromDays(1);
                s.TokenScope = TokenScope.Namespace;
            });

            h.OperationTimeout = TimeSpan.FromMinutes(2);
        });

        x.ReceiveEndpoint(host, $"mt.myqueue", ep =>
        {
            ep.RequiresSession = true;
            ep.MaxConcurrentCalls = 500;
            ep.RemoveSubscriptions = true;

            ep.UseMessageRetry(r =>
            {
                r.Interval(4, TimeSpan.FromSeconds(30));
                r.Handle<TransientCommandException>();
            });

            ep.ConfigureConsumers(context);
        });
    });
});

我明确调用了UseServiceBusMessageScheduler()

在创建消息并将消息发送到队列的项目中(在不同的上下文中运行,因此执行“仅发送”),我们有以下内容:

var bus = Bus.Factory.CreateUsingAzureServiceBus(x =>
{
    x.RequiresSession = true;
    x.MessageWaitTimeout = TimeSpan.FromMinutes(5);
    x.UseRenewLock(TimeSpan.FromMinutes(4));
    x.Send<ICommand>(s => s.UseSessionIdFormatter(ctx => ctx.Message.SessionId ?? Guid.NewGuid().ToString()));


    var host = x.Host(serviceUri, h =>
    {
        h.SharedAccessSignature(s =>
        {
            s.KeyName = "key-name";
            s.SharedAccessKey = "key"; 
            s.TokenTimeToLive = TimeSpan.FromDays(1);
            s.TokenScope = TokenScope.Namespace;
        });
        h.OperationTimeout = TimeSpan.FromMinutes(2);
    });

    EndpointConvention.Map<ICommand>(new Uri($"{serviceUri.ToString()}mt.myqueue"));
    EndpointConvention.Map<Command>(new Uri($"{serviceUri.ToString()}mt.myqueue"));
});

现在,要发送预定消息,我们这样做:

var dest = "what?";
await bus.ScheduleSend(dest, scheduledEnqueueTimeUtc.Value, message);

我不确定需要向目标地址传递什么。

我尝试过:-<代码>服务URI-`{serviceUri}mt.myqueue“

但是检查队列时,我在基本队列、跳过队列或错误队列中都看不到我的消息

我是否缺少其他配置,如果没有,如何确定目标队列?

我使用的是轨道交通5.5.4版,每次超载到ScheduleSend()都需要它。

共有1个答案

南宫浩皛
2023-03-14

首先,是的,您的Uri格式是正确的。最后格式化后,您需要这样的东西:

new Uri(@"sb://yourdomain.servicebus.windows.net/yourapp/your_message_queue")

还要确保在配置endpoint时添加了。(参见下面的链接)

configurator.UseServiceBusMessageScheduler();

如果您遵循公共交通文档,则可以从ConsumeContext中完成计划。请参阅轨道交通Azure计划

public class ScheduleNotificationConsumer :
    IConsumer<AssignSeat>
{
    Uri _schedulerAddress;
    Uri _notificationService;

    public async Task Consume(ConsumeContext<AssignSeat> context)
    {
        if(context.Message.ReservationTime - DateTime.Now < TimeSpan.FromHours(8))
        {
            // assign the seat for the reservation
        }
        else
        {
            // seats can only be assigned eight hours before the reservation
            context.ScheduleMessage(context.Message.ReservationTime - TimeSpan.FromHours(8), context.Message);
        }
    }
}

然而,在我们本周面临的一个用例中,我们需要从consumeContext外部进行调度,或者只是不想将上下文转发到我们调度的位置。当使用IBusControl. schdule发送时,我们没有收到错误反馈,但我们也没有真正完成任何调度。在查看了大众运输的作用之后,它发现从IBusControl中,它创建了一个新的调度提供程序。而从上下文中,它使用ServiceBusschduleMessageProvider。

因此,在清理完这一部分之前,我们现在要做的是直接调用ServiceBusScheduleMessageProvider。

        await new ServiceBusScheduleMessageProvider(_busControl).ScheduleSend(destinationUri
            , scheduleDateTime.UtcDateTime
            , Task.FromResult<T>(message)
            , Pipe.Empty<SendContext>()
            , default);

希望它有意义并能有所帮助。

 类似资料:
  • 我在Azure Service Bus中使用代理消息传递(主题/订阅),我很好奇如何(或者是否)使用SSL保护通信。

  • 我在Azure中托管了两个云服务辅助角色,一个使用NServiceBus(Azure服务总线传输)消耗消息,另一个生成消息。 昨天,我部署了一个新版本的生产者工作者角色,而队列中仍然有大量消息,因为我们正在处理早上遗留下来的大量消息。当生产者启动时,它似乎已经清空(或者可能重新创建)队列,许多重要的生产消息丢失。这似乎很奇怪,但日志显示,大约在生产者角色启动时,消费者没有处理进一步的消息,我们知道

  • 我正在尝试在Azure中构建一个简单的WebAPI REST服务,后端有一个服务总线队列工作器。我可以从Web API向工作人员发送一条消息。然而,我试图发送更多的信息,只是为了看看一切是如何运作的。因此,我创建了一个简单的控制器,如下所示: 当我呼叫控制器时,我只收到工作人员接收到的大约1/2的消息。其余的似乎都被放弃了。

  • 我知道Azure Service Bus有一个重复消息检测功能,可以删除它认为与其他消息重复的消息。我想使用此功能来帮助防止重复传递。 我好奇的是服务如何确定两条消息实际上是重复的: 考虑了消息的哪些属性 是否考虑了消息的内容 如果我发送两条内容相同但消息属性不同的消息,它们是否被视为重复

  • 我正在为Azure服务总线使用最新的Java绑定(V3.1.3):https://github.com/Azure/azure-sdk-for-java/tree/master/sdk/servicebus 当我创建一个新的队列客户端,计划一条消息,然后取消它... ...代码似乎按预期工作:活动消息计数变为0。但是一旦被调度的消息到达它应该被调度的时间(我测试了10秒和100秒以后),消息有时会