我实现了一个简单的Storm拓扑,它具有一个spout和一个在本地集群模式下运行的bolt。
由于某种原因,spout的nextTuple()被多次调用。
知道为什么吗?
public class CommitFeedListener extends BaseRichSpout {
private SpoutOutputCollector outputCollector;
private List<String> commits;
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("commit"));
}
@Override
public void open(Map configMap,
TopologyContext context,
SpoutOutputCollector outputCollector) {
this.outputCollector = outputCollector;
}
**//that method is invoked more than once**
@Override
public void nextTuple() {
outputCollector.emit(new Values("testValue"));
}
}
public class EmailExtractor extends BaseBasicBolt {
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("email"));
}
@Override
public void execute(Tuple tuple,
BasicOutputCollector outputCollector) {
String commit = tuple.getStringByField("commit");
System.out.println(commit);
}
}
运行配置:
public class LocalTopologyRunner {
private static final int TEN_MINUTES = 600000;
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("commit-feed-listener", new CommitFeedListener());
builder
.setBolt("email-extractor", new EmailExtractor())
.shuffleGrouping("commit-feed-listener");
Config config = new Config();
config.setDebug(true);
StormTopology topology = builder.createTopology();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("github-commit-count-topology",
config,
topology);
Utils.sleep(TEN_MINUTES);
cluster.killTopology("github-commit-count");
cluster.shutdown();
}
}
谢谢你,雷。
nextTuple()由设计在无限循环中调用。这样做是为了使用外部资源(数据库、流、IO等)的脏检查。
如果您在nextTuple()中无事可做,则应该Hibernate一段时间,以防止使用backtype.storm.utils.utils滥发CPU信息
Utils.sleep(pollIntervalInMilliseconds);
Storm是一个实时处理架构,所以它确实是正确的行为。查看一些示例,看看如何根据您的需要实现一个喷口。
我试图在Haskell中编写一个函数,它可以做以下操作:输入一个整数列表,对于这些整数,使用map,有一个函数应用于它们,返回一个无限的整数列表。然后,我想使用union将foldr应用于列表,这样结果将是列表中这些列表的union。 现在的问题是,当我以10‘函数’[1,2]为例进行计算时,它会首先计算1的无限列表,因为它是一个无限列表,它永远不会对2进行计算。因此,它只返回输入列表中第一个元素
我已经开发了一个使用apache storm使用kafka消息的应用程序,当我在eclipse中运行topology using in LocalCluster时,它可以正常工作,消息也可以正常使用,但是当我使用storm命令(bin\storm jar..\kafka-storm-0.0.1-SNAPSHOT.jar com.kafka_storm.util.topology storm kaf
我有一个如下的拓扑: 其中Spout有一个由Bolt2订阅的流,而其他流是Bolt1接收元组的地方。 提前致谢
我在
我最近实现了一个4X4井字游戏的代码,这是使用极大极小算法。然而,我的极大极小函数无限次地递归调用自己。 初始板 (4X4) 井字 - 轮到电脑的代码- 在上面的代码中是船上的空位置,返回“X”(如果玩家X获胜),返回“O”(如果玩家O获胜) checkGameOver函数-
我有一个脚本,它使用php-l检查php文件中的语法错误。它在windows中工作正常,但在Linux中输出不正确: 文件exec_ip的内容。正在检查语法错误的php是(它有要检查的语法错误): 剧本是: 窗口(WAMP){更正}中的结果: LINUX(Centos/cPanel){未知输出}中的结果: 请有人帮助我,并告诉我为什么它在linux生产服务器中给出了不正确的输出。我也尝试过用she