当前位置: 首页 > 工具软件 > MiNiFi > 使用案例 >

nifi apache_如何使用Apache NiFi,MiNiFi,C2 Server,MQTT和Raspberry Pi构建IIoT系统

班承恩
2023-12-01

nifi apache

by Abdelkrim Hadjidj

通过Abdelkrim Hadjidj

如何使用Apache NiFi,MiNiFi,C2 Server,MQTT和Raspberry Pi构建IIoT系统 (How to build an IIoT system using Apache NiFi, MiNiFi, C2 Server, MQTT and Raspberry Pi)

How long do you think it takes to build an advanced Industrial IoT prototype that can:

您认为构建一个先进的工业物联网原型需要多长时间:

  • Collect data from sensors to a gateway at every factory

    从传感器收集数据到每个工厂的网关
  • Move sensors data from one or several factories to the Cloud or the Data Center

    将传感器数据从一个或多个工厂移至云端或数据中心
  • Automatically Warm-Deploy new configurations to all the edge devices

    自动将新配置热部署到所有边缘设备
  • Support large scale data volume and end-to-end security

    支持大规模数据量和端到端安全性

With the right tool, you can build such system in less than one hour! In this blog post, I’ll show you how to implement an advanced IIoT prototype using Raspberry Pi hardware and open source softwares (MQTT broker, Apache NiFi, MiNiFi and MiNiFi C2 Server). I’ll focus on the architecture, the connectivity, the data collection and the automatic re-configuration.

使用正确的工具,您可以在不到一小时的时间内构建这样的系统! 在此博客文章中,我将向您展示如何使用Raspberry Pi硬件和开源软件(MQTT代理,Apache NiFi,MiNiFi和MiNiFi C2 Server)实现高级IIoT原型。 我将专注于体系结构,连接性,数据收集和自动重新配置。

This article is meant to be the beginning of a series of articles on IoT. Edge processing and data analysis will be the subject of following articles, so stay tuned :)

本文旨在作为有关IoT的一系列文章的开始。 边缘处理和数据分析将是以下文章的主题,请继续关注:)

工业物联网架构 (Industrial IoT architecture)

There are plenty of IoT reference architectures. Often, in industrial settings, you don’t have direct access to sensors and control systems. A gateway is used to bridge the OT and the IT world. For this reason, IIoT architecture often includes edge devices, gateways, regional hubs and finally storage/processing systems.

有很多物联网参考架构。 通常,在工业环境中,您无法直接访问传感器和控制系统。 网关用于桥接OT和IT世界。 因此,IIoT体系结构通常包括边缘设备,网关,区域中心和最终的存储/处理系统。

The following picture shows the global architecture of our system and the software tools that we will use at each level.

下图显示了我们系统的全局体系结构以及将在每个级别使用的软件工具。

At the edge level, sensors collect information on the digital world and send them to a gateway through a variety of wired and wireless protocols (Serial, RS-485, MODBUS, CAN bus, OPC UA, BLE, WiFi, and so on). In our example, we will use various sensors (light, temperature, cameras, accelerometers, and so on) that send data to the gateway via WiFi.

在边缘级别 ,传感器收集数字世界中的信息,并通过各种有线和无线协议(串行,RS-485,MODBUS,CAN总线,OPC UA,BLE,WiFi等)将其发送到网关。 在我们的示例中,我们将使用各种传感器(光线,温度,摄像机,加速度计等)通过WiFi将数据发送到网关。

The gateway is a Raspberry Pi running a Mosquitto Broker and a MiNiFi agent. Mosquitto is an Open Source, lightweight messaging broker that we use to expose sensor data through the MQTT protocol. MQTT has a minimal footprint which makes it suitable for IoT applications and ressource constrained hardware, such as phones or microcontrollers.

网关是运行Mosquitto Broker和MiNiFi代理的Raspberry Pi。 Mosquitto是一个开源的轻量级消息代理,我们使用它通过MQTT协议公开传感器数据。 MQTT的占地面积极小,使其适合于IoT应用程序和资源受限的硬件,例如电话或微控制器。

Apache MiNiFi — a subproject of Apache NiFi — is a light-weight agent that implements the core features of Apache NiFi, focusing on data collection at the edge.

Apache MiNiFi是Apache NiFi的子项目,是一种轻量级代理,它实现了Apache NiFi的核心功能,着重于边缘数据的收集。

MiNiFi design goals are: small size and low resource consumption, central management of agents, and edge intelligence. MiNiFi can be easily integrated with NiFi through Site-to-Site protocol (S2S) to build an end-to-end flow management solution which is scalable, secure, and provides full chain of custody of information (provenance).

MiNiFi的设计目标是:体积小,资源消耗低,代理的集中管理和边缘智能。 通过站点到站点协议(S2S),可以轻松地将MiNiFi与NiFi集成,以构建可扩展,安全并提供完整信息保管(来源)的端到端流管理解决方案。

In our system, MiNiFi will subscribe to all the topics of the Mosquitto broker, and forward every new message to NiFi at the Regional level. We can also use it to connect to SCADA system or any other OT data provider.

在我们的系统中,MiNiFi将订阅Mosquitto经纪人的所有主题,并将每条新消息转发到区域级别的NiFi。 我们也可以使用它连接到SCADA系统或任何其他OT数据提供者。

At the regional level, we have two components:

在区域一级,我们有两个组成部分:

Apache NiFi is a powerful data flow platform with more than 200 out-of-the-box connectors. Thanks to its UI, designing data flows becomes quick and easy.

Apache NiFi是一个功能强大的数据流平台,具有200多个现成的连接器。 得益于其UI,设计数据流变得轻松快捷。

NiFi doesn’t trade power for simplicity. Indeed, it’s a highly scalable distributed system with guaranteed delivery, backpressure, and load distribution. These features make NiFi a great tool for IoT applications where the network quality can be challenging.

NiFi不会为了简单而交换力量。 确实,这是一个具有可扩展性的分布式系统,可确保交付,背压和负载分配。 这些功能使NiFi成为物联网应用的绝佳工具,在这些应用中,网络质量可能会面临挑战。

In our system, NiFi plays the central role of collecting data from every factory and routing it to several systems and applications (HDFS, HBase, Kafka, S3, and so on).

在我们的系统中,NiFi发挥着中心作用,即从每个工厂收集数据并将其路由到多个系统和应用程序(HDFS,HBase,Kafka,S3等)。

MiNiFi C2 Server (MiNiFi Commande & Control) is another subproject of Apache NiFi currently under development. Its role is to provide a central point of configuration to hundreds or thousands of MiNiFi agents in the wild. The C2 server manages versioned classes of applications (MiNiFi flow configurations) and exposes them through a Rest API. MiNiFi agents can connect to this API at a defined frequency to update their configuration.

MiNiFi C2服务器(MiNiFi Commande&Control)是当前正在开发的Apache NiFi的另一个子项目。 它的作用是为野外成百上千的MiNiFi代理提供配置的中心点。 C2服务器管理应用程序的版本化类(MiNiFi流配置),并通过Rest API公开它们。 MiNiFi代理可以以定义的频率连接到此API,以更新其配置。

Once data land at the company servers, on the Cloud or at the Data Center, there’s a large set of applications that can be implemented. Real-time monitoring, process analysis and optimization, or predictive maintenance are a few examples. Data processing and use case implementation will be discussed in a future article.

一旦数据到达公司服务器, 云或数据中心 ,便可以实现大量应用程序。 实时监控,过程分析和优化或预测性维护就是其中的几个示例。 数据处理和用例实现将在以后的文章中讨论。

系统实施 (System implementation)

Let’s start building our prototype.

让我们开始构建我们的原型。

准备Raspberry Pi:MQTT和MiNiFi (Preparing the Raspberry Pi: MQTT and MiNiFi)

To install Mosquitto MQTT broker and MiNiFi agent, run the following commands on your Raspberry Pi.

要安装Mosquitto MQTT代理和MiNiFi代理,请在Raspberry Pi上运行以下命令。

To have a small size, MiNiFi is packaged with a minimal set of default processors. It’s possible to add any NiFi processor by deploying the NAR (NiFi Archive) in the lib directory. In the last command of the below block, I add the MQTT processor’s NAR.

为了缩小体积,MiNiFi打包了最少的默认处理器集。 通过在lib目录中部署NAR(NiFi存档),可以添加任何NiFi处理器。 在下面的块的最后一个命令中,我添加了MQTT处理器的NAR。

sudo apt-get update#install and run Mosquitto broker on default port 1883sudo apt-get install mosquittomosquitto#install and prepare MiNiFi agentwget http://apache.crihan.fr/dist/nifi/minifi/0.4.0/minifi-0.4.0-bin.tar.gztar -xvf minifi-0.4.0-bin.tar.gzcd minifi-0.4.0#add mqtt processorwget https://github.com/ahadjidj-hw/NiFi/raw/master/nifi-mqtt-nar-1.5.0.nar -P ./lib/

By default, configuring a MiNiFi agent requires editing the file ./conf/config.yml to include the list of used processors and their configurations. The configuration can be written manually, or designed using the NiFi UI and exporting the flow as a template. The template is an XML file that we need to convert to a YML file with the MiNiFi toolkit. Here’s an example of a configuration file that tails a file and sends each line to a remote NiFi via S2S.

默认情况下,配置MiNiFi代理程序需要编辑./conf/config.yml文件以包括使用的处理器及其配置的列表。 可以手动编写配置,也可以使用NiFi UI设计配置,然后将流程导出为模板。 该模板是一个XML文件,我们需要使用MiNiFi工具包将其转换为YML文件。 这是一个配置文件的示例,该文件尾部一个文件,并通过S2S将每一行发送到远程NiFi。

For our project, we won’t use these manual steps. With many MiNiFi agents running on geographically distributed factories, it is not possible to manually stop, edit the config.yml, and then restart every agent every time their configuration needs to change.

对于我们的项目,我们将不使用这些手动步骤。 由于许多MiNiFi代理在地理位置分散的工厂上运行,因此无法手动停止,编辑config.yml,然后在每次需要更改其配置时重新启动每个代理。

MiNiFi uses a “Change Ingestor” by which the agent is notified of a potential new configuration. Change ingestors are pluggable modules, and currently three ingestors are supported OOTB:

MiNiFi使用“更改接收器”,通过该“更改接收器”将可能的新配置通知给代理。 变更摄取器是可插入模块,目前支持三种OOTB摄取器:

  • FileChangeIngestor

    FileChangeIngestor
  • RestChangeIngestor

    RestChangeIngestor
  • PullHttpChangeIngestor

    PullHttpChangeIngestor

We will use a PullHttpChangeIngestor to query a C2 server every period of time and download any new configuration available. To configure this ingestor, edit the file ./conf/bootstrap.conf, uncomment the corresponding lines, and set the ingestor properties as follows:

我们将每隔一段时间使用PullHttpChangeIngestor来查询C2服务器并下载任何可用的新配置。 要配置此摄取器,请编辑文件./conf/bootstrap.conf,取消注释相应的行,并按如下所示设置摄取器属性:

nifi.minifi.notifier.ingestors=org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor
# Hostname on which to pull configurations from
nifi.minifi.notifier.ingestors.pull.http.hostname=c2-server
# Port on which to pull configurations from
nifi.minifi.notifier.ingestors.pull.http.port=10080
# Path to pull configurations from
nifi.minifi.notifier.ingestors.pull.http.path=/c2/config
# Query string to pull configurations with
nifi.minifi.notifier.ingestors.pull.http.query=class=iot-minifi-raspberry-agent
# Period on which to pull configurations from, defaults to 5 minutes if commented out
nifi.minifi.notifier.ingestors.pull.http.period.ms=60000

With this configuration, each MiNiFi agent will query the C2 Server REST API at http://c2-server:10080/c2/config every 1 minute and ask for the latest configuration for the “iot-minifi-raspberry-agent” class.

使用此配置,每个MiNiFi代理将每1分钟在http:// c2-server:10080 / c2 / config上查询C2服务器REST API,并要求“ iot-minifi-raspberry-agent”类的最新配置。

Note: the 1 minute frequency is only for demo purposes. You won’t be updating your agents so frequently.

注意:1分钟的频率仅用于演示目的。 您将不会如此频繁地更新代理。

Don’t start your agent now, and let’s move to the regional level and configure MiNiFi C2 server and NiFi.

现在不要启动您的代理,让我们移至区域级别并配置MiNiFi C2服务器和NiFi。

安装和配置MiNiFi C2服务器 (Installing and configuring the MiNiFi C2 Server)

Install MiNiFi C2 server on a public server that’s reachable from the MiNiFi agents. You can use hierarchical C2 deployment for network constrained applications as described a few lines below. Run the following command to install the C2 server:

在可从MiNiFi代理访问的公共服务器上安装MiNiFi C2服务器。 您可以将C2分层部署用于受网络限制的应用程序,如以下几行所述。 运行以下命令以安装C2服务器:

wget http://apache.crihan.fr/dist/nifi/minifi/0.4.0/minifi-c2-0.4.0-bin.tar.gztar -xvf minifi-c2-0.4.0-bin.tar.gzcd minifi-c2-0.4.0

The C2 server exposes MiNiFi applications through a REST API organized by classes. C2 supports pluggable “Configuration Providers” and currently supports:

C2服务器通过按类组织的REST API公开MiNiFi应用程序。 C2支持可插拔的“配置提供程序”,当前支持:

  • The CacheConfigurationProvider, which looks at the directory on the filesystem or on S3

    CacheConfigurationProvider,其中 查看文件系统或S3上的目录

  • The DelegatingConfigurationProvider, which delegates to another C2 server to allow for hierarchical C2 structures

    DelegatingConfigurationProvider,它委派给另一个C2服务器以允许分层C2结构

  • The NiFiRestConfigurationProvider, which pulls templates from a NiFi instance over its REST API

    NiFiRestConfigurationProvider,可通过其REST API从NiFi实例中提取模板

Configure the C2 Server to use NiFi as a configuration provider. Edit the file ./conf/minifi-c2-context.xml and provide the NiFi server address http://nifi-dev:8080

配置C2服务器以使用NiFi作为配置提供程序。 编辑文件./conf/minifi-c2-context.xml并提供NiFi服务器地址http:// nifi-dev:8080

安装和配置NiFi服务器 (Installing and configuring the NiFi Server)

Install NiFi on a server reachable from the C2 server and run it.

在C2服务器可访问的服务器上安装NiFi并运行它。

wget http://apache.crihan.fr/dist/nifi/1.6.0/nifi-1.6.0-bin.tar.gztar -xvf nifi-1.6.0-bin.tar.gzcd nifi-1.6.0./bin/nifi.sh start

Let’s connect to the NiFi UI at http://nifi-dev:8080/nifi/ and create the flow that will run in the MiNiFi agents. But before this, add an input port to the root canvas and name it “from Raspberry MiNiFi”. This is where NiFi will receive flow files from MiNiFi.

让我们在http:// nifi-dev:8080 / nifi /上连接到NiFi UI,并创建将在MiNiFi代理中运行的流。 但是在此之前,请在根画布上添加一个输入端口,并将其命名为“来自Raspberry MiNiFi”。 NiFi将从此处接收来自MiNiFi的流文件。

Add a consumeMQTT processor to subscribe to the Mosquitto broker and subscribe to all topics under iot/sensors. Note that the tcp://raspberrypi:1883 here is equivalent to tcp://localhost:1883, since this flow will be running on the Raspberry Pi.

添加consumerMQTT处理器以订阅Mosquitto代理并订阅iot / sensors下的所有主题。 请注意,此处的tcp:// raspberrypi:1883与tcp:// localhost:1883等效,因为此流将在Raspberry Pi上运行。

Use an UpdateAttribute processor to add a “version” attribute that we will use to show the re-configuration feature. You can add any attribute you want: timestamp, agent name, location, and so on.

使用UpdateAttribute处理器添加“版本”属性,我们将使用该属性来显示重新配置功能。 您可以添加所需的任何属性:时间戳记,座席名称,位置等。

And finally, add a Remote Process Group (RPG) to send the consumed events to NiFi. Connect these three processors.

最后,添加一个远程进程组(RPG)以将使用的事件发送到NiFi。 连接这三个处理器。

Your flow now looks like the below screenshot. The left flow will be running in NiFi to receive data from MiNiFi. The right flow here is only for design and will effectively run on each Raspberry Pi.

现在,您的流程类似于以下屏幕截图。 左流将在NiFi中运行,以接收来自MiNiFi的数据。 正确的流程仅用于设计,并且可以在每个Raspberry Pi上有效运行。

Save the right flow as a template under the name “iot-minifi-raspberry-agent.v1”. The naming convention here is very important. We must use the same name as the class name used in the MiNiFi bootstrap configuration.

将正确的流程另存为模板,名称为“ iot-minifi-raspberry-agent.v1”。 这里的命名约定非常重要。 我们必须使用与MiNiFi引导程序配置中使用的类名相同的名称。

部署并启动应用程序 (Deploy and start the application)

Before starting the MiNiFi agents on the Raspberry Pi, let’s see if the C2 server is well configured. Open the following URL in your web browser : http://c2-server:10080/c2/config?class=iot-minifi-raspberry-agent&version=1 . The C2 Server replies with a file containing the configuration of the template we built, in YML format . That’s great.

在Raspberry Pi上启动MiNiFi代理之前,让我们看一下C2服务器是否配置正确。 在Web浏览器中打开以下URL: http:// c2-server:10080 / c2 / config?class = iot-minifi-raspberry-agent&version = 1 。 C2 Server答复一个文件,其中包含我们构建的模板的配置,格式为YML。 那很棒。

If you look to the C2 logs, you can see that the server received a query with the parameters {class=[iot-minifi-raspberry-agent], version=[1]}

如果查看C2日志,则可以看到服务器收到带有参数{class = [iot-minifi-raspberry-agent],version = [1]}的查询。

Now that communication between the different components of the architecture (MQTT, MiNiFi, NiFi and C2) is working, start the MiNiFi agent on the Raspberry Pi with the command:

现在,体系结构的不同组件(MQTT,MiNiFi,NiFi和C2)之间的通信正常,请使用以下命令在Raspberry Pi上启动MiNiFi代理:

./bin/minifi.sh start

After a few seconds, you see the following C2 server logs. The host 192.168.1.50 (this is the Raspberry Pi’s IP address) asked the C2 server to give it the latest version of the class “iot-minifi-raspberry-agent”. Compared to our previous call with the web browser, you’ll notice that the MiNiFi agent didn’t specify a version. If you open now the MiNiFi agent configuration at ./conf/config.yml you will find the same conf file that we retrieved from the C2 Rest API.

几秒钟后,您会看到以下C2服务器日志。 主机192.168.1.50(这是Raspberry Pi的IP地址)要求C2服务器为其提供最新版本的类“ iot-minifi-raspberry-agent”。 与我们先前使用Web浏览器进行的呼叫相比,您会注意到MiNiFi代理未指定版本。 如果现在在./conf/config.yml中打开MiNiFi代理配置,您将找到我们从C2 Rest API中检索到的相同conf文件。

Also, the MQTT shows that the MiNiFi agent connected to the broker and subscribed to the topics iot/sensors/#

此外,MQTT还显示MiNiFi代理已连接到代理并订阅了主题iot / sensors /#

Perfect! The IIoT system is running like a charm. Now let’s start our sensors to generate data and publish it in MQTT. MiNiFi will then start consuming data and sending it to NiFi as shown in the following screenshot where we have received 196 messages.

完善! IIoT系统运行得像灵符。 现在,让我们启动传感器以生成数据并将其发布在MQTT中。 然后,MiNiFi将开始使用数据并将其发送到NiFi,如以下屏幕截图所示,我们已收到196条消息。

Now let’s inspect one of these messages with the provenance feature of NiFi. This data comes from the light sensor “iot/sensors/LightIntensity/z” and the application version is 1.

现在,让我们使用NiFi的来源功能检查这些消息之一。 该数据来自光传感器“ iot / sensors / LightIntensity / z” ,应用程序版本为1。

自动魔术重新部署 (Automagic Warm Redeploy)

Now that our IIoT is running and data is flowing from every factory to our data center, let’s deploy a new application. For our test, we will make a minor modification to our MiNiFi agent configuration. Go to the NiFi web UI and edit the updateAttribute processor. Set the “version” attribute to 2 instead of 1 and save the flow in a new template “iot-minifi-raspberry-agent.v2”. That’s all! The new application will be automagically deployed.

现在我们的IIoT正在运行,并且数据正在从每个工厂流到我们的数据中心,让我们部署一个新的应用程序。 对于我们的测试,我们将对我们的MiNiFi代理配置进行较小的修改。 转到NiFi网络用户界面,然后编辑updateAttribute处理器。 将“版本”属性设置为2而不是1,并将流保存在新模板“ iot-minifi-raspberry-agent.v2”中。 就这样! 新的应用程序将自动部署。

You can see the C2 server logs below showing that a new version V2 was detected. The C2 server does not have this version in its cache and start a download and conversion process.

您可以在下面看到C2服务器日志,显示检测到新版本V2。 C2服务器的缓存中没有此版本,因此开始下载和转换过程。

Then, the MiNiFi agents detect the new configuration, backup the previous configuration, deploy the new one, and restart.

然后,MiNiFi代理检测到新配置,备份以前的配置,部署新的配置,然后重新启动。

Now let’s look to the data coming from the agents. As you can see in the provenance UI below, this data comes from a Gyroscope and has an application version 2.

现在,让我们看一下来自代理的数据。 如您在下面的出处界面中所见,该数据来自陀螺仪,并且具有应用程序版本2。

结论 (Conclusions)

Apache NiFi and its eco-system (MiNiFi and C2 server) are powerful tools for end-to-end IoT data management. It can be used to easily and quickly build advanced IoT applications with flexible architectures and advanced features (automatic warm deploy, data provenance, backpressure, and so on).

Apache NiFi及其生态系统(MiNiFi和C2服务器)是用于端到端IoT数据管理的强大工具。 它可用于轻松快速地构建具有灵活架构和高级功能(自动热部署,数据出处,背压等)的高级IoT应用程序。

In future articles, I’ll show you how to use this data, secure the platform, and implement advanced edge processing scenarios. In the meantime, you can read this article on how Renault, a multinational automobile manufacturer, uses Apache NiFi in IIoT projects and what are the best practices that they adopted.

在以后的文章中,我将向您展示如何使用这些数据,保护平台并实现高级边缘处理方案。 同时,您可以阅读这篇文章 ,了解跨国汽车制造商雷诺如何在IIoT项目中使用Apache NiFi,以及他们采用的最佳实践是什么。

Thanks for reading. As always, feedback and suggestions are welcome.

谢谢阅读。 与往常一样,欢迎您提供反馈和建议。

If you found this article useful then give it some claps and follow me for more Big Data and IoT articles!

如果您发现本文有用,请给我一些鼓掌,并关注我以获得更多有关大数据和物联网的文章!

翻译自: https://www.freecodecamp.org/news/building-an-iiot-system-using-apache-nifi-mqtt-and-raspberry-pi-ce1d6ed565bc/

nifi apache

 类似资料: