当前位置: 首页 > 知识库问答 >
问题:

从ServiceBus触发的Azure函数中检索IoT Hub Twin

顾泰平
2023-03-14

我们正在将数据从物联网设备发送到Azure物联网中心,并尝试将特定类型的消息传递给Azure功能。

目前,我们通过创建Azure Service Busendpoint并在IoTHub中创建消息路由实现了这一点。它按预期工作,Azure函数正确接收消息。

现在,我们想在Azure功能中从IoT Hub获取DeviceId,以及在Device Twin中定义的标签,我完全不知道如何做到这一点。

如果我们使用一个eventhubbtrigger,它似乎很简单,可以这样做:

public static class Test
{
    [FunctionName("TestQueueTrigger")]
    public static void Run(
        [EventHubTrigger("messages/events", Connection = "IoTHubConnection")]
        EventData message,
        Twin deviceTwin,
        TraceWriter log)
    { ... }
}

但目前还不清楚如何使用服务总线触发器实现这一点。

此外,我们希望将所有消息(独立于路由)存储到Azure Data Lake存储,我对这将如何工作有点茫然。

共有1个答案

景令秋
2023-03-14

Azure IoT Hub设备到云消息格式如下所述。此格式中没有设备孪生属性。设备双胞胎存储在云后端,可以根据到特定endpoint(内置和/或自定义endpoint)的物联网中心路由通知它们的更改。

您的函数“TestQueueTrigger”示例使用版本1的azure函数iothub扩展。分机输入绑定Twin允许使用分机内的单独调用获取设备Twin:

deviceTwin = await registryManager.GetTwinAsync(attribute.DeviceId);

基本上,此扩展还可以用于ServiceBustigger绑定。请注意,此扩展只能用于函数版本1,因此我建议使用REST API get twin调用在函数中获取设备twin。

最新消息

以下代码片段显示了ServiceBusTrigger函数和REST API GetTwin调用的示例。

跑csx文件:

#r "..\\bin\\Microsoft.Azure.ServiceBus.dll"
#r "..\\bin\\Microsoft.Azure.Devices.Shared.dll"
#r "Microsoft.Azure.WebJobs.ServiceBus"
#r "Newtonsoft.Json"


using System;
using System.Threading.Tasks;
using System.Text;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Microsoft.Azure.WebJobs.ServiceBus;
using Microsoft.Azure.ServiceBus;
using System.Globalization;
using System.Linq;
using System.Net.Http;
using System.Security.Cryptography;
using System.Web;
using Microsoft.Azure.Devices.Shared;

// reusable proxy
static HttpClientHelper iothub = new HttpClientHelper(Environment.GetEnvironmentVariable("AzureIoTHubShariedAccessPolicy"));

public static async Task Run(Message queueItem, ILogger log)
{
    // payload
    log.LogInformation($"C# ServiceBus queue trigger function processed message: {Encoding.UTF8.GetString(queueItem.Body)}");

    // device identity Id
    var deviceId = queueItem.UserProperties["iothub-connection-device-id"];

    // get the device twin
    var response = await iothub.Client.GetAsync($"/twins/{deviceId}?api-version=2018-06-30");
    response.EnsureSuccessStatusCode();
    Twin twin = await response.Content.ReadAsAsync<Twin>();

    log.LogInformation(JsonConvert.SerializeObject(twin.Tags, Formatting.Indented));

    await Task.CompletedTask;
}


// helpers
class HttpClientHelper
{
    HttpClient client;
    DateTime expiringSaS;
    (string hostname, string keyname, string key) config;

    public HttpClientHelper(string connectionString)
    {
        config = GetPartsFromConnectionString(connectionString);
        client = new HttpClient() { BaseAddress = new Uri($"https://{config.hostname}")};
        SetAuthorizationHeader();         
    }

    public HttpClient Client
    {
        get
        {          
            if (expiringSaS < DateTime.UtcNow.AddMinutes(-1))
            {
               SetAuthorizationHeader();  
            }         
            return client;
        }
    }

    internal void SetAuthorizationHeader()
    {
        lock (client)
        {
            if (expiringSaS < DateTime.UtcNow.AddMinutes(-1)) 
            {
                string sasToken = GetSASToken(config.hostname, config.key, config.keyname, 1);
                if (client.DefaultRequestHeaders.Contains("Authorization"))
                    client.DefaultRequestHeaders.Remove("Authorization");
                client.DefaultRequestHeaders.Add("Authorization", sasToken);
                expiringSaS = DateTime.UtcNow.AddHours(1);
            }
        }
    }

    internal (string hostname, string keyname, string key) GetPartsFromConnectionString(string connectionString)
    {
        var parts = connectionString.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries).Select(s => s.Split(new[] { '=' }, 2)).ToDictionary(x => x[0].Trim(), x => x[1].Trim());
        return (parts["HostName"] ?? "", parts["SharedAccessKeyName"] ?? "", parts["SharedAccessKey"] ?? "");
    }

    internal string GetSASToken(string resourceUri, string key, string keyName = null, uint hours = 24)
    {
        var expiry = GetExpiry(hours);
        string stringToSign = HttpUtility.UrlEncode(resourceUri) + "\n" + expiry;
        HMACSHA256 hmac = new HMACSHA256(Convert.FromBase64String(key));

        var signature = Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(stringToSign)));
        var sasToken = String.Format(CultureInfo.InvariantCulture, $"SharedAccessSignature sr={HttpUtility.UrlEncode(resourceUri)}&sig={HttpUtility.UrlEncode(signature)}&se={expiry}");
        if (!string.IsNullOrEmpty(keyName))
            sasToken += $"&skn={keyName}";
        return sasToken;
    }

    internal string GetExpiry(uint hours = 24)
    {
        TimeSpan sinceEpoch = DateTime.UtcNow - new DateTime(1970, 1, 1);
        return Convert.ToString((int)sinceEpoch.TotalSeconds + 3600 * hours);
    }
}

function.json:

{
  "bindings": [
    {
      "name": "queueItem",
      "type": "serviceBusTrigger",
      "direction": "in",
      "queueName": "myQueue",
      "connection": "myConnectionString_SERVICEBUS"
    }
  ]
}
 类似资料:
  • 我已经创建了一个azure函数,当一个新文档被添加到一个集合中时,它会被触发。 是否可以从该集合中选择特定文档,然后查询所选文档中的数据? 例如,在所谓的募集服装,我有一个文件,有一个ID: 12345Tops.我想查询ID为:12345Tops的文档中找到的数据。 或者检索集合中的第一个文档,然后查询第一个选定文档 我看过带有http触发器的azure函数:https://docs.micros

  • 我有一个blob触发器Azure函数,每次将新文件添加到我的blob存储时都会调用该函数。我自动获取该文件的名称作为输入。除了名称之外,我还需要附加到给定文件的元数据。我一直在研究数据输入绑定,但我无法理解它。我需要做些什么才能将文件元数据作为输入?或者,甚至只是在我的函数中访问它?

  • Message Message,string lockToken,MessageReceiver,MessageReceiver,ILogger日志) 使用MessageReceiver或底层接口时,我会得到以下错误: Microsoft.Azure.WebJobs.Host:无法将参数“Message Receiver”绑定到类型“Microsoft.ServiceBus.Messaging.M

  • 我对azure非常陌生,正在努力处理eventGrid中的python函数触发器。我正在使用从azure为python创建的现成模板,并出现错误。我将共享这些文件。 (init.py) function.json 主机. json 这是我发送给事件网格的数据集 我得到的错误是 也许在上面的某个地方很容易出错,但我找不到。。 提前感谢!

  • 我是Azure Function应用程序中blob触发器的新手,需要一些帮助。我正在努力寻找有关如何重命名触发该函数的blob的资源。 我有一个函数应用程序,当一个新的blob被上传到容器时触发,文件被处理,我需要一种方法来将其“标记”为已处理,因此我想重命名blob。 这是我的职责: 我该如何重命名该文件?在这种情况下有可能吗?

  • 我有一个azure函数,由上传到特定容器的任何新blob(图像)触发。这些图像然后被存储在这个blob存储容器中。现在我改变了我的函数并重新部署它,我希望我的azure函数能够在所有这些(已经存储的)图像上重新运行。由于该功能是blob触发的,现在我只是手动重新上传存储容器中的相同图像,但随着图像数据的增加,这样做变得越来越不可行。 函数中的Blob触发器如下所示: 它的解决方案是什么? 谢谢你。