当前位置: 首页 > 知识库问答 >
问题:

nifi:如何为自定义nifi处理程序制定适当的逻辑

傅元章
2023-03-14

我想做定制的nifi处理器,这里有几个我感兴趣的主题:

如果我希望这个文件被多个处理器使用,我应该使用filec锁还是在获取flowfile时让源文件保持false,哪一个是最佳实践?现在我想要这样的OnTrigger代码:

    final List<File> files = new ArrayList<>(batchSize);
    queueLock.lock();
    try {
        fileQueue.drainTo(files, batchSize);
        if (files.isEmpty()) {
            return;
        } else {
            inProcess.addAll(files);
        }
    } finally {
        queueLock.unlock();
    }

    //make  xml parsing
    DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
    try {
        dBuilder = dbFactory.newDocumentBuilder();
    } catch (ParserConfigurationException e) {
        e.printStackTrace();
    }
    Document doc = null;
    try {
        File f=  files.get(0);
        doc = dBuilder.parse(f);
    } catch (SAXException e) {
        e.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();
    }
    NodeList nList = doc.getElementsByTagName("localAttributes");
    for (int temp = 0; temp < nList.getLength(); temp++) {

        Node nNode = nList.item(temp);


        if (nNode.getNodeType() == Node.ELEMENT_NODE) {

            Element eElement = (Element) nNode;


            start = eElement.getElementsByTagName("start").item(0).getTextContent();
            startDate = eElement.getElementsByTagName("startDate").item(0).getTextContent();
            endDate = eElement.getElementsByTagName("endDate").item(0).getTextContent();
            patch = eElement.getElementsByTagName("patch").item(0).getTextContent();
            runAs = eElement.getElementsByTagName("runAs").item(0).getTextContent();

        }
    }

    final ListIterator<File> itr = files.listIterator();

    FlowFile flowFile = null;
    try {
        final Path directoryPath = directory.toPath();
        while (itr.hasNext()) {
            final File file = itr.next();
            final Path filePath = file.toPath();
            final Path relativePath = directoryPath.relativize(filePath.getParent());
            String relativePathString = relativePath.toString() + "/";
            if (relativePathString.isEmpty()) {
                relativePathString = "./";
            }
            final Path absPath = filePath.toAbsolutePath();
            final String absPathString = absPath.getParent().toString() + "/";

            flowFile = session.create();
            final long importStart = System.nanoTime();
            flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
            final long importNanos = System.nanoTime() - importStart;
            final long importMillis = TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);

            flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
            flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
            flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString)

            Map<String, String> attributes = getAttributesFromFile(filePath);
            if (attributes.size() > 0) {
                flowFile = session.putAllAttributes(flowFile, attributes);
            }

            FlowFile flowFile1= session.create();
            flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
            flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
            flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
            flowFile = session.putAttribute(flowFile, "start", start);
            flowFile = session.putAttribute(flowFile, "startDate", startDate);
            flowFile = session.putAttribute(flowFile, "endDate", endDate);
            flowFile = session.putAttribute(flowFile, "runAs", runAs);
            flowFile = session.putAttribute(flowFile, "patch", patch);

            session.getProvenanceReporter().receive(flowFile, file.toURI().toString(), importMillis);
            session.transfer(flowFile1, REL_SUCCESS);

            FlowFile flowFile3=session.create();
            flowFile3=session.importFrom(filePath, keepingSourceFile, flowFile);

            NodeList run = doc.getElementsByTagName("runAs");
            run.item(0).setNodeValue("false");
             session.transfer(flowFile3,REL_ROLLBACK);
            session.remove(flowFile);

共有1个答案

金阳曜
2023-03-14

最近几天,我在这里看到了非常类似的问题,并回答了“NIFI:编写新处理器”和“NIFI:如何编写自定义处理器”。

我完全支持学习如何在Apache NiFi中进行自定义处理器开发,但这个用例对我来说没有意义。从文件系统(HDFS或其他)检索文件是一个原子工作单元,不应该与XML解析结合在一起。将GetFile处理器(或ListFile/FetchFile对)与EvaluateXpath处理器组合,以执行此逻辑。源文件将保留在原始文件系统位置,您将获得更多的控制和对流的可见性,更不用说更健壮的性能和可维护性了。如果需要多个流使用它,可以将此段导出为模板,或者从其他处理器向其提供输入,以确定要获取哪些文件并将其输出到routeonattribute处理器,从而通过filename或其他类似属性将结果指向不同的使用者。

如果您对定制处理器开发感兴趣,开发人员指南和贡献者指南都提供了非常好的参考信息,Bryan Bende的博客提供了一个很好的演练。

 类似资料:
  • 我试图加载一个自定义的NiFi处理器,但无法让NiFi加载所有的.nar依赖项,尽管尝试了各种pom.xml配置。我在SO上遇到过一些类似的问题,但还没有找到这个问题的答案。

  • 我使用的是Nifi 0.4.1版本。我写自定义代码转换CSV到avro格式。我已经创建了类文件,并能够生成nar文件。将nar文件放置在lib目录中,并重新启动nifi服务器。 任何帮助都很感激.. 谢谢,

  • 我正在构建一个自定义处理器来处理流文件,为了处理流文件,我需要从我的本地文件系统读取CSV文件。我创建了一个proerty描述符CSV_PATH,如下所示 现在我想在配置处理器时获取在UI中设置的CSV_PATH属性的值。我无法获得CSV_PATH值。另外,如果我在代码中硬编码filepath,那么我仍然无法从本地文件系统读取CSV。

  • 我正在研究创建一个自定义处理器从一个自定义源中摄取数据,那里没有现有的nifi处理器。 我一直试图理解Nifi组件如何工作的机制,并看到了一些关于如何创建自定义处理器的好文档,然而,我看不到任何关于管理偏移量的内容。假设我有一个运行1秒的处理器,但需要从某个任意偏移量继续进行处理,这可能会每秒钟产生结果,也可能不会产生结果。

  • 我编写了一个自定义的NiFi处理器,用于在BigQuery上执行一些任务。我希望能够重用与NiFi捆绑在一起的GCP凭据提供者控制器服务,以用于身份验证。这可能吗?我尝试将nifi-gcp-processors maven依赖项添加到我的项目中,它成功构建,但当我尝试用我的nar启动NiFi时,它失败了,出现了这个错误消息。