我想做定制的nifi处理器,这里有几个我感兴趣的主题:
如果我希望这个文件被多个处理器使用,我应该使用filec锁还是在获取flowfile时让源文件保持false,哪一个是最佳实践?现在我想要这样的OnTrigger代码:
final List<File> files = new ArrayList<>(batchSize);
queueLock.lock();
try {
fileQueue.drainTo(files, batchSize);
if (files.isEmpty()) {
return;
} else {
inProcess.addAll(files);
}
} finally {
queueLock.unlock();
}
//make xml parsing
DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
try {
dBuilder = dbFactory.newDocumentBuilder();
} catch (ParserConfigurationException e) {
e.printStackTrace();
}
Document doc = null;
try {
File f= files.get(0);
doc = dBuilder.parse(f);
} catch (SAXException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
NodeList nList = doc.getElementsByTagName("localAttributes");
for (int temp = 0; temp < nList.getLength(); temp++) {
Node nNode = nList.item(temp);
if (nNode.getNodeType() == Node.ELEMENT_NODE) {
Element eElement = (Element) nNode;
start = eElement.getElementsByTagName("start").item(0).getTextContent();
startDate = eElement.getElementsByTagName("startDate").item(0).getTextContent();
endDate = eElement.getElementsByTagName("endDate").item(0).getTextContent();
patch = eElement.getElementsByTagName("patch").item(0).getTextContent();
runAs = eElement.getElementsByTagName("runAs").item(0).getTextContent();
}
}
final ListIterator<File> itr = files.listIterator();
FlowFile flowFile = null;
try {
final Path directoryPath = directory.toPath();
while (itr.hasNext()) {
final File file = itr.next();
final Path filePath = file.toPath();
final Path relativePath = directoryPath.relativize(filePath.getParent());
String relativePathString = relativePath.toString() + "/";
if (relativePathString.isEmpty()) {
relativePathString = "./";
}
final Path absPath = filePath.toAbsolutePath();
final String absPathString = absPath.getParent().toString() + "/";
flowFile = session.create();
final long importStart = System.nanoTime();
flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
final long importNanos = System.nanoTime() - importStart;
final long importMillis = TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);
flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString)
Map<String, String> attributes = getAttributesFromFile(filePath);
if (attributes.size() > 0) {
flowFile = session.putAllAttributes(flowFile, attributes);
}
FlowFile flowFile1= session.create();
flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
flowFile = session.putAttribute(flowFile, "start", start);
flowFile = session.putAttribute(flowFile, "startDate", startDate);
flowFile = session.putAttribute(flowFile, "endDate", endDate);
flowFile = session.putAttribute(flowFile, "runAs", runAs);
flowFile = session.putAttribute(flowFile, "patch", patch);
session.getProvenanceReporter().receive(flowFile, file.toURI().toString(), importMillis);
session.transfer(flowFile1, REL_SUCCESS);
FlowFile flowFile3=session.create();
flowFile3=session.importFrom(filePath, keepingSourceFile, flowFile);
NodeList run = doc.getElementsByTagName("runAs");
run.item(0).setNodeValue("false");
session.transfer(flowFile3,REL_ROLLBACK);
session.remove(flowFile);
最近几天,我在这里看到了非常类似的问题,并回答了“NIFI:编写新处理器”和“NIFI:如何编写自定义处理器”。
我完全支持学习如何在Apache NiFi中进行自定义处理器开发,但这个用例对我来说没有意义。从文件系统(HDFS或其他)检索文件是一个原子工作单元,不应该与XML解析结合在一起。将GetFile
处理器(或ListFile
/FetchFile
对)与EvaluateXpath
处理器组合,以执行此逻辑。源文件将保留在原始文件系统位置,您将获得更多的控制和对流的可见性,更不用说更健壮的性能和可维护性了。如果需要多个流使用它,可以将此段导出为模板,或者从其他处理器向其提供输入,以确定要获取哪些文件并将其输出到routeonattribute
处理器,从而通过filename
或其他类似属性将结果指向不同的使用者。
如果您对定制处理器开发感兴趣,开发人员指南和贡献者指南都提供了非常好的参考信息,Bryan Bende的博客提供了一个很好的演练。
我试图加载一个自定义的NiFi处理器,但无法让NiFi加载所有的.nar依赖项,尽管尝试了各种pom.xml配置。我在SO上遇到过一些类似的问题,但还没有找到这个问题的答案。
我使用的是Nifi 0.4.1版本。我写自定义代码转换CSV到avro格式。我已经创建了类文件,并能够生成nar文件。将nar文件放置在lib目录中,并重新启动nifi服务器。 任何帮助都很感激.. 谢谢,
我正在构建一个自定义处理器来处理流文件,为了处理流文件,我需要从我的本地文件系统读取CSV文件。我创建了一个proerty描述符CSV_PATH,如下所示 现在我想在配置处理器时获取在UI中设置的CSV_PATH属性的值。我无法获得CSV_PATH值。另外,如果我在代码中硬编码filepath,那么我仍然无法从本地文件系统读取CSV。
我正在研究创建一个自定义处理器从一个自定义源中摄取数据,那里没有现有的nifi处理器。 我一直试图理解Nifi组件如何工作的机制,并看到了一些关于如何创建自定义处理器的好文档,然而,我看不到任何关于管理偏移量的内容。假设我有一个运行1秒的处理器,但需要从某个任意偏移量继续进行处理,这可能会每秒钟产生结果,也可能不会产生结果。
我编写了一个自定义的NiFi处理器,用于在BigQuery上执行一些任务。我希望能够重用与NiFi捆绑在一起的GCP凭据提供者控制器服务,以用于身份验证。这可能吗?我尝试将nifi-gcp-processors maven依赖项添加到我的项目中,它成功构建,但当我尝试用我的nar启动NiFi时,它失败了,出现了这个错误消息。