我正试图将Storm(见这里)整合到我的项目中。我熟悉拓扑结构、喷口和螺栓的概念。但现在,我正试图弄清楚一些事情的实际实现。
所以第一个问题是如何将进入这些方法的数据连接到喷口?我试图i)传递一个backtype.storm.topology.irichspout,然后ii)传递一个backtype.storm.spout.spoutoutputcollector(见这里)给那个spout的打开函数(见这里)。但是我看不出有什么方法可以真正传递任何类型的地图或列表。
B)我的项目的其余部分都是Clojure。将会有大量的数据通过这些方法获得。每个事件的ID在1到100之间。在Clojure中,我想将来自喷口的数据拆分到不同的执行线程中。我想,这些将是螺栓。
如何设置Clojure螺栓从喷口获取事件数据,然后根据传入事件的ID中断线程?
提前感谢蒂姆
[编辑1]
我已经克服了这个问题。我最后1)实现了我自己的iRichSpout。然后2)将Spout的内部元组连接到java回调类中的传入流数据。我不确定这是否是习惯用法。但它编译和运行都没有错误。然而,3)我没有看到传入流数据(肯定在那里),通过printstuff bolt传入。
;; tie Java callbacks to a Spout that I created (.setSpout java-callback ibspout) (storm/defbolt printstuff ["word"] [tuple collector] (println (str "printstuff --> tuple["tuple"] > collector["collector"]")) ) (storm/topology { "1" (storm/spout-spec ibspout) } { "3" (storm/bolt-spec { "1" :shuffle } printstuff ) })
public class IBSpout implements IRichSpout { /** * Storm spout stuff */ private SpoutOutputCollector _collector; private List _tuple = new ArrayList(); public void setTuple(List tuple) { _tuple = tuple; } public List getTuple() { return _tuple; } /** * Storm ISpout interface functions */ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; } public void close() {} public void activate() {} public void deactivate() {} public void nextTuple() { _collector.emit(_tuple); } public void ack(Object msgId) {} public void fail(Object msgId) {} public void declareOutputFields(OutputFieldsDeclarer declarer) {} public java.util.Map getComponentConfiguration() { return new HashMap(); } }
似乎您正在将喷口传递给您的回调类,这似乎有点奇怪。当一个拓扑被执行时,storm会定期调用spoutnexttuple
方法,因此您需要做的是将java回调传递给您的自定义spout实现,以便当storm调用spout时,spout调用java回调以获取下一组元组将被馈送到拓扑中。
要理解的关键概念是,当storm请求时,Spouts会拉数据,而不是向Spouts推送数据。回调不能调用spout将数据推送到它,相反,在调用spout的nexttuple
方法时,spout应该提取数据(从某个java方法或任何内存缓冲区)。
我最近开始使用Apache Storm,刚刚完成了我的第一个拓扑结构的构建(全部用java)。 作为下一步,我想将连接到树莓派的 TI 传感器标签中的传感器值放在这些拓扑中。 我可以通过HTTP发送传感器数据,但我不确定如何实现接收这些请求的工作spout。 拓扑的想法:它应该接收带有传感器值信息的HTTP请求,将这些数据发送到拓扑中,然后使用螺栓将它们写入文件/数据库。 到目前为止,我在Stac
一、整合说明 Storm 官方对 Kafka 的整合分为两个版本,官方说明文档分别如下: Storm Kafka Integration : 主要是针对 0.8.x 版本的 Kafka 提供整合支持; Storm Kafka Integration (0.10.x+) : 包含 Kafka 新版本的 consumer API,主要对 Kafka 0.10.x + 提供整合支持。 这里我服务端安装的
谁能帮我在windows中如何将storm与zookeeper集成。 我试图找到一个好的安装步骤生产在窗口,但我不能。 现在写我已经安装了独立的zookeeper,我正在尝试在Storm中配置它。yaml。 我尝试的示例代码: 如果有人知道,请帮助我。
一、Storm集成HDFS 1.1 项目结构 本用例源码下载地址:storm-hdfs-integration 1.2 项目主要依赖 项目主要依赖如下,有两个地方需要注意: 这里由于我服务器上安装的是 CDH 版本的 Hadoop,在导入依赖时引入的也是 CDH 版本的依赖,需要使用 <repository> 标签指定 CDH 的仓库地址; hadoop-common、hadoop-client、
问题内容: 我有一个原型Storm应用程序,该应用程序读取STOMP流并将输出存储在HBase上。它可以工作,但不是很灵活,我试图与我们的其他应用程序以更一致的方式进行设置,但运气不好,无法确定当前使用Storm的方式。我们使用spring- jms类,但不是在标准spring方法中使用它们,而是在运行时创建它们并手动设置依赖项。 这个项目:https : //github.com/granthe