try {
this.getLogger().info("[INFO - ListenMW] - Message Received: " +
data.getMsgID().toString() + " Size: " +
data.getMsgData().length);
this.currentSession.adjustCounter("MW Counter", 1, true);
// Setup the flowfile to transfer
FlowFile flowfile = this.currentSession.create();
flowfile = this.currentSession.putAttribute(flowfile, "key",data.getMsgID().toString());
flowfile = this.currentSession.putAttribute(flowfile, "value", new String(data.getMsgData(),StandardCharsets.UTF_8));
this.currentSession.transfer(flowfile, SUCCESS);
}catch(Exception e) {
this.getLogger().error("[INFO - ListenMW] - "+e.getMessage());
this.currentSession.adjustCounter("MW Failure", 1, true);
}
我无法确定要为msgID和msgData使用什么属性,所以我现在创建了自己的属性。我看到一篇文章,有人建议构建您自己的json结构,并将其作为有效负载发送,但同样,您将通过哪个属性发送该结构,以便将其正确映射到kafka消息?我对Kafka很陌生,到目前为止,我只尝试了一些基本的测试案例,所以请原谅我的无知,因为我有任何错误的假设。
谢谢您的指导!我使用的是Kafka2.0.1和PublishKafka2.0处理器。
根据您所分享的内容,看起来您没有将任何内容发布到Kafka中的主要原因是您实际上没有向flowfile内容写入任何内容。作为参考,这里有一个javadocs For NiFi的副本(这里还有处理器文档)。您应该这样做:
flowFile = session.write(flowFile, outStream -> {
outStream.write("some string here".getBytes());
});
我使用publishkafkarecord
,但是publishkafka
处理器在概念上非常相似。您可以按照在那里的方式设置消息的键,但需要通过将其写入flowfile正文来设置值。
在不了解更广泛的用例的情况下,似乎可以使用executescript
来完成需要做的事情。将此视为具有多个脚本语言引用的ExecuteScript的起点。
我正在构建一个自定义处理器来处理流文件,为了处理流文件,我需要从我的本地文件系统读取CSV文件。我创建了一个proerty描述符CSV_PATH,如下所示 现在我想在配置处理器时获取在UI中设置的CSV_PATH属性的值。我无法获得CSV_PATH值。另外,如果我在代码中硬编码filepath,那么我仍然无法从本地文件系统读取CSV。
我试图加载一个自定义的NiFi处理器,但无法让NiFi加载所有的.nar依赖项,尽管尝试了各种pom.xml配置。我在SO上遇到过一些类似的问题,但还没有找到这个问题的答案。
我使用的是Nifi 0.4.1版本。我写自定义代码转换CSV到avro格式。我已经创建了类文件,并能够生成nar文件。将nar文件放置在lib目录中,并重新启动nifi服务器。 任何帮助都很感激.. 谢谢,
我似乎无法创建一个自定义的DistributedMapCacheClientService(名为TestDistributedMapCacheClientService)供普通nifi处理器(如Wait和PutDistributedMapCache)使用。我可以让Wait和PutDistributedMapCache看到我的自定义服务,但处理器无法启动,因为nifi说我的nar“与AtomicDi
我正在研究创建一个自定义处理器从一个自定义源中摄取数据,那里没有现有的nifi处理器。 我一直试图理解Nifi组件如何工作的机制,并看到了一些关于如何创建自定义处理器的好文档,然而,我看不到任何关于管理偏移量的内容。假设我有一个运行1秒的处理器,但需要从某个任意偏移量继续进行处理,这可能会每秒钟产生结果,也可能不会产生结果。