当前位置: 首页 > 知识库问答 >
问题:

一旦喷口完成就杀死Storm拓扑

施景同
2023-03-14

我创建了一个带有Spout的Storm拓扑,该Spout会发出许多元组用于基准测试。一旦所有的元组都从spout发出或者拓扑中不再有任何元组流动,我就想停止/终止我的拓扑。

LocalCluster cluster = new LocalCluster();
TopologyBuilder builder = new TopologyBuilder();
Config conf = new Config();
//Disabled ACK'ing for higher throughput
conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0); 

LoadGeneratorSource loadGenerator = new LoadGeneratorSource(runtime,numberOfTuplesToBeEmitted);
builder.setSpout("loadGenerator", loadGenerator);

//Some Bolts Here

while (loadGenerator.isRunning()){
//Active Waiting
}
//DO SOME STUFF WITH JAVA
cluster.killTopology("StormBenchmarkTopology");

public class LoadGeneratorSource extends BaseRichSpout {

    private final int throughput;
    private boolean running;
    private final long runtime;


    public LoadGeneratorSource(long runtime,int throughput) {
        this.throughput = throughput;
        this.runtime = runtime;
    }

    @Override
    public void nextTuple() {
        ThroughputStatistics.getInstance().pause(false);

        long endTime = System.currentTimeMillis() + runtime;
        while (running) {
            long startTs = System.currentTimeMillis();

            for (int i = 0; i < throughput; i++) {
                try {
                    emitValue(readNextTuple());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            while (System.currentTimeMillis() < startTs + 1000) {
                // active waiting
            }

            if (endTime <= System.currentTimeMillis())
                setRunning(false);
        }
    }

    public boolean isRunning() {
        return running;
    }

    public void setRunning(boolean running) {
        this.running = running;
    }

    //MORE STUFF

}

共有1个答案

宇文卓
2023-03-14

这看起来像是从喷口杀死Storm拓扑的复制品。请试一下那里给出的答案。

只是为了给出一个快速的总结;您尝试这样做的方法是行不通的,但是您可以使用来自spout的NimbusClient请求Nimbus终止您的拓扑。附带的好处是,一旦您部署到一个真正的集群,这也将起作用。

 类似资料:
  • 我们有一个不想连续运行storm拓扑的用例。相反,有一组输入(10K+)应该在指定的时间被处理,Spout连续发射这些输入,并得到拓扑中其余螺栓的处理。处理完所有输入后,在我的喷注中就没有任何东西可以从nextTuple发出。 此时,我们希望拓扑进入Hibernate状态,并在每天晚上12:00重新启动进程。 在storm配置中是否有任何属性可以设置为每天运行一次拓扑并在处理完成后Hibernat

  • 我对Apache Storm的性能有一个问题,主要是从喷口出来的。 我有一个从kestrel队列发出项目的拓扑。我获取大约2000个项目,每次在喷注中调用时,我都会发出一个。 我正在使用1个spout任务和1个spout执行器运行。我已将设置为10。 为什么每次调用之间有这么大的时间间隔?outputCollector在发出一个新元组之前是否正在等待听到每个元组的反馈? 我正在运行Java8和st

  • 我是Storm和Kafka的新手,我可以在一段时间后在本地虚拟机上安装它们。我目前有一个有效的wordCount拓扑,从dropBox文本文件中提取句子: 现在我想升级我的喷口,使用Kafka的文本,以便在拓扑结构中提交到我的下一个螺栓。我试图在git中遵循许多文章和代码,但没有任何成功。例如:这个Kafka喷口。谁能帮助我,给我一些方向,以便实现新的spout.java文件?谢谢你!

  • 在storm Framework中是否有任何预定义的、重写的或任何可用的方法来实现这一点。 提前致谢:)

  • 我使用storm0.9.4和storm-kafka:0.9.0-wip16a-scala292作为从kafka0.7读取的依赖项。 我们的Kafka保留政策是7天。 我从经纪人的最新偏移量开始读取。

  • 这里可能发生了同样的事情:错误backtype.storm.util-Async循环死亡!BufferUnderFlowException:null,但我将添加一个完整的堆栈跟踪和一些更多的上下文。 Storm版本-9.3 Storm-Kafka版本-9.3 Kafka版本-0.8.2-beta 堆栈跟踪: Spout代码(注意,出于调试目的,我使用的是一个静态定义的分区映射,只有一个代理):