我希望以批处理模式接收来自Azure ServiceBus主题的消息。
阅读https://docs.microsoft.com/en-us/Azure/Azure-functions/functions-best-practices时指出:
我有一个方法:
public static void Run([ServiceBusTrigger("mytopic name", "MySubscription",
AccessRights.Listen, Connection = TopicService.ConnectionStringName)]
string messages, TraceWriter logger)
这个方法是有效的,但它一次只需要一个消息。
根据Microsoft文档,我可以将其更改为:
public static void Run([ServiceBusTrigger("mytopic name", "MySubscription",
AccessRights.Listen, Connection = TopicService.ConnectionStringName)]
string[] messages, TraceWriter logger)
{
"aggregator": {
"batchSize": 10,
"flushTimeout": "00:00:30"
}
}
注意:主题和订阅已启用“启用批处理操作”设置。
我错过了什么?
这是我尝试的代码。检查一下,看看是否管用。
//---------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
using Microsoft.Azure;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ServiceBusTriggers
{
class Settings
{
public const string TopicPath = "sbperftopicwithpartitions";
public const string Subscription = "sub_1";
public const string ContainerName = "sbperf-test-store2";
internal Settings()
{
ServiceBusConnectionString = GetSetting("ServiceBusConnectionString", UserSettings.ServiceBusConnectionString);
StorageAccountConnectionString = GetSetting("StorageAccountConnectionString", UserSettings.StorageAccountConnectionString);
AzureWebJobsDashboardConnectionString = GetSetting("AzureWebJobsDashboardConnectionString", UserSettings.AzureWebJobsDashboardConnectionString);
AzureWebJobsStorageConnectionString = GetSetting("AzureWebJobsStorageConnectionString", UserSettings.AzureWebJobsStorageConnectionString);
NLogDatabaseConnectionString = GetSetting("NLogDatabaseConnectionString", UserSettings.NLogDatabaseConnectionString);
PrefetchCount = GetSetting("PrefetchCount", 100);
MaxConcurrentCalls = GetSetting("MaxConcurrentCalls",100);
MetricsDisplayFrequency = new TimeSpan(0, 0, 30); //every 30 seconds
TokenSource = new CancellationTokenSource();
}
private int GetSetting(string name, int defaultValue)
{
int value;
string valueStr = CloudConfigurationManager.GetSetting(name);
if (!int.TryParse(valueStr, out value))
{
Console.WriteLine("Config missing for {0}. Using default.",name);
value = defaultValue;
}
return value;
}
private string GetSetting(string name, string defaultValue)
{
string valueStr = CloudConfigurationManager.GetSetting(name);
if (string.IsNullOrEmpty(valueStr))
{
Console.WriteLine("Config missing for {0}. Using default.", name);
valueStr = defaultValue;
}
return valueStr;
}
public string ServiceBusConnectionString { get; set; }
public string StorageAccountConnectionString { get; set; }
public int PrefetchCount { get; set; }
public int MaxConcurrentCalls { get; set; }
public TimeSpan MetricsDisplayFrequency { get; internal set; }
public CancellationTokenSource TokenSource { get; set; }
public string NLogDatabaseConnectionString { get; private set; }
public static string AzureWebJobsDashboardConnectionString { get; internal set; }
public static string AzureWebJobsStorageConnectionString { get; internal set; }
public void WriteSettings()
{
ProjectLogger.Info("1|None|{1}|DisplayFrequency|{0}|", MetricsDisplayFrequency, Thread.CurrentThread.ManagedThreadId);
ProjectLogger.Info("1|None|{1}|PrefetchCount|{0}|", PrefetchCount, Thread.CurrentThread.ManagedThreadId);
ProjectLogger.Info("1|None|{1}|MaxConcurrentCalls|{0}|", MaxConcurrentCalls, Thread.CurrentThread.ManagedThreadId);
}
}
}
参考https://github.com/Azure/azure-service-bus/tree/master/samples/dotnet/gettingstart/microsoft.Azure.servicebus/basicsendreceiveusingtopicsubscriptionclient,我了解Azure服务总线主题的一般工作方式,我的问题更多地是关于它实际上是如何工作的。
根据MS文档,从订阅接收消息并不困难。但是,如果我希望我的应用程序在每次发布新消息时都接收一条消息--一个恒定的轮询。因此,使用了SubscriptionClient类的OnMessage()方法。 MS文档说:“...当调用OnMessage时,客户端启动一个内部消息泵,该消息泵不断轮询队列或订阅。该消息泵由发出Receive()调用的无限循环组成。如果调用超时,它发出下一个Receive()调
我有一个超时选项,只想在超时前接收消息。 如果您能解释下面的代码是如何工作的,以及我如何修改下面的代码以在特定的时间框架内接收消息,并且一旦我的超时已经到达就停止接收,这将是很有帮助的。
这是我能找到的最接近的前一个问题:Azure Service Bus Subscription OnMessage未接收消息。 同样的事情也发生在我身上。当我改变主题的名称时,它会再次工作一段时间。则该服务总线主题再次损坏。只有65-71%的消息到达。无助于删除子内容,也无助于删除主题。题名似乎过了一段时间不知怎么就被污染了。这是真的真的很糟糕,因为我没有办法告诉什么时候主题是腐败的,除了系统不像
我正在使用Azure服务总线主题机制。此外,我已经将消息发送到主题,并希望通过编程方式检查是否将消息发送到主题。 代码: 有没有办法获得响应或状态代码?
调试此代码时,应用程序停止。未显示任何错误消息或异常。 项目X: 下面我将通过REST API发布JSON对象。 项目Y: 有办法找到尸体吗?