我有一个拓扑结构如下所示:
注意,Bolt2和Bolt3从Bolt1和Bolt4接收元组。所有bolt都是运行python脚本的ShellBolts,而spout是运行从RabbitMQ读取的python脚本的ShellSpout。除了Bolt4之外,所有的工作都是预期的。如果我在RabbitMQ中一次添加一条消息,它就会全部工作并且干净利落地完成。如果我在Bolt4上挂起消息时对消息进行排队,则Bolt4将永远不会处理该消息。其他螺栓仍然执行它们的功能,但Bolt4将只是坐在那里完成第一个。
Storm UI显示所有的元组都由bolt4执行,但只有一个被加密。没有失败。我使用的是Storm0.9.5和Storm-Starter中包含的multilang python适配器。
编辑:这里有一个最小的失败案例。
螺栓1.py:
import storm
import time
import json
class Bolt1(storm.BasicBolt):
# overrides storm.Bolt.process
def process(self, tup):
objID, APIArgs = tup.values
APIArgs = json.loads(APIArgs)
self.emit("bolt3Queue", objID, **APIArgs)
self.emit("bolt2Queue", objID, **APIArgs)
self.emit("bolt4Queue", objID, **APIArgs)
storm.ack(tup)
def emit(self, stream, objID, **APIArgs):
tup = [objID, json.dumps(APIArgs)]
storm.log("Emit [%s] %s" % (stream, tup))
storm.emit(tup, stream=stream)
if __name__ == '__main__':
Bolt1().run()
螺栓2.py:
import storm
import time
import json
class Bolt2(storm.BasicBolt):
# overrides storm.Bolt.process
def process(self, tup):
objID, APIArgs = tup.values
APIArgs = json.loads(APIArgs)
storm.ack(tup)
def emit(self, stream, objID, **APIArgs):
tup = [objID, json.dumps(APIArgs)]
storm.log("Emit [%s] %s" % (stream, tup))
storm.emit(tup, stream=stream)
if __name__ == '__main__':
Bolt2().run()
import storm
import time
import json
class Bolt3(storm.BasicBolt):
# overrides storm.Bolt.process
def process(self, tup):
objID, APIArgs = tup.values
APIArgs = json.loads(APIArgs)
storm.ack(tup)
def emit(self, stream, objID, **APIArgs):
tup = [objID, json.dumps(APIArgs)]
storm.log("Emit [%s] %s" % (stream, tup))
storm.emit(tup, stream=stream)
if __name__ == '__main__':
Bolt3().run()
import storm
import time
import json
class Bolt4(storm.BasicBolt):
# overrides storm.Bolt.process
def process(self, tup):
objID, APIArgs = tup.values
APIArgs = json.loads(APIArgs)
self.emit("bolt3Queue", objID, **APIArgs)
self.emit("bolt2Queue", objID, **APIArgs)
storm.ack(tup)
def emit(self, stream, objID, **APIArgs):
tup = [objID, json.dumps(APIArgs)]
storm.log("Emit [%s] %s" % (stream, tup))
storm.emit(tup, stream=stream)
if __name__ == '__main__':
Bolt4().run()
import storm
import random
class Spout(storm.Spout):
def nextTuple(self):
storm.emit(["id1234", "{}"], id=str(random.randint(1, 10000)))
if __name__ == '__main__':
Spout().run()
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package storm.starter;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.ShellSpout;
import backtype.storm.task.ShellBolt;
import backtype.storm.topology.*;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.HashMap;
import java.util.Map;
public class PyroTopology {
public static class PythonBolt extends ShellBolt implements IRichBolt {
public PythonBolt(String script) {
super("python", script);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
public static class Bolt4 extends ShellBolt implements IRichBolt {
public Bolt4() {
super("python", "bolt4.py");
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream("bolt3Queue", new Fields("objID", "APIArgs"));
declarer.declareStream("bolt2Queue", new Fields("objID", "APIArgs"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
public static class Bolt1 extends ShellBolt implements IRichBolt {
public Bolt1() {
super("python", "bolt1.py");
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream("bolt3Queue", new Fields("objID", "APIArgs"));
declarer.declareStream("bolt2Queue", new Fields("objID", "APIArgs"));
declarer.declareStream("bolt4Queue", new Fields("objID", "APIArgs"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
public static class PythonSpout extends ShellSpout implements IRichSpout {
public PythonSpout() {
super("python", "spout.py");
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("objID", "APIArgs"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("rabbit", new PythonSpout(), 1);
builder.setBolt("bolt1", new Bolt1(), 1).
shuffleGrouping("rabbit");
builder.setBolt("bolt4", new Bolt4(), 1).
shuffleGrouping("bolt1", "bolt4Queue");
builder.setBolt("bolt3", new PythonBolt("bolt3.py"), 1).
shuffleGrouping("bolt1", "bolt3Queue").
shuffleGrouping("bolt4", "bolt3Queue");
builder.setBolt("bolt2", new PythonBolt("bolt2.py"), 1).
shuffleGrouping("bolt1", "bolt2Queue").
shuffleGrouping("bolt4", "bolt2Queue");
Config conf = new Config();
conf.setStatsSampleRate(1.0);
conf.put(Config.TOPOLOGY_DEBUG, true);
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 5);
conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 60);
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
}
else {
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", conf, builder.createTopology());
Thread.sleep(30000);
cluster.shutdown();
}
}
}
storm jar target/storm-starter-0.9.5-jar-with-dependencies.jar storm.starter.PyroTopology vb
您正在使用basicbolt
,它会为您自动处理加密。因此,不能在代码中手动ack输入元组。这会导致单个元组的多个ack,从而混淆了Storm跟踪ack的机制(通过对消息ID和ack ID进行异或)。作为另一种选择(如果您需要高级的加密行为,您可以实现bolt
。
如UI所示,spout没有接收到ack,因此当达到最大spout挂起时,Storm会停止发出元组。此外,您会看到bolt的“executed”和“acked”的计数不匹配--这也表明没有正确处理acks。
早上好,我试图在我们的代码中找到一个错误,关于当xml没有被格式化时使用JAXB解组XML。我已经做了很多调试和测试,但仍然找不到错误。 xml的解释部分如下所示: 列表最多可包含50.000<代码> 为此,我们有以下代码。解组器的创建被移动到一个接口,以区别我们希望通过jaxb解组的元素,包括子元素(如元素)和那些(如
我有storm topology(1个worker)设置,其中spout(java)从redis出列(使用blpop)事件并传输到bolts。但是有一个观察到的情况是,当队列超过200万,并且在storm Nimbus/Supervisor/Zookeeper/Worker日志中没有发现警告/异常时,一些事件没有被bolt接收(在clojure中,6-spout线程、50-bolt线程)。
问题内容: 我今天犯了升级Eclipse的错误,现在无法启动新的Android项目。 我收到消息Proguard.cfg(找不到文件)。 我似乎在哪里找不到这东西?是否有可能摆脱它我在这个项目中不需要混淆… 谢谢 问题答案: 如果您确实不需要Proguard来混淆发行版,则可以从项目根文件夹的default.properties文件中删除以下行: proguard.config = proguar
问题内容: 我想在我的网站上使用,但得到以下信息: 我试过打印。输出以下内容: 谁能帮助我找到或建议替代方案? 问题答案: 从文档中: 页面(如果有的话)的地址,该页面将用户代理引至当前页面。这是由用户代理设置的。并非所有的用户代理都将设置此功能,有些用户代理提供了将HTTP_REFERER修改为功能的功能。简而言之,它不能真正被信任。
问题内容: 我目前正在一个小项目上使用RPi 3B(最新的Raspbian Jessie),该项目涉及播放简短的.mp4文件。由于Pygame似乎支持播放.mpg文件,因此我将视频转换为该格式。 但是,当我尝试导入movie模块时,出现了常见的python导入错误: 经过研究,我发现其他人也遇到了与此处所述相同的问题。我真的不知道是否有解决方案,或者我在RPi上有什么替代方案。 Pygame,py
我厌倦了在我的代码中找到实际问题。我正在尝试在我的 android 项目中使用 FCM。我按照 firebase 控制台中给出的分步程序来激活我的项目中的服务,但我遇到了类似“错误”的问题:任务“:app:processDebugGoogleServices”执行失败。 文件 google-services.json 丢失。没有它,Google 服务插件将无法运行。搜索到的位置:C:\Users\