当前位置: 首页 > 知识库问答 >
问题:

NIFI-QueryDatabaseTable处理器。如何查询被修改的行?

萧英光
2023-03-14

我正在研究NIFI数据流,我的用途是获取mysql表数据并将其放入HDFS/本地文件系统。

我已经构建了一个数据流管道,在这里我使用了querydatabaseTable处理器------ConvertRecord----PutFile处理器。

我的表模式-->id、name、city、Created_date

我的问题是,如何处理这种情况?任何其他处理器或需要更新某些属性。

共有1个答案

姜楷
2023-03-14

需要通知QueryDatabaseTable处理器可以使用哪些列来标识新数据。

串行ID或创建时间戳是不够的。

从文档中:

最大值列:

以逗号分隔的列名列表。处理器将跟踪自处理器开始运行以来返回的每个列的最大值。使用多列意味着列列表的顺序,每个列的值的增长速度应该比前面的列慢。因此,使用多列意味着列的分层结构,通常用于分区表。此处理器只能用于检索自上次检索以来添加/更新的行。请注意,某些JDBC类型(如bit/boolean)不利于维护最大值,因此这些类型的列不应在此属性中列出,这将在处理过程中导致错误。如果不提供列,则将考虑表中的所有行,这可能会对性能产生影响。注意:对于给定的表,使用一致的max-value列名是很重要的,这样增量提取才能正常工作。

| stamp_updated | timestamp | CURRENT_TIMESTAMP   | on update CURRENT_TIMESTAMP |

所以我基本上要说的是:

如果您自己不能判断这是sql中的新记录,nifi也不能。

 类似资料:
  • 我正在尝试使用ExecuteSQL处理器从oracle数据库中提取数据。我有一些查询,例如假设在我的oracle数据库中有15条记录。在这里,当我运行ExecuteSQL处理器时,它将作为一个流进程连续运行,并将整个记录作为一个文件存储在HDFS中,并且重复这样做。因此,在HDFS位置中会有许多文件,这些文件将从oracle db中提取已经提取的记录,并且这些文件包含相同的数据。我如何使该处理器以

  • 我正在使用Tailfile处理器从计划每分钟运行的集群(3个节点)中获取日志。日志文件名每小时都会发生变化,我不知道应该使用哪种跟踪模式。如果我使用单个文件,它不会获取1小时后生成的新文件。如果我使用多文件,它是在文件名更改第三分钟后获取文件,这增加了文件的大小。我的文件的滚动文件名应该是什么,我应该使用哪种模式。你能让我知道吗。谢谢。 tail:retrieve-${now():format(“

  • Apache NIFI“ExecutesQL处理器”能否以“X”MB为单位流式传输大量的选择结果?

  • 我在运行FetchElasticSearch处理器时出现了奇怪的错误,下面是错误。 FetChelasticSearch[ID=F2B2FEE3-B940-4A73-8A28-0436E765C9A2]无法读取到Elasticsearch中,原因是没有配置的节点可用:[{#Transport#-1}{127.0.0.1}{localhost/127.0.0.1:9500}],这可能表示配置错误(主

  • 我想创建一个自定义的nifi处理器,这样我就可以读取s7 plc数据。为此,我想将这个项目的java代码:https://github.com/s7connector/s7connector转换为一个nifi处理器。 因此,我已经下载了mvn包类型,就像webiste告诉的那样:https://medium.com/hashmapinc/creating-custom-processors-and

  • 我想做一个新的处理器,它将是GetFile和EvaluateXpath的重聚。有几个主题我感兴趣: > 现在我的nar文件超过20KB,而我的nifi无法运行它,我该如何缩小它? 我想从文件夹中获取文件,读取它的数据并将其作为一个atribute放入新的flowfile中,然后将配置xml回滚到它的原始文件夹,如何将配置文件回滚到文件夹b代码? 下面是我用来从xml配置文件中获取属性的简单代码: