当前位置: 首页 > 知识库问答 >
问题:

Azure服务总线功能。批量接收消息

秦建元
2023-03-14

我希望以批处理模式接收来自Azure ServiceBus主题的消息。

阅读https://docs.microsoft.com/en-us/Azure/Azure-functions/functions-best-practices时指出:

我有一个方法:

public static void Run([ServiceBusTrigger("mytopic name", "MySubscription",
AccessRights.Listen, Connection = TopicService.ConnectionStringName)]
string messages, TraceWriter logger)

这个方法是有效的,但它一次只需要一个消息。

根据Microsoft文档,我可以将其更改为:

public static void Run([ServiceBusTrigger("mytopic name", "MySubscription",
AccessRights.Listen, Connection = TopicService.ConnectionStringName)]
string[] messages, TraceWriter logger)
{
    "aggregator": {
        "batchSize": 10,
        "flushTimeout": "00:00:30"
    }
}

注意:主题和订阅已启用“启用批处理操作”设置。

我错过了什么?

共有1个答案

东门令
2023-03-14

这是我尝试的代码。检查一下,看看是否管用。

//---------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.  

using Microsoft.Azure;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ServiceBusTriggers
{
    class Settings
    {
        public const string TopicPath = "sbperftopicwithpartitions";
        public const string Subscription = "sub_1";
        public const string ContainerName = "sbperf-test-store2";

        internal Settings()
        {
            ServiceBusConnectionString = GetSetting("ServiceBusConnectionString", UserSettings.ServiceBusConnectionString);
            StorageAccountConnectionString = GetSetting("StorageAccountConnectionString", UserSettings.StorageAccountConnectionString);
            AzureWebJobsDashboardConnectionString = GetSetting("AzureWebJobsDashboardConnectionString", UserSettings.AzureWebJobsDashboardConnectionString);
            AzureWebJobsStorageConnectionString = GetSetting("AzureWebJobsStorageConnectionString", UserSettings.AzureWebJobsStorageConnectionString);

            NLogDatabaseConnectionString = GetSetting("NLogDatabaseConnectionString", UserSettings.NLogDatabaseConnectionString);

            PrefetchCount = GetSetting("PrefetchCount", 100);
            MaxConcurrentCalls = GetSetting("MaxConcurrentCalls",100);

            MetricsDisplayFrequency = new TimeSpan(0, 0, 30); //every 30 seconds
            TokenSource = new CancellationTokenSource();
        }

        private int GetSetting(string name, int defaultValue)
        {
            int value;
            string valueStr = CloudConfigurationManager.GetSetting(name);
            if (!int.TryParse(valueStr, out value))
            {
                Console.WriteLine("Config missing for {0}. Using default.",name);
                value = defaultValue;
            }
            return value;
        }

        private string GetSetting(string name, string defaultValue)
        {
            string valueStr = CloudConfigurationManager.GetSetting(name);
            if (string.IsNullOrEmpty(valueStr))
            {
                Console.WriteLine("Config missing for {0}. Using default.", name);
                valueStr = defaultValue;
            }
            return valueStr;
        }

        public string ServiceBusConnectionString { get; set; }

        public string StorageAccountConnectionString { get; set;  }
       
        public int PrefetchCount { get; set; }

        public int MaxConcurrentCalls { get; set; }

        public TimeSpan MetricsDisplayFrequency { get; internal set; }

        public CancellationTokenSource TokenSource { get; set; }

        public string NLogDatabaseConnectionString { get; private set; }

        public static string AzureWebJobsDashboardConnectionString { get; internal set; }

        public static string AzureWebJobsStorageConnectionString { get; internal set; }

        public void WriteSettings()
        {
            ProjectLogger.Info("1|None|{1}|DisplayFrequency|{0}|", MetricsDisplayFrequency, Thread.CurrentThread.ManagedThreadId);
            ProjectLogger.Info("1|None|{1}|PrefetchCount|{0}|", PrefetchCount, Thread.CurrentThread.ManagedThreadId);
            ProjectLogger.Info("1|None|{1}|MaxConcurrentCalls|{0}|", MaxConcurrentCalls, Thread.CurrentThread.ManagedThreadId);
        }

    }
}
 类似资料:
  • 参考https://github.com/Azure/azure-service-bus/tree/master/samples/dotnet/gettingstart/microsoft.Azure.servicebus/basicsendreceiveusingtopicsubscriptionclient,我了解Azure服务总线主题的一般工作方式,我的问题更多地是关于它实际上是如何工作的。

  • 根据MS文档,从订阅接收消息并不困难。但是,如果我希望我的应用程序在每次发布新消息时都接收一条消息--一个恒定的轮询。因此,使用了SubscriptionClient类的OnMessage()方法。 MS文档说:“...当调用OnMessage时,客户端启动一个内部消息泵,该消息泵不断轮询队列或订阅。该消息泵由发出Receive()调用的无限循环组成。如果调用超时,它发出下一个Receive()调

  • 我有一个超时选项,只想在超时前接收消息。 如果您能解释下面的代码是如何工作的,以及我如何修改下面的代码以在特定的时间框架内接收消息,并且一旦我的超时已经到达就停止接收,这将是很有帮助的。

  • 这是我能找到的最接近的前一个问题:Azure Service Bus Subscription OnMessage未接收消息。 同样的事情也发生在我身上。当我改变主题的名称时,它会再次工作一段时间。则该服务总线主题再次损坏。只有65-71%的消息到达。无助于删除子内容,也无助于删除主题。题名似乎过了一段时间不知怎么就被污染了。这是真的真的很糟糕,因为我没有办法告诉什么时候主题是腐败的,除了系统不像

  • 我正在使用Azure服务总线主题机制。此外,我已经将消息发送到主题,并希望通过编程方式检查是否将消息发送到主题。 代码: 有没有办法获得响应或状态代码?

  • 调试此代码时,应用程序停止。未显示任何错误消息或异常。 项目X: 下面我将通过REST API发布JSON对象。 项目Y: 有办法找到尸体吗?