当前位置: 首页 > 知识库问答 >
问题:

Azure函数触发器Azure服务总线会话启用不反序列化MessageReceiver

江衡
2023-03-14

根据https://docs.microsoft.com/en-us/Azure/Azure-functions/functions-bindings-service-bus-trigger?tabs=CSharp#usage(emphasis mine)的文档,messageReceiver是允许的参数类型之一:

下列参数类型可用于队列或主题消息:

  • 字符串-如果消息是文本。
  • 字节[]-用于二进制数据。
  • 定义类型-如果消息包含JSON,Azure Functions将尝试反序列化JSON数据。
  • BrokeredMessage-使用BrokeredMessage.getBody()方法向您提供反序列化的消息。
  • MessageReceiver-用于接收和确认来自消息容器的消息(当自动完成设置为false时是必需的)
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

namespace DataUpdateNotification.AzureFunctions
{
    public static class PersonIdFunction
    {
        [FunctionName("PersonIdFunction")]
        public static void Run([ServiceBusTrigger("personid", Connection = "AzureWebJobsServiceBus", IsSessionsEnabled = true)]Message myQueueItem, ILogger log)
        {
            log.LogInformation($"C# ServiceBus queue trigger function processed message: {myQueueItem}");
        }
    }
}
using Microsoft.Azure.ServiceBus.Core;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

namespace DataUpdateNotification.AzureFunctions
{
    public static class PersonIdFunction
    {
        [FunctionName("PersonIdFunction")]
        public static void Run([ServiceBusTrigger("personid", Connection = "AzureWebJobsServiceBus", IsSessionsEnabled = true)]MessageReceiver myQueueItem, ILogger log)
        {
            log.LogInformation($"C# ServiceBus queue trigger function processed message: {myQueueItem}");
        }
    }
}
<Project Sdk="Microsoft.NET.Sdk">
  <PropertyGroup>
    <TargetFramework>netcoreapp3.1</TargetFramework>
    <AzureFunctionsVersion>v3</AzureFunctionsVersion>
  </PropertyGroup>
  <ItemGroup>
    <PackageReference Include="Microsoft.Azure.WebJobs.Extensions.ServiceBus" Version="4.2.0" />
    <PackageReference Include="Microsoft.NET.Sdk.Functions" Version="3.0.11" />
  </ItemGroup>
  <ItemGroup>
    <None Update="host.json">
      <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
    </None>
    <None Update="local.settings.json">
      <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
      <CopyToPublishDirectory>Never</CopyToPublishDirectory>
    </None>
  </ItemGroup>
</Project>
{
    "version": "2.0",
    "logging": {
        "applicationInsights": {
            "samplingExcludedTypes": "Request",
            "samplingSettings": {
                "isEnabled": true
            }
        }
    }
}

local.settings.json

{
    "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet",
    "AzureWebJobsServiceBus": "Endpoint=sb://redacted.servicebus.windows.net/;SharedAccessKeyName=RedactedAzureFunctionKey;SharedAccessKey=UkVEQUNURUQgQkVOIFdVWiBIRVJFIFJFREFDVEVE"
  }
}

完整的错误消息:

Azure Functions Core Tools
Core Tools Version:       3.0.2996 Commit hash: c54cdc36323e9543ba11fb61dd107616e9022bba
Function Runtime Version: 3.0.14916.0


Functions:

        PersonIdFunction: serviceBusTrigger

For detailed output, run func with --verbose flag.
[2020-12-08T14:00:26.451Z] Executing 'PersonIdFunction' (Reason='(null)', Id=51217a44-b2b0-4629-91d5-3035ece95155)
[2020-12-08T14:00:26.451Z] Executing 'PersonIdFunction' (Reason='(null)', Id=763a7222-277f-4fd3-8fcf-36042523b924)
[2020-12-08T14:00:26.454Z] Trigger Details: MessageId: d6d4b0895632465183f1c6aa8b84cb6f, SequenceNumber: 18, DeliveryCount: 1, EnqueuedTimeUtc: 2020-12-08T14:00:11.7240000Z, LockedUntilUtc: 9999-12-31T23:59:59.9999999Z, SessionId: 753
[2020-12-08T14:00:26.455Z] Trigger Details: MessageId: b21b8df0452e4df0bac8f67a058a5931, SequenceNumber: 17, DeliveryCount: 1, EnqueuedTimeUtc: 2020-12-08T14:00:11.7240000Z, LockedUntilUtc: 9999-12-31T23:59:59.9999999Z, SessionId: 159
[2020-12-08T14:00:27.060Z] Executed 'PersonIdFunction' (Failed, Id=51217a44-b2b0-4629-91d5-3035ece95155, Duration=705ms)[2020-12-08T14:00:27.060Z] Executed 'PersonIdFunction' (Failed, Id=763a7222-277f-4fd3-8fcf-36042523b924, Duration=705ms)[2020-12-08T14:00:27.062Z] System.Private.CoreLib: Exception while executing function: PersonIdFunction. Microsoft.Azure.WebJobs.Host: Exception binding parameter 'myQueueItem'. System.Private.DataContractSerialization: Type 'Microsoft.Azure.ServiceBus.Core.MessageReceiver' cannot be serialized. Consider marking it with the DataContractAttribute attribute, and marking all of its members you want serialized with the DataMemberAttribute attribute. Alternatively, you can ensure that the type is public and has a parameterless constructor - all public members of the type will then be serialized, and no attributes will be required.
[2020-12-08T14:00:27.064Z] System.Private.CoreLib: Exception while executing function: PersonIdFunction. Microsoft.Azure.WebJobs.Host: Exception binding parameter 'myQueueItem'. System.Private.DataContractSerialization: Type 'Microsoft.Azure.ServiceBus.Core.MessageReceiver' cannot be serialized. Consider marking it with the DataContractAttribute attribute, and marking all of its members you want serialized with the DataMemberAttribute attribute. Alternatively, you can ensure that the type is public and has a parameterless constructor - all public members of the type will then be serialized, and no attributes will be required.
[2020-12-08T14:00:27.102Z] Message processing error (Action=UserCallback, ClientId=QueueClient1personid, EntityPath=personid, Endpoint=redacted.servicebus.windows.net)
[2020-12-08T14:00:27.102Z] Message processing error (Action=UserCallback, ClientId=QueueClient1personid, EntityPath=personid, Endpoint=redacted.servicebus.windows.net)
[2020-12-08T14:00:27.105Z] System.Private.CoreLib: Exception while executing function: PersonIdFunction. Microsoft.Azure.WebJobs.Host: Exception binding parameter 'myQueueItem'. System.Private.DataContractSerialization: Type 'Microsoft.Azure.ServiceBus.Core.MessageReceiver' cannot be serialized. Consider marking it with the DataContractAttribute attribute, and marking all of its members you want serialized with the DataMemberAttribute attribute. Alternatively, you can ensure that the type is public and has a parameterless constructor - all public members of the type will then be serialized, and no attributes will be required.
[2020-12-08T14:00:27.111Z] System.Private.CoreLib: Exception while executing function: PersonIdFunction. Microsoft.Azure.WebJobs.Host: Exception binding parameter 'myQueueItem'. System.Private.DataContractSerialization: Type 'Microsoft.Azure.ServiceBus.Core.MessageReceiver' cannot be serialized. Consider marking it with the DataContractAttribute attribute, and marking all of its members you want serialized with the DataMemberAttribute attribute. Alternatively, you can ensure that the type is public and has a parameterless constructor - all public members of the type will then be serialized, and no attributes will be required.

共有1个答案

孟征
2023-03-14

但当我尝试使用Microsoft.Azure.ServiceBus.core.MessageReceiver作为第一个参数时,它会引发异常绑定参数“My QueueItem”。System.Private.DataContractSerialization:无法序列化类型“Microsoft.Azure.ServiceBus.Core.MessageReceiver”。:

你可以用它作为参数,这是没有问题的。但不能在触发器属性之后使用它。而且,非常重要的一点是,MessageReceiver.CloseSync()方法不能关闭会话(也不能关闭队列客户端。就是不工作。函数似乎没有对此进行逻辑处理。)启用会话时。您甚至不必创建closeasync()方法来关闭会话。关闭会话操作由MessageWaitTimeout管理。函数将一直等到它到时间为止。

我有一个系统,定期向启用会话的Azure服务总线队列(不是主题或订阅)发送消息。发件人总是将SessionId设置为允许我将类似的邮件分组在一起。我想使用MessageReceiver对象(这是文档promise的)立即调用MessageReceiver.CloseSync()来删除/完成与该会话相关的所有消息。总体目标是执行一个SELECT DISTINCT,这样我就可以在一次调用中处理/删除/完成所有相关的消息。

using System;
using System.Text;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Logging;

namespace FunctionApp45
{
    public static class Function1
    {
        [FunctionName("Function1")]
        public static async System.Threading.Tasks.Task RunAsync([ServiceBusTrigger("myqueue", Connection = "str",IsSessionsEnabled =true)] Message myQueueItem,    MessageReceiver messageReceiver, ILogger log)
        {
            log.LogInformation($"C# ServiceBus queue trigger function processed message: {Encoding.ASCII.GetString(myQueueItem.Body)}");

            //Some logic here.

            await messageReceiver.CompleteAsync(myQueueItem.SystemProperties.LockToken);
        }
    }
}
{
    "version": "2.0",
    "extensions": {
        "serviceBus": {
            "prefetchCount": 100,
            "messageHandlerOptions": {
                "autoComplete": false,
                "maxConcurrentCalls": 32,
                "maxAutoRenewDuration": "00:05:00"
            },
            "sessionHandlerOptions": {
                "autoComplete": false,
                "messageWaitTimeout": "00:00:30",
                "maxAutoRenewDuration": "00:55:00",
                "maxConcurrentSessions": 1
            }
        }
    }
}
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;
using Newtonsoft.Json;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp17
{
    public class Program 
    {
        string connectionString = "Endpoint=sb://bowman1012.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=X/NHgQ4AQxul6YlMsUofD+JNE20Tovnzx3g2gDt8qyY=";
        string SessionQueueName = "myqueue";
        QueueClient queueClient;
        public async Task Run()
        {
            Console.WriteLine("Press any key to exit the scenario");

            await Task.WhenAll(
                this.SendMessagesAsync(Guid.NewGuid().ToString(), connectionString, SessionQueueName),
                this.SendMessagesAsync(Guid.NewGuid().ToString(), connectionString, SessionQueueName),
                this.SendMessagesAsync(Guid.NewGuid().ToString(), connectionString, SessionQueueName),
                this.SendMessagesAsync(Guid.NewGuid().ToString(), connectionString, SessionQueueName));

            queueClient = new QueueClient(connectionString, SessionQueueName);
            RegisterOnMessageHandlerAndReceiveMessages();

            await queueClient.CloseAsync();
        }

        async Task SendMessagesAsync(string sessionId, string connectionString, string queueName)
        {
            var sender = new MessageSender(connectionString, queueName);
          var client = new QueueClient(connectionString, SessionQueueName);
            dynamic data = new[]
            {
                new {step = 1, title = "Shop"},
                new {step = 2, title = "Unpack"},
                new {step = 3, title = "Prepare"},
                new {step = 4, title = "Cook"},
                new {step = 5, title = "Eat"},
            };

            for (int i = 0; i < data.Length; i++)
            {
                var message = new Message(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data[i])))
                {
                    SessionId = sessionId,
                    ContentType = "application/json",
                    Label = "RecipeStep",
                    MessageId = i.ToString(),
                    TimeToLive = TimeSpan.FromMinutes(2)
                };
                await client.SendAsync(message);
                lock (Console.Out)
                {
                    Console.ForegroundColor = ConsoleColor.Yellow;
                    Console.WriteLine("Message sent: Session {0}, MessageId = {1}", message.SessionId, message.MessageId);
                    Console.ResetColor();
                }
            }
        }

         void RegisterOnMessageHandlerAndReceiveMessages()
        {
            
            queueClient.RegisterSessionHandler( processMessage,
               
                new SessionHandlerOptions(LogMessageHandlerException)
                {
                    
                    MaxConcurrentSessions = 16,
                    AutoComplete = false,
                    MessageWaitTimeout=TimeSpan.FromSeconds(30),
                    MaxAutoRenewDuration=TimeSpan.FromMinutes(55),
                });
        
        }

        private async Task<Task> processMessage(IMessageSession session, Message message, CancellationToken cancellationToken) {
            Console.WriteLine("His");
           
                var body = message.Body;

                dynamic recipeStep = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(body));
                lock (Console.Out)
                {
                    Console.ForegroundColor = ConsoleColor.Cyan;
                    Console.WriteLine(
                        "\t\t\t\tMessage received:  \n\t\t\t\t\t\tSessionId = {0}, \n\t\t\t\t\t\tMessageId = {1}, \n\t\t\t\t\t\tSequenceNumber = {2}," +
                        "\n\t\t\t\t\t\tContent: [ step = {3}, title = {4} ]",
                        message.SessionId,
                        message.MessageId,
                        message.SystemProperties.SequenceNumber,
                        recipeStep.step,
                        recipeStep.title);
                    Console.ResetColor();
                }
                await session.CompleteAsync(message.SystemProperties.LockToken);

                if (recipeStep.step == 5)
                {
                    // end of the session!
                    await session.CloseAsync();
                }

            return Task.CompletedTask;

        }

        private Task LogMessageHandlerException(ExceptionReceivedEventArgs e)
        {
            Console.WriteLine("Exception: \"{0}\" {1}", e.Exception.Message, e.ExceptionReceivedContext.EntityPath);
            return Task.CompletedTask;
        }

        public static int Main(string[] args)
        {
            try
            {
                var app = new Program();
                app.Run().GetAwaiter().GetResult();
            }
            catch (Exception e)
            {
                Console.WriteLine(e.ToString());
                return 1;
            }
            return 0;
        }
    }
}
 类似资料:
  • 在.NET core 2.0中使用创建时,我遇到了一个问题。 在体系结构中,当在用于创建用户的队列中创建新消息时,服务必须接收该消息并根据其中的信息在数据库中创建用户。 在Visual Studio2017中,我在下创建了一个新项目。 这种的正确实现是什么?在GitHub上有什么例子吗?提前道谢。

  • 我目前正在评估使用一个服务总线和azure函数来触发一些需要通过下游api调用来完成的工作。这都是相当标准的,只是我没有很好地处理当下游系统过载和/或返回header到trottle时会发生什么(即每分钟最大调用数/等)。对于队列触发器的强制节流,我们似乎没有任何动态控制。 我知道我们可以手动设置最大并发,但这并不一定解决问题,因为我们无法控制下游系统,需要考虑它随时可能脱机或变慢。 假设消费计划

  • 我需要一起处理相同的消息集,为此,我尝试了Azure服务总线会话启用功能。为了测试这一点,我创建了一个非常简单的应用程序,一个消息在队列中成功提交,然而,当试图在“ReceiveSessionMessage”函数中接收消息时,消息会话不会返回,程序会在这一行之后退出。 我无法找出确切的根本原因,任何帮助都将不胜感激。谢谢 等待会话客户端。AcceptMessageSessionAsync();]

  • 我已经创建了一个Azure webwork,它将向服务总线队列发送强类型消息,并成功发送。 我想创建另一个webjob,只要servicebus队列中有消息,就会触发该webjob。请在下面找到我正在尝试的代码。出于某种原因,尽管servicebus队列中有消息,但当我在本地运行webjob时,webjob未被触发并出现错误。 错误: 代码: 有谁能帮我解决这个问题吗? 谢谢

  • 我一直在尝试为Azure函数实现DI,其中函数由ServiceBus触发(本例中为主题/订阅): 我在以下网站上阅读了有关Azure Functions和DI的信息: https://mcguirev10.com/2018/04/03/service-locator-azure-functions-v2.html https://blog.wille-zone.de/post/azure-func