当前位置: 首页 > 知识库问答 >
问题:

apache nifi自定义处理器中的偏移管理

班凌
2023-03-14

我正在研究创建一个自定义处理器从一个自定义源中摄取数据,那里没有现有的nifi处理器。

我一直试图理解Nifi组件如何工作的机制,并看到了一些关于如何创建自定义处理器的好文档,然而,我看不到任何关于管理偏移量的内容。假设我有一个运行1秒的处理器,但需要从某个任意偏移量继续进行处理,这可能会每秒钟产生结果,也可能不会产生结果。

共有1个答案

云德辉
2023-03-14

NiFi具有组件存储与其处理相关的状态的能力,许多内置组件启用此特性是为了跟踪偏移(有关处理器示例,请参见GenerateTableFetch,有关报告任务示例,请参见SiteToSiteProvenanceReportingTask)

 类似资料:
  • 可以从输入主题的特定偏移量到结束偏移量进行Kafka流处理吗? 我有一个Kafka流应用程序消耗输入主题,但由于某种原因失败了。我修复了问题并再次启动它,但它从输入主题的最新偏移量开始消耗。我知道应用程序已处理的输入主题的偏移量。现在,我如何将输入主题从一个偏移量处理到另一个偏移量。我正在使用合流平台5.1.2。

  • 如果数据库表中的列名和等价的javabean对象名称不相似,那么我们可以通过使用自定义的对象来映射它们。 看下面的例子。 为了理解上述与DBUtils相关的概念,让我们编写一个将运行读取查询的示例。创建一个示例应用程序。 更新在DBUtils入门程序中创建的文件:MainApp.java。 按照下面的说明编译并运行应用程序。 以下是的内容。 以下是文件的内容。 以下是文件的内容。 完成创建源文件后

  • 我试图加载一个自定义的NiFi处理器,但无法让NiFi加载所有的.nar依赖项,尽管尝试了各种pom.xml配置。我在SO上遇到过一些类似的问题,但还没有找到这个问题的答案。

  • 要使用自定义的布局管理器,我们可以重新实现 addItem(), sizeHint(), setGeometry(), itemAt() 和 takeAt()这些方法。为了确保当应用程序界面的空间非常小时,布局大小不会为 0,我们需要重载 minimumSize()方 法。日常中我们也经常看到这样的情形,即应用程序窗口的长和宽的尺寸是互为依存的,那 么我们就需要重载 hasHeightForWid

  • easyopen1.4.0开始支持。 创建session 登陆成功后创建session,并返回sessionId // 自定义session @PostMapping("managedSessionLogin") public String managedSessionLogin(HttpServletRequest request) { // 假设登陆成功,创建一

  • 我使用抽象处理器创建了一个自定义注释和注释处理器。这意味着我想在编译之前进行注释处理。我将自定义注释和处理器导出为Jar,并尝试将其与简单的java测试程序一起使用。我确信在编译时它会找到我的java处理器类,但不知何故,我并没有得到我在控制台上期望的o/p。下面是我的处理器中处理方法的代码。 此外,我故意使用以便我得到一个版本警告,它确认当我执行一个测试类时,它找到了我的注释处理器。 这里是我的