当前位置: 首页 > 知识库问答 >
问题:

nextTuple()在Storm上使用BaseRichSpout调用无限次

闻人宇定
2023-03-14

我实现了一个简单的Storm拓扑,它具有一个spout和一个在本地集群模式下运行的bolt。

由于某种原因,spout的nextTuple()被多次调用。

知道为什么吗?

public class CommitFeedListener extends BaseRichSpout {
    private SpoutOutputCollector outputCollector;
    private List<String> commits;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("commit"));
    }

    @Override
    public void open(Map configMap,
                     TopologyContext context,
                     SpoutOutputCollector outputCollector) {
        this.outputCollector = outputCollector;
    }

    **//that method is invoked more than once**
    @Override
    public void nextTuple() {

            outputCollector.emit(new Values("testValue"));

    }
}
public class EmailExtractor extends BaseBasicBolt {
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("email"));
    }
    @Override
    public void execute(Tuple tuple,
                        BasicOutputCollector outputCollector) {
        String commit = tuple.getStringByField("commit");
        System.out.println(commit);        
    }  
}

运行配置:

public class LocalTopologyRunner {
    private static final int TEN_MINUTES = 600000;
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("commit-feed-listener", new CommitFeedListener());
                builder
        .setBolt("email-extractor", new EmailExtractor())
                .shuffleGrouping("commit-feed-listener");
        Config config = new Config();
        config.setDebug(true);
        StormTopology topology = builder.createTopology();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("github-commit-count-topology",
                config,
                topology);
        Utils.sleep(TEN_MINUTES);
        cluster.killTopology("github-commit-count");
        cluster.shutdown();
    }
}

谢谢你,雷。

共有1个答案

文建业
2023-03-14

nextTuple()由设计在无限循环中调用。这样做是为了使用外部资源(数据库、流、IO等)的脏检查。

如果您在nextTuple()中无事可做,则应该Hibernate一段时间,以防止使用backtype.storm.utils.utils滥发CPU信息

Utils.sleep(pollIntervalInMilliseconds);

Storm是一个实时处理架构,所以它确实是正确的行为。查看一些示例,看看如何根据您的需要实现一个喷口。

 类似资料:
  • 我试图在Haskell中编写一个函数,它可以做以下操作:输入一个整数列表,对于这些整数,使用map,有一个函数应用于它们,返回一个无限的整数列表。然后,我想使用union将foldr应用于列表,这样结果将是列表中这些列表的union。 现在的问题是,当我以10‘函数’[1,2]为例进行计算时,它会首先计算1的无限列表,因为它是一个无限列表,它永远不会对2进行计算。因此,它只返回输入列表中第一个元素

  • 我已经开发了一个使用apache storm使用kafka消息的应用程序,当我在eclipse中运行topology using in LocalCluster时,它可以正常工作,消息也可以正常使用,但是当我使用storm命令(bin\storm jar..\kafka-storm-0.0.1-SNAPSHOT.jar com.kafka_storm.util.topology storm kaf

  • 我有一个如下的拓扑: 其中Spout有一个由Bolt2订阅的流,而其他流是Bolt1接收元组的地方。 提前致谢

  • 我最近实现了一个4X4井字游戏的代码,这是使用极大极小算法。然而,我的极大极小函数无限次地递归调用自己。 初始板 (4X4) 井字 - 轮到电脑的代码- 在上面的代码中是船上的空位置,返回“X”(如果玩家X获胜),返回“O”(如果玩家O获胜) checkGameOver函数-

  • 我有一个脚本,它使用php-l检查php文件中的语法错误。它在windows中工作正常,但在Linux中输出不正确: 文件exec_ip的内容。正在检查语法错误的php是(它有要检查的语法错误): 剧本是: 窗口(WAMP){更正}中的结果: LINUX(Centos/cPanel){未知输出}中的结果: 请有人帮助我,并告诉我为什么它在linux生产服务器中给出了不正确的输出。我也尝试过用she