当前位置: 首页 > 知识库问答 >
问题:

Storm>如何将Java回调集成到Spout中

奚曦哲
2023-03-14

我正试图将Storm(见这里)整合到我的项目中。我熟悉拓扑结构、喷口和螺栓的概念。但现在,我正试图弄清楚一些事情的实际实现。

所以第一个问题是如何将进入这些方法的数据连接到喷口?我试图i)传递一个backtype.storm.topology.irichspout,然后ii)传递一个backtype.storm.spout.spoutoutputcollector(见这里)给那个spout的打开函数(见这里)。但是我看不出有什么方法可以真正传递任何类型的地图或列表。

B)我的项目的其余部分都是Clojure。将会有大量的数据通过这些方法获得。每个事件的ID在1到100之间。在Clojure中,我想将来自喷口的数据拆分到不同的执行线程中。我想,这些将是螺栓。

如何设置Clojure螺栓从喷口获取事件数据,然后根据传入事件的ID中断线程?

提前感谢蒂姆

[编辑1]

我已经克服了这个问题。我最后1)实现了我自己的iRichSpout。然后2)将Spout的内部元组连接到java回调类中的传入流数据。我不确定这是否是习惯用法。但它编译和运行都没有错误。然而,3)我没有看到传入流数据(肯定在那里),通过printstuff bolt传入。


      ;; tie Java callbacks to a Spout that I created
      (.setSpout java-callback ibspout)

      (storm/defbolt printstuff ["word"] [tuple collector]
        (println (str "printstuff --> tuple["tuple"] > collector["collector"]"))
      )
      (storm/topology
       { "1" (storm/spout-spec ibspout)
       }
       { "3" (storm/bolt-spec  { "1" :shuffle }
                               printstuff
             )
       })


    public class IBSpout implements IRichSpout {

      /**
       * Storm spout stuff
       */
      private SpoutOutputCollector _collector;

      private List _tuple = new ArrayList();
      public void setTuple(List tuple) { _tuple = tuple; }
      public List getTuple() { return _tuple; }

      /**
       * Storm ISpout interface functions
       */
      public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
      }
      public void close() {}
      public void activate() {}
      public void deactivate() {}
      public void nextTuple() {
        _collector.emit(_tuple);
      }
      public void ack(Object msgId) {}
      public void fail(Object msgId) {}


      public void declareOutputFields(OutputFieldsDeclarer declarer) {}
      public java.util.Map  getComponentConfiguration() { return new HashMap(); }

    }

共有1个答案

严兴言
2023-03-14

似乎您正在将喷口传递给您的回调类,这似乎有点奇怪。当一个拓扑被执行时,storm会定期调用spoutnexttuple方法,因此您需要做的是将java回调传递给您的自定义spout实现,以便当storm调用spout时,spout调用java回调以获取下一组元组将被馈送到拓扑中。

要理解的关键概念是,当storm请求时,Spouts会拉数据,而不是向Spouts推送数据。回调不能调用spout将数据推送到它,相反,在调用spout的nexttuple方法时,spout应该提取数据(从某个java方法或任何内存缓冲区)。

 类似资料:
  • 我最近开始使用Apache Storm,刚刚完成了我的第一个拓扑结构的构建(全部用java)。 作为下一步,我想将连接到树莓派的 TI 传感器标签中的传感器值放在这些拓扑中。 我可以通过HTTP发送传感器数据,但我不确定如何实现接收这些请求的工作spout。 拓扑的想法:它应该接收带有传感器值信息的HTTP请求,将这些数据发送到拓扑中,然后使用螺栓将它们写入文件/数据库。 到目前为止,我在Stac

  • 一、整合说明 Storm 官方对 Kafka 的整合分为两个版本,官方说明文档分别如下: Storm Kafka Integration : 主要是针对 0.8.x 版本的 Kafka 提供整合支持; Storm Kafka Integration (0.10.x+) : 包含 Kafka 新版本的 consumer API,主要对 Kafka 0.10.x + 提供整合支持。 这里我服务端安装的

  • 谁能帮我在windows中如何将storm与zookeeper集成。 我试图找到一个好的安装步骤生产在窗口,但我不能。 现在写我已经安装了独立的zookeeper,我正在尝试在Storm中配置它。yaml。 我尝试的示例代码: 如果有人知道,请帮助我。

  • 一、Storm集成HDFS 1.1 项目结构 本用例源码下载地址:storm-hdfs-integration 1.2 项目主要依赖 项目主要依赖如下,有两个地方需要注意: 这里由于我服务器上安装的是 CDH 版本的 Hadoop,在导入依赖时引入的也是 CDH 版本的依赖,需要使用 <repository> 标签指定 CDH 的仓库地址; hadoop-common、hadoop-client、

  • 问题内容: 我有一个原型Storm应用程序,该应用程序读取STOMP流并将输出存储在HBase上。它可以工作,但不是很灵活,我试图与我们的其他应用程序以更一致的方式进行设置,但运气不好,无法确定当前使用Storm的方式。我们使用spring- jms类,但不是在标准spring方法中使用它们,而是在运行时创建它们并手动设置依赖项。 这个项目:https : //github.com/granthe