当前位置: 首页 > 知识库问答 >
问题:

在Nifi中创建自定义处理器并发送到PublishKafka

崔恺
2023-03-14
    try {
         this.getLogger().info("[INFO - ListenMW] - Message Received: " +
                            data.getMsgID().toString() + " Size: " +
                            data.getMsgData().length);
         this.currentSession.adjustCounter("MW Counter", 1, true);

         // Setup the flowfile to transfer
         FlowFile flowfile = this.currentSession.create();
         flowfile = this.currentSession.putAttribute(flowfile, "key",data.getMsgID().toString());
         flowfile = this.currentSession.putAttribute(flowfile, "value", new String(data.getMsgData(),StandardCharsets.UTF_8));

         this.currentSession.transfer(flowfile, SUCCESS);


     }catch(Exception e) {
          this.getLogger().error("[INFO - ListenMW] - "+e.getMessage());
          this.currentSession.adjustCounter("MW Failure", 1, true);
     }

我无法确定要为msgID和msgData使用什么属性,所以我现在创建了自己的属性。我看到一篇文章,有人建议构建您自己的json结构,并将其作为有效负载发送,但同样,您将通过哪个属性发送该结构,以便将其正确映射到kafka消息?我对Kafka很陌生,到目前为止,我只尝试了一些基本的测试案例,所以请原谅我的无知,因为我有任何错误的假设。

谢谢您的指导!我使用的是Kafka2.0.1和PublishKafka2.0处理器。

共有1个答案

孟晨朗
2023-03-14

根据您所分享的内容,看起来您没有将任何内容发布到Kafka中的主要原因是您实际上没有向flowfile内容写入任何内容。作为参考,这里有一个javadocs For NiFi的副本(这里还有处理器文档)。您应该这样做:

flowFile = session.write(flowFile, outStream -> {
  outStream.write("some string here".getBytes());
});

我使用publishkafkarecord,但是publishkafka处理器在概念上非常相似。您可以按照在那里的方式设置消息的键,但需要通过将其写入flowfile正文来设置值。

在不了解更广泛的用例的情况下,似乎可以使用executescript来完成需要做的事情。将此视为具有多个脚本语言引用的ExecuteScript的起点。

 类似资料:
  • 我正在构建一个自定义处理器来处理流文件,为了处理流文件,我需要从我的本地文件系统读取CSV文件。我创建了一个proerty描述符CSV_PATH,如下所示 现在我想在配置处理器时获取在UI中设置的CSV_PATH属性的值。我无法获得CSV_PATH值。另外,如果我在代码中硬编码filepath,那么我仍然无法从本地文件系统读取CSV。

  • 我试图加载一个自定义的NiFi处理器,但无法让NiFi加载所有的.nar依赖项,尽管尝试了各种pom.xml配置。我在SO上遇到过一些类似的问题,但还没有找到这个问题的答案。

  • 我使用的是Nifi 0.4.1版本。我写自定义代码转换CSV到avro格式。我已经创建了类文件,并能够生成nar文件。将nar文件放置在lib目录中,并重新启动nifi服务器。 任何帮助都很感激.. 谢谢,

  • 我似乎无法创建一个自定义的DistributedMapCacheClientService(名为TestDistributedMapCacheClientService)供普通nifi处理器(如Wait和PutDistributedMapCache)使用。我可以让Wait和PutDistributedMapCache看到我的自定义服务,但处理器无法启动,因为nifi说我的nar“与AtomicDi

  • 我正在研究创建一个自定义处理器从一个自定义源中摄取数据,那里没有现有的nifi处理器。 我一直试图理解Nifi组件如何工作的机制,并看到了一些关于如何创建自定义处理器的好文档,然而,我看不到任何关于管理偏移量的内容。假设我有一个运行1秒的处理器,但需要从某个任意偏移量继续进行处理,这可能会每秒钟产生结果,也可能不会产生结果。