我已经创建了一个简单的窗口服务来使用来自Azure服务总线队列的消息。我使用TopShelch创建windows服务。下面的代码从这里剪切如下示例:https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues
var hf = HostFactory.New(x =>
{
x.Service<ServiceBusHelper>(s =>
{
s.ConstructUsing(serviceProvider.GetService<ServiceBusHelper>);
s.WhenStarted(async service => await service.ReceiveMessagesAsync());
s.WhenStopped(async service => await service.Stop());
});
x.RunAsNetworkService()
.StartAutomatically()
.EnableServiceRecovery(rc => rc.RestartService(1));
x.SetServiceName("MyWindowsService");
x.SetDisplayName("MyWindowsService");
x.SetDescription("MyWindowsService");
});
高频。Run();
ServiceBusHelper类:
public async Task ReceiveMessagesAsync()
{
var connectionString = _configuration.GetValue<string>("ServiceBusConnectionString");
var queueName = _configuration.GetValue<string>("ServiceBusQueueName");
await using (ServiceBusClient client = new ServiceBusClient(connectionString))
{
ServiceBusProcessor processor = client.CreateProcessor(queueName, new ServiceBusProcessorOptions());
processor.ProcessMessageAsync += MessageHandler;
processor.ProcessErrorAsync += ErrorHandler;
await processor.StartProcessingAsync();
System.Threading.Thread.Sleep(1000);//Wait for a minute before stop processing
await processor.StopProcessingAsync();
}
}
public async Task MessageHandler(ProcessMessageEventArgs args)
{
string body = args.Message.Body.ToString();
var messageBytes = Encoding.ASCII.GetBytes(body);
ProcessMessage(messageBytes);
await args.CompleteMessageAsync(args.Message);
}
public Task ErrorHandler(ProcessErrorEventArgs args)
{
return Task.CompletedTask;
}
public Task Stop()
{
return Task.CompletedTask;
}
Windows服务已成功安装,状态显示为正在运行。但是,它不会自动使用来自服务总线的消息。如果我手动停止并启动服务,它将从队列中拾取消息。不确定这个实现缺少了什么。如有任何建议,我们将不胜感激。
...NetCore 3.1引入了一个新的扩展,与微软一起工作。AspNetCore.托管添加Nuget包Microsoft.扩展。主机。您可以添加WindowsServices。这将允许您将其作为windows服务或控制台应用程序运行。
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.UseWindowsService()
.ConfigureAppConfiguration((context, config) =>
{
// configure the app here.
})
.ConfigureServices((hostContext, services) =>
{
services.AddHostedService<QueueWorker>();
}).UseSerilog();
}
然后,您可以创建后台工作程序来启动和停止处理servicebus队列。以下是我的实现:
public class QueueWorker : BackgroundService, IDisposable
{
protected ILogger<QueueWorker> _logger;
protected IQueueMessageReceiver _queueProcessor;
public QueueWorker()
{
}
public QueueWorker(ILogger<QueueWorker> logger, IQueueMessageReceiver queueMessageReceiver)
{
_logger = logger;
_queueProcessor = queueMessageReceiver;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await Task.CompletedTask.ConfigureAwait(false);
}
public override Task StartAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("Service Starting");
var task = _queueProcessor.StartProcessor(cancellationToken);
task.Wait();
if (task.IsFaulted)
{
throw new Exception("Unable to start Processor");
}
return base.StartAsync(cancellationToken);
}
public override async Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("Stopping Service");
await _queueProcessor.StopProcessor().ConfigureAwait(false);
await base.StopAsync(cancellationToken).ConfigureAwait(false);
}
public override void Dispose()
{
_logger.LogInformation("Disposing Service");
var loopCount = 0;
while (_queueProcessor != null && !_queueProcessor.IsClosedOrClosing() && loopCount < 5)
{
var task = Task.Delay(600);
task.Wait();
loopCount++;
}
base.Dispose();
GC.SuppressFinalize(this);
}
和实际的处理器:
public class QueueMessageReceiver : IQueueMessageReceiver
{
private readonly ServiceBusClient _queueClient;
private ServiceBusProcessor _processor;
private readonly ReceiverConfiguration _configuration;
private readonly ILogger _logger;
private readonly ILoggerFactory _loggerFactory;
private Dictionary<string, string> _executionMatrix;
private readonly IServiceProvider _provider;
private CancellationToken _cancellationToken;
public QueueMessageReceiver(ReceiverConfiguration configuration, ILogger<QueueMessageReceiver> logger, IExecutionMatrix executionMatrix, ILoggerFactory loggerFactory, IServiceProvider serviceProvider)
{
if (configuration == null) throw new ArgumentException($"Configuration is missing from the expected ");
_configuration = configuration;
_logger = logger;
_loggerFactory = loggerFactory;
_executionMatrix = executionMatrix.GetExecutionMatrix();
_provider = serviceProvider;
_queueClient = new ServiceBusClient(_configuration.ConnectionString);
if (string.IsNullOrWhiteSpace(configuration.ConnectionString)) throw new ArgumentException($"ServiceBusConnectionString Object missing from the expected configuration under ConnectionStrings ");
if (configuration.QueueName == null) throw new ArgumentException($"Queue Name value missing from the expected configuration");
}
public async Task StartProcessor(CancellationToken cancellationToken)
{
if (!IsClosedOrClosing())
{
throw new FatalSystemException("ServiceBusProcessor is already running. ");
}
_cancellationToken = cancellationToken;
var options = new ServiceBusProcessorOptions
{
AutoCompleteMessages = _configuration.AutoComplete,
MaxConcurrentCalls = _configuration.MaxConcurrentCalls,
MaxAutoLockRenewalDuration = _configuration.MaxAutoRenewDuration
};
_processor = _queueClient.CreateProcessor(_configuration.QueueName, options);
_processor.ProcessMessageAsync += ProcessMessagesAsync;
_processor.ProcessErrorAsync += ProcessErrorAsync;
await _processor.StartProcessingAsync().ConfigureAwait(false);
}
public async Task StopProcessor()
{
await _processor.StopProcessingAsync();
await _processor.CloseAsync();
}
private Task ProcessErrorAsync(ProcessErrorEventArgs args)
{
_logger.LogError(args.Exception, "Uncaught handled exception", args.ErrorSource, args.FullyQualifiedNamespace, args.EntityPath);
return Task.CompletedTask;
}
private async Task ProcessMessagesAsync(ProcessMessageEventArgs args)
{
var message = args.Message;
// Process the message.
var sbMessage = $"Received message: SequenceNumber:{message.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}";
_logger.LogInformation(sbMessage);
//Handle your message
}
public bool IsClosedOrClosing()
{
return ((_processor == null) || _processor.IsClosed || !_processor.IsProcessing);
}
}
我有一个windows服务,它侦听Azure服务总线队列消息,以便从我的WebApi应用程序分发处理。此外,我还需要处理重复性任务(每晚/每周),我认为最好使用相同的系统来处理这些任务。 例如,假设我有一个“CleanupDb”队列,每天午夜删除过时的DB节点: 理论上这应该行得通,但我觉得我错过了一个更明显的处理方法。有没有更好的办法?
我在同一命名空间中有2个Azure Service Bus队列。我使用ScheduledQueueTimeUTC将计划消息放置在其中一个队列(“计划队列”)中,并将要立即处理的消息放置在另一个队列(“现在队列”)中。 我想做的是设置“预定队列”的自动转发属性,以便在消息到达它们的“预定队列”时将消息转发到“现在队列”。队列转发会这样工作吗?还是队列自动转发会立即发送已排队和未排队的消息? 我在任何
我正在使用azure服务总线主题和订阅机制,并希望处理所有在死信队列中的消息。 此外,我想通过C#中的Azure Web作业处理消息,并将其发送回队列。所以我想知道如何通过我的应用程序处理死信队列上的消息?
我有一个服务总线Q,从Azure门户可以或多或少地看到服务总线Q包含多少条目。如何使用他们的管理API获取此计数?我仔细阅读了文档,但没有找到答案。
我正在尝试创建以下进程: 为了连接两端(IoT Hub和Notification Hub),我尝试按照以下教程操作:https://www.developer.com/ws/android/senging-notifications-to-mobile-apps-fromon-azure-function-apps.html 此外,我还添加了从IoT集线器到服务总线中适当队列的路由。 现在,每当I
我正在使用Azure服务总线队列。但是我不能使用“获取所有队列消息(peek Lock):微软内置于api”从队列中获取所有消息。 有没有办法获取所有队列消息? {"$连接":{"值":{"servicebus_1":{"连接ID":"/订阅/c776fex3-6aec-4722-b099-b054c267b240/资源组/Plugin-Resources/提供者/Microsoft.网络/连接/