当前位置: 首页 > 知识库问答 >
问题:

多微批量Storm拓扑

冷吉星
2023-03-14

首先诚挚的道歉,如果我的问题是重复的,我尝试搜索,但没有找到我的问题的相关答案

首先真诚的道歉,如果我问一些很基本的东西,因为我是Storm的初学者。如果我的问题是重复的,因为我试着搜索但是找不到相关的答案

请就我下面的用例提出建议。

>

  • 因此以25秒为频率的所有元组将汇集在一起,并由Bolt on每25秒发射一次(如果在25秒内收到重复的元组,则只考虑最新的一个元组)。

    类似地,所有以10分钟为频率的元组将被合并在一起,并由Bolt每隔10分钟发出一次(如果在10分钟内收到重复的元组,则只考虑最新的一个)。

    **现在我们可以有4-5种频率(例如10秒、25秒、10分钟、20分钟等,这些都是配置的),并且每个元组都应该被插入到适当的批处理中并发射(如上面的示例)。

    通知一下。对于螺栓分组,我使用了“fieldsgrouping”作为下面的配置

    *.fieldsGrouping("FILTERING_BOLT",new Fields(PUBLISHING_FREQUENCY));*
    

    请提供帮助或建议,对于我的用例来说,什么是最好的方法,因为我无法实现任何东西来处理并发元组的流动和管理Storm的内部并行性。

  • 共有1个答案

    关胜
    2023-03-14
    spout -> splitter -> 5 second window bolt
                      -> 10 second window bolt
    
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare("5-sec-stream", ...);
        declarer.declare("10-sec-stream", ...);
    }
    
    public void execute(Tuple input) {
        if (frequencyIsFive(input)) {
            collector.emit("5-sec-stream", new Values(input.getValues()))
        }
        //more cases here
    }
    
    topologyBuilder.setBolt("splitter", new SplitterBolt())
         .shuffleGrouping("spout")
    
    topologyBuilder.setBolt("5-second-window", new YourWindowingBolt())
         .globalGrouping("splitter", "5-sec-stream")
    

    有关这方面的更多信息,请参见https://storm.apache.org/releases/2.0.0-snapshot/concepts.html,特别是关于流和分组的部分。

    在https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/slidingwindowtopology.java上有一个窗口化拓扑的简单示例。

    您可能需要注意的一件事是Storm的元组超时。如果您需要一个例如10分钟的窗口,您需要将元组超时从默认的30秒大幅提高,这样元组在队列中等待时就不会超时。您可以在配置拓扑时设置conf.setMessageTimeoutSecs(15*60)。您希望在窗口间隔和元组超时之间有一点回旋余地,因为您希望尽可能避免元组超时。

     类似资料:
    • 我正在研究一个storm拓扑,需要为不同的客户端位置构建多个拓扑。 谢谢你的回复。

    • 我正在尝试使用Eclipse在Linux中运行Storm启动示例。我收到以下错误和函数从未被调用。 错误: 我的拓扑类: 我正在虚拟机环境中工作,所以不知道这是否是由于安装了Zookeeper。有什么想法吗?

    • 8台机器一直在使用。每一个都有22个核心和512 GB的RAM。但是,我们的代码运行得真的很慢。传输600万个数据需要10分钟才能完成。 60个文件中的10 MB在一秒钟内传输到HDFS。我们正在努力优化我们的代码,但很明显我们做了一些非常错误的事情。 对于蜂巢表,我们有64个桶。 在HDFS喷口;.setmaxextending(50000); 在蜂巢喷口选项;.WithTxNsperBatch

    • 如何为storm拓扑提供自定义配置?例如,如果我构建了一个连接到MySQL集群的拓扑,并且我希望能够更改需要连接到哪些服务器而不需要重新编译,我将如何做到这一点?我更喜欢使用配置文件,但我担心文件本身没有部署到集群中,因此它不会运行(除非我对集群工作方式的理解有缺陷)。到目前为止,我所看到的在运行时将配置选项传递到storm拓扑的唯一方法是通过命令行参数,但当您获得大量参数时,这将是混乱的。 有一

    • 我读了很多和Storm有关的网站。但我仍然无法将拓扑结构完美地映射到Storm集群中。 请帮助我理解这一点。 在Storm集群中有这样的术语 null null null 所有这些都要用Storm集群来映射。我已经在一个项目里工作了。所以我知道拓扑结构。

    • 我是阿帕奇Storm的新手。我已经在intellij中用java创建了一个storm项目,它成功地创建了一个本地集群,并将拓扑提交给它,然后在本地运行。我想在亚马逊EC2上运行这个Storm项目。我跟踪了https://github.com/nathanmarz/storm-deploy/wiki链接。跟随链接成功发射了2个主管,1个动物园管理员和1个灵光。现在我想在服务器上运行我的拓扑。这是我在