当前位置: 首页 > 知识库问答 >
问题:

Storm处理数据极慢

佟颖逸
2023-03-14
  • 我们在单个节点上有1个喷口和1个螺栓。Spout从RabbitMQ读取数据,并将其发送到唯一一个将数据写入Cassandra的bolt。
  • 我们的数据源每秒生成10000条消息,而storm处理这条消息大约需要10秒,这对我们来说太慢了。
  • 我们尝试增加拓扑的并行度,但没有任何区别。
// Topology Class
public class SimpleTopology {

public static void main(String[] args) throws InterruptedException {
    System.out.println("hiiiiiiiiiii");
    TopologyBuilder topologyBuilder = new TopologyBuilder();
    topologyBuilder.setSpout("SimpleSpout", new SimpleSpout());
    topologyBuilder.setBolt("SimpleBolt", new SimpleBolt(), 2).setNumTasks(4).shuffleGrouping("SimpleSpout");

    Config config = new Config();
    config.setDebug(true);
    config.setNumWorkers(2);

    LocalCluster localCluster = new LocalCluster();
    localCluster.submitTopology("SimpleTopology", config, topologyBuilder.createTopology());

    Thread.sleep(2000);
}
// Simple Bolt 
public class SimpleBolt implements IRichBolt{

private OutputCollector outputCollector;

public void prepare(Map map, TopologyContext tc, OutputCollector oc) {
    this.outputCollector = oc;
}

public void execute(Tuple tuple) {
    this.outputCollector.ack(tuple);
}

public void cleanup() {
    // TODO
}

public void declareOutputFields(OutputFieldsDeclarer ofd) {
    // TODO
}

public Map<String, Object> getComponentConfiguration() {
    return null;
}
// Simple Spout

public class SimpleSpout implements IRichSpout{

private SpoutOutputCollector spoutOutputCollector;
private boolean  completed = false;
private static int i = 0;

public void open(Map map, TopologyContext tc, SpoutOutputCollector soc) {      
    this.spoutOutputCollector = soc;
}

public void close() {
    // Todo
}

public void activate() {
    // Todo
}

public void deactivate() {
    // Todo
}

public void nextTuple() {
    if(!completed)
    {
        if(i < 100000)
        {
            String item = "Tag" + Integer.toString(i++);
            System.out.println(item);
            this.spoutOutputCollector.emit(new Values(item), item);
        }
        else
        {  
            completed = true;
        }
    }
    else
    {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException ex) {
            Logger.getLogger(SimpleSpout.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}

public void ack(Object o) {
    System.out.println("\n\n OK : " + o);
}

public void fail(Object o) {
    System.out.println("\n\n Fail : " + o);
}

public void declareOutputFields(OutputFieldsDeclarer ofd) {
    ofd.declare(new Fields("word"));
}

public Map<String, Object> getComponentConfiguration() {
    return null;
}

}

更新:有没有可能使用shuffle分组,相同的元组将被处理多次?使用的配置(喷口=4.螺栓=4),现在的问题是,随着螺栓数量的增加,性能下降。

共有1个答案

壤驷承
2023-03-14

您应该找出这里的瓶颈是什么--RabbitMQ还是Cassandra。打开Storm UI并查看每个组件的延迟时间。

如果增加并行性没有帮助(通常是应该的),那么RabbitMQ或Cassandra肯定有问题,所以您应该关注它们。

 类似资料:
  • 我需要用Storm处理成批的元组。我的最后一个bolt必须等到拓扑接收到整个批处理之后才能进行一些处理。为了避免混淆--对我来说,批处理是一组N条消息,它们是实时的,这个术语不需要与批处理(Hadoop)联系在一起。即使2条消息也可以是一批。 阅读Storm的文档是否可以说Storm不支持这种批处理(实时的批处理=N条消息)? 所以我的问题是给你们,我亲爱的Storm大师们,这个拓扑是不是设计得很

  • 一、Storm 1.1 简介 Storm 是一个开源的分布式实时计算框架,可以以简单、可靠的方式进行大数据流的处理。通常用于实时分析,在线机器学习、持续计算、分布式 RPC、ETL 等场景。Storm 具有以下特点: 支持水平横向扩展; 具有高容错性,通过 ACK 机制每个消息都不丢失; 处理速度非常快,每个节点每秒能处理超过一百万个 tuples ; 易于设置和操作,并可以与任何编程语言一起使用

  • 数据处理 可将字段的值进行处理得到最终结果 html标签过滤 内容替换 批量替换 关键词过滤 条件判断 截取字符串 翻译 工具箱 将文本链接标记为图片链接:如果字段的值是完整的url链接(非<img>标签内的链接),可将链接识别为图片 使用函数 调用接口

  • 我遇到了一些数据,我想用许多不同的方式对它进行排序,例如按购买最多的最便宜的产品进行排序。我想一行一行地对文档进行分组,因为每行包含另一个“项目”。我附上了一张图片供参考。我更喜欢使用Java,但如果有必要,我会学习R。我是否手动将每行编码为数组?有400个项目,如果这是唯一的方法,我可以将其分成几天。 样品

  • Data Preparation You must pre-process your raw data before you model your problem. The specific preparation may depend on the data that you have available and the machine learning algorithms you want

  • 在输入的JSON数据中,v的值越高,粒子越亮,并且它们从出发国家到目的国家的运行越快。 (请查阅Michael Chang的文章来 了解他是如何提出这个想法的)。Gio.js库会自动缩放输入数据的范围以便于更好的数据可视化。作为开发人员,您还可以定义自己的预处理数据的方式。