当前位置: 首页 > 知识库问答 >
问题:

记录富集用NiFi流

谷梁云瀚
2023-03-14

我正在使用NIFI1.11.4构建一个数据管道,其中IoT设备以JSON格式发送数据。每次从IoT设备接收数据时,都会收到两个JSONS;

JSON_Initial

{
   devId: "abc",
   devValue: "TWOINITIALCHARS23",
}

和JSON_FINAL

{
   devId: "abc",
   devValue: "TWOINITIALCHARS45",
}
{
   devId: "abc",
   devValue: "2345",
}

共有1个答案

暨弘懿
2023-03-14

假设devid对于设备是静态的,并且不用于相关(即对于来自该设备的所有消息,abc,对于前两个消息不是abc,然后对于后两个消息则是def,等等),您有以下几个选项:

  • 使用mergeContent将flowfile内容(两个JSON块)串联起来,使用replaceText修改组合的内容以匹配所需的输出。这将需要调整MC binning属性,将合并窗口限制在1-2秒内(例如,如果每秒接收多条消息,这是困难的/不够的),并使用正则表达式删除重复内容。
  • 使用自定义脚本与设备JSON输出交互(例如,Groovy将使JSON交互非常简单)
    • 如果您在NiFi上下文中(通过executescriptinvokescriptedprocessor)执行此操作,您将可以访问NiFi框架,因此您可以计算流文件属性和内容,从而使此操作更加容易(将有初始时间戳等属性)。
    • 如果您在NiFi上下文之外(通过executeProcessexecuteStreamCommand)执行此操作,您将无法访问NiFi框架(属性等),但您可以直接与设备进行更好的交互。

 类似资料:
  • 时间线:2024.1.10在boss上投递,base深圳 投递完就有hr来打电话初筛,问了两道逻辑题,我说我都不会,然后就挂电话了,全程不到三分钟 时间线:2024.1.16晚一面 面试开始,没有自我介绍,直接开始 1.介绍一下自己负责的一个最大的测试项目 2.微信发消息的测试用例 3.cookie跟session的区别 4.什么是数据库左连接,我说是left join,他还要我把完整sql说出来

  • 我正在尝试使用Hortonworks Schema注册表反序列化一些由Nifi序列化的Kafka消息 在Nifi端用作RecordWriter的处理器:AvroRecordSetWriter 我能够在其他Nifi kafka消费者中反序列化这些消息。但是我正在尝试使用Kafka代码从我的Flink应用程序中反序列化它们。 我的Flink应用程序的Kafka反序列化程序处理程序中有以下内容: 这是用

  • 我想把Nifi flowfile发送到Spark,在Spark中做一些转换,然后再把结果发送回Nifi,这样我就可以在Nifi中进行进一步的操作。我不想写flowfile写到数据库或HDFS,然后触发火花作业。我想直接将flowfile发送到Spark,并直接从Spark接收到NIFI的结果。我尝试在Nifi中使用ExecuteSparkInteractive处理器,但我被卡住了。任何例子都是有帮

  • 我希望你们能够帮助我解决我目前面临的难题。我目前正在从事一个现有的web应用程序项目,其中一个要求是我们必须集中记录日志。该应用程序是一个分层应用程序,由客户端层(即视图)、服务层、业务层和DAO层组成。 目前,应用程序中的日志记录由控制器方法处理,其中每个控制器方法需要记录一些信息,通过调用日志记录函数手动记录数据。这些控制器方法处理的请求来自许多不同的客户端来源,包括移动设备(如电话)、网络浏

  • 本文向大家介绍Redis集群搭建全记录,包括了Redis集群搭建全记录的使用技巧和注意事项,需要的朋友参考一下 Redis集群是一个提供在多个Redis节点间共享数据的程序集。   Redis集群中不支持处理多个keys的命令。   Redis集群通过分区来提供一定程度的可用性。在某个节点宕机或者不可用的时候可以继续处理命令。 Redis集群数据分片   在Redis集群中,使用数据分片(shar