当前位置: 首页 > 知识库问答 >
问题:

Storm不追上最大的花销

杨征
2023-03-14

我已经创建了一个示例拓扑来测试设置max spout Expensing属性。这是一个简单的拓扑,有一个喷嘴和一个螺栓。喷口发出100000个元组,而螺栓在睡眠一秒钟后发出嘎嘎声。我已将“最大喷口支出”属性设置为10。我假设这意味着,如果一个喷口的未确认消息计数为10,那么该喷口将不会发出任何元组。但当我运行拓扑时,我可以看到喷口发出2160条消息,然后等待。我的理解是正确的还是遗漏了什么。我使用的是storm 0.9.5。下面是代码

public static void main(String[] args) {

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new TestSpout(), 1);
    builder.setBolt("bolt", new TestBolt(),1).shuffleGrouping("spout");
    Config conf = new Config();
    conf.setNumWorkers(1);
    conf.setMaxSpoutPending(10);
    try {
        StormSubmitter.submitTopology("test", conf, builder.createTopology());
    } catch (AlreadyAliveException e) {
        e.printStackTrace();
    } catch (InvalidTopologyException e) {
        e.printStackTrace();
    }
}


public class TestSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private int count = 1;

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("spoutData"));
}

@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    this.collector = collector;
    System.out.println(context.maxTopologyMessageTimeout());
}

@Override
public void nextTuple() {

    if(count <= 100000) {
        System.out.println("Emitting : " + count);
        collector.emit(new Values(count++ + ""));
    }
}

}

public class TestBolt extends BaseRichBolt {
private OutputCollector collector;

@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
}

@Override
public void execute(Tuple input) {
    try {
        System.out.println(input.getString(0));
        Thread.sleep(1000);
        collector.ack(input);
    } catch (InterruptedException e) {
        e.printStackTrace();
        System.out.println("Exception");
    }
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {

}

}

共有1个答案

司寇阳曦
2023-03-14

您需要为您在Spout.nextTuple()方法中发出的元组分配消息ID。否则,参数max.spout.pending将被忽略。例如,您可以使用您的count变量作为ID(基本上,任何东西都可以用作ID。它必须是唯一的。)

@Override
public void nextTuple() {
    if(count <= 100000) {
        System.out.println("Emitting : " + count);
        collector.emit(new Values(count++ + ""), count);
    }
}

否则,Storm无法将输出元组链接到bolt中确认的元组,也就是说,Storm无法计算有多少元组挂起。Storm只能跟踪ID为的元组。

 类似资料:
  • 我们用Storm和Kafka喷口。当我们的消息失败时,我们希望重播它们,但在某些情况下,错误的数据或代码错误将导致消息总是失败一个螺栓,所以我们将进入无限的重播循环。很明显,当我们发现错误时,我们正在修复错误,但希望我们的拓扑具有一般的容错性。在一个元组被重播了N次以上之后,我们如何能够ack()? 通过查看Kafka Spout的代码,我看到它被设计为使用指数回退计时器重试,并对PR状态进行注释

  • 由于,我检查了一个spark作业的输出拼花文件,该作业总是会发出声音。我在Cloudera 5.13.1上使用了 我注意到拼花地板排的大小是不均匀的。第一排和最后一排的人很多。剩下的真的很小。。。 拼花地板工具的缩短输出,: 这是已知的臭虫吗?如何在Spark中设置拼花地板块大小(行组大小)? 编辑: Spark应用程序的作用是:它读取一个大的AVRO文件,然后通过两个分区键(使用

  • Snowflake文档指出,VARCHAR列仅限于16 MB未压缩的https://docs.Snowflake.net/manuals/sql-reference/data-types-text.html#data-types-for-text-strings Snowflake文档指出,VARCHAR数据会自动转换为JavaScript字符串数据类型。 https://docs.Snowfla

  • 我有一个WordPress网络站点(目前只有一个页面)。最大的问题是,WordPress中媒体的最大上传大小限制为1MB。 我增加这一限制的任何尝试都没有成功。 到目前为止,我所尝试的: 增加upload_max_filesize和post_max_size 服务器重新加载php。ini在特定时间段内自动生成。通过运行phpinfo(),php确认了该更改。 WordPress插件(Revo Sl

  • 我在研究Euler项目的问题,这是问题五: 最大素因子问题3 13195的素因子为5、7、13和29。 600851475143的最大质因数是什么? 我得到了工作代码: 因数(19*19*19*19*19*19*19*19*19*1999989899) x=33170854034208712,最后一个系数=182128674 33170854034208712 有人知道为什么这没有得到正确的答案吗

  • 关于雪花的新功能--推断模式表函数,我有一个问题。INFER模式函数在parquet文件上执行得很好,并返回正确的数据类型。但是,当parquet文件被分区并存储在S3中时,INFER模式的功能与pyspark Dataframes不同。 在DataFrames中,分区文件夹名称和值作为最后一列读取;在雪花推断模式中有没有一种方法可以达到同样的结果? 示例: 示例:{“AGMT_GID”:1714