当前位置: 首页 > 知识库问答 >
问题:

如何在Nifi处理器中提取和操作数据

郎鸿朗
2023-03-14

我试图编写一个自定义的Nifi处理器,它将接收传入流文件的内容,对其执行一些数学运算,然后将结果写入传出流文件。是否有一种方法可以将传入流文件的内容转储到字符串或其他东西中?我已经寻找了一段时间,现在似乎没有那么简单。如果有人能为我指出一个很好的教程,处理这样的事情,它将非常感谢。

共有1个答案

江志业
2023-03-14

Apache NiFi开发人员指南非常好地记录了创建自定义处理器的过程。在您的具体案例中,我将从组件生命周期部分和Enricg/Modify Content模式开始。任何其他执行类似工作的处理器(如ReplaceText或Base64EncodeContent)都将是学习的好例子;所有的源代码都可以在GitHub上获得。

实际上,您需要在处理器类中实现#ontrigger()方法,读取flowfile内容并将其解析为预期的格式,执行操作,然后重新填充结果的flowfile内容。您的源代码如下所示:

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        final ComponentLog logger = getLogger();
        AtomicBoolean error = new AtomicBoolean();
        AtomicReference<String> result = new AtomicReference<>(null);

        // This uses a lambda function in place of a callback for InputStreamCallback#process()
        processSession.read(flowFile, in -> {
            long start = System.nanoTime();

            // Read the flowfile content into a String
            // TODO: May need to buffer this if the content is large
            try {
                final String contents = IOUtils.toString(in, StandardCharsets.UTF_8);
                result.set(new MyMathOperationService().performSomeOperation(contents));

                long stop = System.nanoTime();
                if (getLogger().isDebugEnabled()) {
                    final long durationNanos = stop - start;
                    DecimalFormat df = new DecimalFormat("#.###");
                    getLogger().debug("Performed operation in " + durationNanos + " nanoseconds (" + df.format(durationNanos / 1_000_000_000.0) + " seconds).");
                }
            } catch (Exception e) {
                error.set(true);
                getLogger().error(e.getMessage() + " Routing to failure.", e);
            }
        });

        if (error.get()) {
            processSession.transfer(flowFile, REL_FAILURE);
        } else {
            // Again, a lambda takes the place of the OutputStreamCallback#process()
            FlowFile updatedFlowFile = session.write(flowFile, (in, out) -> {
                final String resultString = result.get();
                final byte[] resultBytes = resultString.getBytes(StandardCharsets.UTF_8);

                // TODO: This can use a while loop for performance
                out.write(resultBytes, 0, resultBytes.length);
                out.flush();
            });
            processSession.transfer(updatedFlowFile, REL_SUCCESS);
        }
    }

Daggett认为ExecuteScript处理器是一个很好的起点,这是正确的,因为它将缩短开发生命周期(无需构建NAR、部署和重新启动NiFi来使用它),并且当您有正确的行为时,您可以轻松地复制/粘贴到生成的框架中并部署一次。

 类似资料:
  • 有一个特定的处理器,正如下面提到的,我对它感兴趣,与1.2.0相比,1.5.0中有一个额外的特性,所以我想使用它。 我看到有两种方法。 以上处理器存储为nifi-standard-nar-x.x.x.nar-unpacked文件。所以只需从1.5.0复制Nar,并将其放在1.2.0上即可。在此之后,我不确定nifi是否会识别这个新的处理器版本? 上面的处理器是下面文件的一部分,因此从它中创建一个新

  • 如果这被认为是一个可接受的实践,我需要什么-如果有-错误处理?我的理解是,task.wait()将重新抛出异步操作抛出的任何异常,并且我没有提供任何取消异步操作的机制。仅仅调用task.wait()就足够了吗?

  • 我试图通过drewnoakes导入元数据提取器库,从正在处理的图像中提取EXIF数据。通用域名格式。我尝试了两种可能的方法来导入库,但它仍然不起作用。 我所做的: 1) 将JAR文件拖放到我的草图中,并使用import语句import com。drew。*;但它仍在抱怨图书馆不存在。 2) 将JAR文件放在My Documents文件夹的Processing文件夹中,并使用“import libr

  • 我正在尝试使用ExecuteSQL处理器从oracle数据库中提取数据。我有一些查询,例如假设在我的oracle数据库中有15条记录。在这里,当我运行ExecuteSQL处理器时,它将作为一个流进程连续运行,并将整个记录作为一个文件存储在HDFS中,并且重复这样做。因此,在HDFS位置中会有许多文件,这些文件将从oracle db中提取已经提取的记录,并且这些文件包含相同的数据。我如何使该处理器以

  • 我正在使用Tailfile处理器从计划每分钟运行的集群(3个节点)中获取日志。日志文件名每小时都会发生变化,我不知道应该使用哪种跟踪模式。如果我使用单个文件,它不会获取1小时后生成的新文件。如果我使用多文件,它是在文件名更改第三分钟后获取文件,这增加了文件的大小。我的文件的滚动文件名应该是什么,我应该使用哪种模式。你能让我知道吗。谢谢。 tail:retrieve-${now():format(“

  • 我编写了下面的POST命令,并使用“HandleHttpRequest”处理器在Apache NiFi中接收POST请求 我能够在“HandleHttpRequest”处理器中接收json数据,如下所示 当我检查列表队列时,我可以看到json数据 如何提取empdata并检查其是否为null?