我正在研究NIFI数据流,我的用途是获取mysql表数据并将其放入HDFS/本地文件系统。
我已经构建了一个数据流管道,在这里我使用了querydatabaseTable处理器------ConvertRecord----PutFile处理器。
我的表模式-->id、name、city、Created_date
我的问题是,如何处理这种情况?任何其他处理器或需要更新某些属性。
需要通知QueryDatabaseTable处理器可以使用哪些列来标识新数据。
串行ID
或创建时间戳是不够的。
从文档中:
最大值列:
以逗号分隔的列名列表。处理器将跟踪自处理器开始运行以来返回的每个列的最大值。使用多列意味着列列表的顺序,每个列的值的增长速度应该比前面的列慢。因此,使用多列意味着列的分层结构,通常用于分区表。此处理器只能用于检索自上次检索以来添加/更新的行。请注意,某些JDBC类型(如bit/boolean)不利于维护最大值,因此这些类型的列不应在此属性中列出,这将在处理过程中导致错误。如果不提供列,则将考虑表中的所有行,这可能会对性能产生影响。注意:对于给定的表,使用一致的max-value列名是很重要的,这样增量提取才能正常工作。
| stamp_updated | timestamp | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
所以我基本上要说的是:
如果您自己不能判断这是sql中的新记录,nifi也不能。
我正在尝试使用ExecuteSQL处理器从oracle数据库中提取数据。我有一些查询,例如假设在我的oracle数据库中有15条记录。在这里,当我运行ExecuteSQL处理器时,它将作为一个流进程连续运行,并将整个记录作为一个文件存储在HDFS中,并且重复这样做。因此,在HDFS位置中会有许多文件,这些文件将从oracle db中提取已经提取的记录,并且这些文件包含相同的数据。我如何使该处理器以
我正在使用Tailfile处理器从计划每分钟运行的集群(3个节点)中获取日志。日志文件名每小时都会发生变化,我不知道应该使用哪种跟踪模式。如果我使用单个文件,它不会获取1小时后生成的新文件。如果我使用多文件,它是在文件名更改第三分钟后获取文件,这增加了文件的大小。我的文件的滚动文件名应该是什么,我应该使用哪种模式。你能让我知道吗。谢谢。 tail:retrieve-${now():format(“
Apache NIFI“ExecutesQL处理器”能否以“X”MB为单位流式传输大量的选择结果?
我在运行FetchElasticSearch处理器时出现了奇怪的错误,下面是错误。 FetChelasticSearch[ID=F2B2FEE3-B940-4A73-8A28-0436E765C9A2]无法读取到Elasticsearch中,原因是没有配置的节点可用:[{#Transport#-1}{127.0.0.1}{localhost/127.0.0.1:9500}],这可能表示配置错误(主
我想创建一个自定义的nifi处理器,这样我就可以读取s7 plc数据。为此,我想将这个项目的java代码:https://github.com/s7connector/s7connector转换为一个nifi处理器。 因此,我已经下载了mvn包类型,就像webiste告诉的那样:https://medium.com/hashmapinc/creating-custom-processors-and
我想做一个新的处理器,它将是GetFile和EvaluateXpath的重聚。有几个主题我感兴趣: > 现在我的nar文件超过20KB,而我的nifi无法运行它,我该如何缩小它? 我想从文件夹中获取文件,读取它的数据并将其作为一个atribute放入新的flowfile中,然后将配置xml回滚到它的原始文件夹,如何将配置文件回滚到文件夹b代码? 下面是我用来从xml配置文件中获取属性的简单代码: