当前位置: 首页 > 知识库问答 >
问题:

Storm 0.10.0是否重用拓扑设计?

谢璞
2023-03-14

因此,在某种程度上,拓扑描述了一个文件所需要的流,以计数它所拥有的唯一单词。

如果我有两个文件file1和file2,那么一个应该能够调用相同的拓扑并创建该拓扑的两个实例来运行相同的字数。

为了跟踪单词计数是否确实完成,一旦文件处理完毕,单词计数拓扑的实例应该具有完成状态。

对于文件2StormSubmitter.SubmitTopology(“WordCountTopology2”,conf,Builder.CreateTopology());

更别提使用storm客户端同样上传jar

Storm jar StormWordCount-1.0.0-jar-with-dependencies.jar com.company.wordCount1main.app“server”“filePath1”

另一个问题是,一旦文件被处理,拓扑就无法完成。在我们对拓扑发出杀戮之前,它们一直活着

暴风杀“WordCountTopology”

我知道,在流媒体世界中,消息来自像Kafka这样的消息队列,没有消息结束,但在实体/消息固定的文件世界中,这又有什么关系。

//可以在Storm中查询当前作业是否完成JobStatus status=StormSubmitter.GetTopologyStatus(conf,tracker);

共有1个答案

陆俊迈
2023-03-14

对于重复使用同一拓扑两次,您有两种可能:

1)为您的文件喷注使用构造函数参数,并使用不同的参数两次实例化相同的拓扑:

private StormTopology createMyTopology(String filename) {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("File Spout", new FileSpout(filename));
    // add further spouts and bolts etc.
    return builder.createTopology();
}

public static void main(String[] args) {
    String file1 = "/path/to/file1";
    String file2 = "/path/to/file2";
    Config c = new Config();
    if(useFile1) {
        StormSubmitter.submitTopology("T1", c, createMyTopology(file1));
    } else {
        StormSubmitter.submitTopology("T1", c, createMyTopology(file2));
    }
}

2)作为另一种选择,您可以在open()方法中配置文件喷口。

public class FileSpout extends IRichSpout {
    @Override
    public void open(Map conf, ...) {
        String filenmae = (String)conf.get("FILENAME");
        // ...
    }
    // other methods omitted
}

    public static void main(String[] args) {
    String file1 = "/path/to/file1";
    String file2 = "/path/to/file2";
    Config c = new Config();
    if(useFile1) {
        c.put("FILENAME", file1);
    } else {
        c.put("FILENAME", file2);
    }

    // assembly topology...

    StormSubmitter.submitTopology("T", c, builder.createTopology());
}
Config cfg = new Config();
// set NIMBUS_HOST and NIMBUS_THRIFT_PORT in cfg
Client client = NimbusClient.getConfiguredClient(cfg).getClient();
TopologyInfo info = client.getTopologyInfo("topologyName");
// get emitted tuples...
client.killTopology("topologyName");
 类似资料:
  • 拓扑排序主要解决的问题是给一个图的所有节点排序。 一、什么是拓扑排序 在图论中,拓扑排序(Topological Sorting)是一个有向无环图(DAG, Directed Acyclic Graph)的所有顶点的线性序列。且该序列必须满足下面两个条件: (1)每个顶点出现且只出现一次。 (2)若存在一条从顶点 A 到顶点 B 的路径,那么在序列中顶点 A 出现在顶点 B 的前面。 有向无环图(

  • 拓扑排序的英文名是 Topological sorting。拓扑排序要解决的问题是给一个图的所有节点排序。 一、什么是拓扑排序 在图论中,拓扑排序(Topological Sorting)是一个有向无环图(DAG, Directed Acyclic Graph)的所有顶点的线性序列。且该序列必须满足下面两个条件: (1)每个顶点出现且只出现一次。 (2)若存在一条从顶点 A 到顶点 B 的路径,那

  • 我正在运行一个3节点的Storm集群。我们正在提交一个包含10个工作者的拓扑结构,以下是拓扑结构的详细信息 我们每天处理800万到1000万个数据。问题是topolgy只运行了2到3天,而我们在kafka spout中看到了一些失败的元组,没有处理任何消息。当提交新的topolgy时,它工作良好,但在2到3天后,我们又看到了同样的问题。有人能给我们一个解决方案吗。下面是我的storm配置

  • 我想知道是否指定了流拓扑处理消息的顺序。 示例: 使用: 在测试中,该流工作。但我认为这不是保证。它只起作用,因为消息在加入之前首先由计数节点处理。 但是那个订单有保证吗? 就我在所有的文档中所看到的,对这个处理订单没有任何保证。因此,如果收到新消息,也可能发生以下情况: null 谢谢你

  • 我补充说,如何让visualVm显示jvm apache storm运行的信息 工人childopts:“-Xmx1048m-Djava.net.preferIPv4Stack=true-verbose:gc-XX:PrintGCTimeStamps-XX:PrintGCDetails-Dcom.sun.management.jmxremote-Dcom.sun.management.jmxrem

  • 为了表明计算机科学家可以把任何东西变成一个图问题,让我们考虑做一批煎饼的问题。 菜谱真的很简单:1个鸡蛋,1杯煎饼粉,1汤匙油 和 3/4 杯牛奶。 要制作煎饼,你必须加热炉子,将所有的成分混合在一起,勺子搅拌。 当开始冒泡,你把它们翻过来,直到他们底部变金黄色。 在你吃煎饼之前,你会想要加热一些糖浆。 Figure 27将该过程示为图。 Figure 27 制作煎饼的困难是知道先做什么。从 Fi