我使用了这个博客中的代码(效果很好):
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("line-reader-spout", new LineReaderSpout());
builder.setBolt("word-spitter", new WordSpitterBolt()).shuffleGrouping("line-reader-spout");
builder.setBolt("word-counter", new WordCounterBolt()).shuffleGrouping("word-spitter");
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("HelloStorm", config, builder.createTopology());
但是,当我调整它,使其有两个可以读取行的喷口,将行发送到两个单词吐出器
螺栓,最后将结果发送到单个
单词计数器
螺栓时,就会抛出无效拓扑的异常。
我修改过的代码
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("line-reader-spout", new LineReaderSpout());
builder.setSpout("line-reader-spout2", new LineReaderSpout());
builder.setBolt("word-spitter", new WordSpitterBolt()).shuffleGrouping("line-reader-spout");
builder.setBolt("word-spitter2", new WordSpitterBolt()).shuffleGrouping("line-reader-spout2");
builder.setBolt("word-counter", new WordCounterBolt(),2).shuffleGrouping("word-spitter").shuffleGrouping("word-splitter2");
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("HelloStorm", config, builder.createTopology());
拓扑有什么问题,我没有看到任何不正当的流,在实际创建时,
word-spitter2
怎么可能不存在?
6661[main]INFOb.s.d.nimbus-[req 1]从以下位置访问:主体: op: submitTopology 6718[main]WARNb.s.d.nimbus-拓扑提交异常。(拓扑名称='HelloStorm')#InvalidTopology yExctive InvalidTopology yExctive(msg: Component:[word-计数器]订阅不存在的组件[word-splitter2])6721[main]ERROR o. a. s. o. a. z. s.NIOServerCnxnFactory-线程[main,5, main]死exec_fn__1236__auto__InvalidTopology异常在fn__6583$validate_structure_BANG_. invoke(exec_fn__1236__auto__)~[0_73: 0.10.0]在backtype.storm.daemon.common$system_topology_BANG_. invoke(common.clj:299)~[storm-core-0.10.0.jar: 0.10.0]在backtype.storm.daemon.nimbus$fn__6583$backtype.storm.generated.$reify__6598.submit拓扑选择(nimbus.clj:1091)~[storm-core-0.10.0.jar: 0.10.0]在backtype.storm.daemon.nimbus$backtype.storm.daemon.common$common.clj:160$reify__6598.submit拓扑(nimbus.clj:1119)~[storm-core-0.10.0.jar: 0.10.0]在sun.reflect.NativemetodAccessorImpl.invoke0(本机方法)~[?: 1.8.storm-core-0.10.0.jar]在sun.reflect.NativeomeodAccessorInm. invoke(NativeomeodAccessorInm. java: 62)~[?: 1.8.0_73]在sun. reava. reflector. java: 43)~[?: 1.8.0_73]在clojure. lang. reava. rang. reflector. tnkeMatchingmethod(Reflector. java: 93)~[clojure-1.6.0. jar:?]在Clojure. lang. Reflector. ainkeInstancemethod(Reflector. java: 28)~[clojure-1.6.0. jar:?]在backtype. Storm. test$submit_local_topology. invoke(test. clj: 276)~[Stor-core-0.10.0. jar: 0.10.0]在backtype. Storm. LocalClusterLocalCluster. submitTopology(未知来源)~[Storm-core-0.10.0. jar: 0.10.0]在com. spNotes. Storm. HelloStorm. main(HelloStorm. java: 36)~[类/:?]
你有拼写错误拆分器2。
建设者setBolt(“word-spitter2”,新单词SpitterBolt()。洗牌组合(“line-reader-spout2”);建设者设置螺栓(“单词计数器”,新单词计数器螺栓(),2)。洗牌组合(“吐字者”)。洗牌组合(“单词拆分器2”);
问题内容: 在Python中,一旦我在解释器会话中使用导入了模块X ,并且模块在外部进行了更改,就可以使用来重新加载该模块。然后,这些更改将在我的解释器会话中可用。 我想知道当我使用模块从模块X导入组件Y时是否也可行? 该语句不起作用,因为Y本身不是模块,而是模块内部的仅组件(在这种情况下为类)。 是否有可能在不离开解释器会话(或导入整个模块)的情况下重新加载模块的各个组件? 编辑: 为了澄清起见
问题内容: 我有一个通用组件,该组件将其子组件映射为仅过滤某种类型的子组件,如下所示。 但是,使用该属性只是一个猜测,我找不到记录。不仅如此,对其进行日志记录表明它是一个函数- 无法执行。最重要的是,使用Browserify时需要解决几个问题。 另一种选择是读取child.prototype.displayName。但这也感觉不对。 问题:基本上,我正在寻找一种比较两个ReactJS组件是否相等的
我应该如何跟踪int-http:outbound-gateway和int-jms:outbound-channel-adapter?下面是对应于java类的组件的映射。请验证。我需要在以下bean上调用setShouldTrack(true)方法,以便在消息历史记录中获取这些组件细节(名称、类型、时间戳) 目前,我能够跟踪int-http:入站网关和int-jms:消息驱动通道适配器。
我有一个在父组件中生成的事件,子组件必须对此作出反应。我知道在中不建议使用这种方法,我必须执行emit,这非常糟糕。所以我的代码是这个。 正如您所看到的,在无限滚动上被触发,事件被发送到子组件搜索。不仅仅是搜索,因为它是根,所以它正在向每个人广播。 什么是更好的方法。我知道我应该使用道具,但我不知道在这种情况下我该怎么做。
问题内容: 我在PHP文档中看到的最接近的是fread()给定的长度,但这没有指定从哪一行开始。还有其他建议吗? 问题答案: 您将无法从X行开始读取,因为行可以是任意长度。因此,您必须从头开始阅读,以计数要读取的行数才能到达X行。例如: