当前位置: 首页 > 知识库问答 >
问题:

“无法找到flowfile内容”

都建树
2023-03-14

我正在使用NIFI1.6,当试图修改传入的FlowFile的克隆时,会出现以下错误:

[1]“找不到FlowFile的内容:...MissingFlowFileException...由ContentNotFoundException引起:找不到StandardClaim的contetn...由java.io.eOfException引起:null”

[2]“FlowFileHandlingException:StandardFlowFileRecord...在此会话中未知”

第一个错误发生在试图访问流文件的内容时,第二个错误发生在从会话中删除流文件时(在第一个错误的捕获中)。这一过程已知在NIFI0.7下工作。

基本流程是:

    null

下面是从实际使用的版本中简化的代码,演示了这个问题。(开发系统没有连接,所以我不得不复制代码。请原谅任何错别字--它应该是关闭的。这也是为什么没有提供完整的堆栈跟踪。)执行该工作的处理器有一个属性来确定是否应该立即读取。所以这两种情况都可以很容易地执行。要设置它,只需要一个GetFile处理器为SampleCloningProcessor的输出提供输入和终止符。还包括一个示例输入文件。代码的主要部分在onTrigger和merproate方法中。这个简化版本中的操作实际上除了将输入复制到输出之外什么也不做。

任何关于为什么会发生这种情况的见解和更正建议都将不胜感激-谢谢。

sampleCloningProcessor.java

processor sample.package.cloning

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Reader;
import java.util.Arrays;
import java.util.Hashset;
import java.util.List;
import java.util.Scanner;
import java.util.Set;

import org.apache.commons.compress.utils.IOUtils;

import org.apache.nifi.annotation.documentaion.CapabilityDescription;
import org.apache.nifi.annotation.documentaion.Tags;
import org.apache.nifi.componets.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessorContext;
import org.apache.nifi.processor.ProcessorSession;
import org.apache.nifi.processor.ProcessorInitioalizationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCalback;
import org.apache.nifi.processor.io.OutputStreamCalback;
import org.apache.nifi.processor.io.StreamCalback;
import org.apache.nifi.processor.util.StandardValidators;

import com.google.gson.Gson;

@Tags({"example", "clone"})
@CapabilityDescription("Demonsrates cloning of flowfile failure.")
public class SampleCloningProcessor extend AbstractProcessor {

    /* Determines if an immediate read is performed after cloning of inoming flowfile. */
    public static final PropertyDescriptor IMMEDIATE_READ = new PropertyDescriptor.Builder()
        .name("immediateRead")
        .description("Determines if processor runs successfully. If a read is done immediatly "
            + "after the clone of the incoming flowFile, then the processor should run successfully.")
        .required(true)
        .allowableValues("true", "false")
        .defaultValue("true")
        .addValidator(StandardValidators.BOLLEAN_VALIDATOR)
        .build();

    public static final Relationship SUCCESS = new Relationship.Builder().name("success").
        description("No unexpected errors.").build();

    public static final Relationship FAILURE = new Relationship.Builder().name("failure").
        description("Errors were thrown.").build();

    private Set<Relationship> relationships;
    private List<PropertyDescriptors> properties;

     @Override
    public void init(final ProcessorInitializationContext contex) {
        relationships = new HashSet<>(Arrays.asList(SUCCESS, FAILURE));
        properties = new Arrays.asList(IMMEDIATE_READ);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    public List<PropertyDescriptor> getSuppprtedPropertyDescriptors() {
        return this.properties;
    }

   @Override
   public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
       FlowFile incomingFlowFile = session.get();

       if (incomingFlowFile == null) {
           return;
       }

       try {
           final InfileReader inFileReader = new InfileReader();
           session.read(incomingFlowFile, inFileReader);
           Product product = infileReader.getProduct();
           boolean transfer = false;

           getLogger().info("\tSession   :\n" + session);
           getLogger().info("\toriginal  :\n" + incomingFlowFile);

           for(int i = 0; i < 2; i++) {
               transfer = manipulate(context, session, inclmingFlowFile, product);
           }
       } catch (Exception e) {
           getLogger().error(e.getMessage(), e);
           session.rollback(true);
       }
   }

    private boolean manipuate(final ProcessContext context, final ProcessSession session
        final FlowFile incomingFlowFile, final Product product) {

        boolean transfer = false;
        FlowFile outgoingFlowFile = null;
        boolean immediateRead = context.getProperty(IMMEDIATE_READ).asBoolean();
        try {
            //Clone incoming flowFile
            outgoinFlowFile = session.clone(incomingFlowFile);
            getLogger().info("\tclone outgoing :\n" + outgoingFlowFile);
            if(immediateRead) {
                readFlowFile(session, outgoingFlowFile);
            }

            //First write into clone
            StageOneWrite stage1Write = new StaeOneWrite(product);
            outgoingFlowFile = session.write(outgoingFlowFile, stage1Write);
            getLogger().info("\twrite outgoing :\n" + outgoingFlowFile);

            // Format the cloned file with another write
            outgoingFlowFile = formatFlowFile(outgoingFlowFile, session)
            getLogger().info("\format outgoing :\n" + outgoingFlowFile);
            session.transfer(outgoingFlowFile, SUCCESS);
            transfer != true;
        } catch(Exception e)
           getLogger().error(e.getMessage(), e);
           if(outgoingFlowFile ! = null) {
               session.remove(outgoingFlowFile);
           }
       }
       return transfer;
   }

    private void readFlowFile(fainl ProcessSession session, fianl Flowfile flowFile) {
        session.read(flowFile, new InputStreamCallback() {
            @Override
            public void process(Final InputStream in) throws IOException {
                try (Scanner scanner = new Scanner(in)) {
                    scanner.useDelimiter("\\A").next();
                }
            }
        });
    }

    private FlowFile formatFlowFile(fainl ProcessSession session, FlowFile flowfile) {
        OutputFormatWrite formatWrite = new OutputFormatWriter();
        flowfile = session.write(flowFile, formatWriter);
        return flowFile;
    }

    private static class OutputFormatWriter implement StreamCallback {
        @Override
        public void process(final InputStream in, final OutputStream out) throws IOException {
            try {
                IOUtils.copy(in. out);
                out.flush();
            } finally {
                IOUtils.closeQuietly(in);
                IOUtils.closeQuietly(out);
            }
        }
    }

    private static class StageOneWriter implements OutputStreamCallback {

        private Product product = null;

        public StageOneWriter(Produt product) {
            this.product = product;
        }

        @Override
        public void process(final OutputStream out) throws IOException {
            final Gson gson = new Gson();
            final String json = gson.toJson(product);
            out.write(json.getBytes());
        }
    }

     private static class InfileReader implements InputStreamCallback {

        private Product product = null;

        public StageOneWriter(Produt product) {
            this.product = product;
        }

        @Override
        public void process(final InputStream out) throws IOException {
            product = null;
            final Gson gson = new Gson();
            Reader inReader = new InputStreamReader(in, "UTF-8");
            product = gson.fromJson(inreader, Product.calss);
        }

        public Product getProduct() {
            return product;
        }
    }
  package sample.processors.cloning;

  import org.apache.nifi.util.TestRunner;
  import org.apache.nifi.util.TestRunners;
  import org.junit.Before;
  import org.junit.Test;

  public class SampleCloningProcessorTest {

      final satatic String flowFileContent = "{"
          + "\"cost\": \"cost 1\","
          + "\"description\": \"description","
          + "\"markup\": 1.2"
          + "\"name\":\"name 1\","
          + "\"supplier\":\"supplier 1\","
          + "}";

      private TestRunner testRunner;

      @Before
      public void init() {
          testRunner = TestRunner.newTestRunner(SampleCloningProcessor.class);
          testRunner.enqueue(flowFileContent);
      }

      @Test
      public void testProcessorImmediateRead() {
          testRunner.setProperty(SampleCloningProcessor.IMMEDIATE_READ, "true");
          testRunner.run();
          testRinner.assertTransferCount("success", 2);
      }


      @Test
      public void testProcessorImmediateRead_false() {
          testRunner.setProperty(SampleCloningProcessor.IMMEDIATE_READ, "false");
          testRunner.run();
          testRinner.assertTransferCount("success", 2);
      }
  }
package sample.processors.cloning;

 public class Product {

  private String name;
  private String description;
  private String supplier;
  private String cost;
  private float markup;

  public String getName() {
      return name;
  }

  public void setName(final String name) {
      this.name = name;
  }

   public String getDescription() {
      return description;
  }

  public void setDescriptione(final String description) {
      this.description = description;
  }

  public String getSupplier() {
      return supplier;
  }

  public void setSupplier(final String supplier) {
      this.supplier = supplier;
  }

  public String getCost() {
      return cost;
  }

  public void setCost(final String cost) {
      this.cost = cost;
  }

  public float getMarkup() {
      return markup;
  }

  public void setMarkup(final float name) {
      this.markup = markup;
  }
}

json是一个示例输入文件。

  {
      "const" : "cost 1",
      "description" : "description 1",
      "markup" : 1.2,
      "name" : "name 1",
      "supplier" : "supplier 1"
  }

共有1个答案

凤修为
2023-03-14

报告为Nifi中的bug。正在由https://issues.apache.org/jira/browse/nifi-5879处理

 类似资料:
  • 我正在使用Spring.oxm和带有SAX解析器的jaxb2将java对象编组为xml。 我遇到了一个问题,我的封送程序抛出了一个org.xml.sax.SaxParseException声明: 发现以元素'ns59:artid'开始的无效内容。应为{[...],http://www.bipro.net/namespace/gevo“:artid,[...]}之一。 问候jcb

  • 我正在尝试使用ISOWeek,但在任何地方都找不到它。我尝试了几种不同的使用方法: 我已经尝试添加每一个系统。我可以找到运行时引用,但它总是给出这些错误: 错误CS0234:命名空间“系统”中不存在类型或命名空间名称“ISOWeek”。全球化“(是否缺少程序集引用?) 错误CS0103:名称“ISOWeek”在当前上下文中不存在 错误CS0246:找不到类型或命名空间名称'ISOYork'(您是否

  • 问题内容: 我试图从命令promopt运行一个示例Java应用程序,但出现以下错误: 我用来尝试运行此应用的命令是: 所有相关文件都位于当前工作目录中(.java,.class和.jar文件) 我用来构建.class文件的命令如下(有2个.java文件): 再次从同一工作目录运行-的内容(或多或少): 我试图以C#开发人员的身份学习Java,所以我在编程概念方面拥有深厚的背景,整个Java工具链目

  • 问题内容: Selenium无法通过ID和定位iframe Name。 这是用于Shopify上的自动结帐测试。具体问题在付款字段内。我找到的ID和名称iframe,即。 Code trials: The error is: 问题答案: 我相信可以为此使用XPath。您将需要使用XPath查找IFrame IWebElement,然后将IWebElement传递到SwitchTo()。Frame(

  • 问题内容: 我正在用Java构建一个项目。 我有这个错误: 我已经安装了JDK和文件夹:在我的系统中,但是文件不存在。 问题答案: 是的,您已经下载并安装了Java Runtime Environment(JRE)而不是Java Development Kit(JDK)。后者具有tools.jar,java.exe,javac.exe等。

  • 过了很长时间,我正在安装一台新机器。 我使用家庭酿造安装了java: 在我的。我添加的zshrc文件: 运行'java-version'会得到以下输出: openjdk 版本 “18.0.1.1” 2022-04-22 OpenJDK 运行时环境 自制 (生成 18.0.1.1 0) OpenJDK 64 位服务器虚拟机 自制 (构建 18.0.1.1 0, 混合模式, 共享) 当我尝试运行net