当前位置: 首页 > 知识库问答 >
问题:

暴风喷口不接住

蒋航
2023-03-14

我已经开始使用storm,所以我使用本教程创建简单的拓扑

public StormTopology build() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(HelloWorldSpout.class.getSimpleName(), 
             helloWorldSpout, spoutParallelism);

        HelloWorldBolt bolt = new  HelloWorldBolt();

        builder.setBolt(HelloWorldBolt.class.getSimpleName(), 
                   bolt, boltParallelism)
              .shuffleGrouping(HelloWorldSpout.class.getSimpleName());
}

我的嘴是这样的

public class HelloWorldSpout  extends BaseRichSpout implements ISpout {
    private SpoutOutputCollector collector;

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("int"));
    }

    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        this.collector = collector;
    }

    private static Boolean flag = false;
    public void nextTuple() {
        Utils.sleep(5000);

            //emit only 1 tuple - for testing
        if (!flag){
            this.collector.emit(new Values(6));
            flag = true;
        }
    }

    @Override
    public void ack(Object msgId) {
        System.out.println("[HelloWorldSpout] ack on msgId" + msgId);
    }

    public void fail(Object msgId){
        System.out.println("[HelloWorldSpout] fail on msgId" + msgId);
    }
}

我的螺栓是这样的

@SuppressWarnings("serial")
public class HelloWorldBolt extends BaseRichBolt{
    private OutputCollector collector;

    public void prepare(Map conf, TopologyContext context, 
                    OutputCollector collector) {
        this.collector = collector;
        logger.info("preparing HelloWorldBolt");
    }

    public void execute(Tuple tuple) {
        System.out.println("[HelloWorldBolt] got" + tuple.getInteger(0));
        this.collector.ack(tuple);
    }

    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub

    }
}

共有1个答案

鲁弘厚
2023-03-14

spout中的emit()方法只有一个参数,因此不会锚定元组。这就是为什么即使在bolt中处理元组,也不会得到对spout中的ack()方法的回调。

要使其正常工作,您需要修改spout以发出第二个参数,即消息ID。正是这个id被传递回spout中的ack()方法:

public void nextTuple() {
    Utils.sleep(5000);

        //emit only 1 tuple - for testing
    if (!flag){
        Object msgId = "ID 6";  // this can be any object
        this.collector.emit(new Values(6), msgId);
        flag = true;
    }
}


@Override
public void ack(Object msgId) {
    //  msgId should be "ID 6"
    System.out.println("[HelloWorldSpout] ack on msgId" + msgId);
}
 类似资料:
  • 谢谢你抽出时间。 通过在每个分区上使用数据进行泛洪测试,完成读取需要。 再次使用parallelism_hint=1的代码 即 其中, parallelism_hint-是应该分配给执行此spout的任务数。每个任务将在集群周围某个进程的线程上运行。

  • 利用 cocos2d 制作的一款休闲游戏。游戏中有九种不同的卡通水果,您可以拖动整行或整列上的水果,或交换屏幕中两个相邻水果的位置,一条直线上的三个或三个以上的相同水果将会消失,而您将得到分数。当您的分数逐步提高后,将会进入更高难度的关卡,接受更高难度的挑战。 游戏中允许使用一些道具。 [Code4App.com]

  • 当运行两个线程时,对于spout,文件的每一行都读取两次。 我是新手,我想知道处理这件事的最好方法?我可以将线程的数量减少到1个,或者修改spout,使每个线程读取不同的行--或者(如何)我需要使用TopologyContext参数?我不确定我是否错过了一个“Storm”的方式来解决这个问题?

  • 看下这个段伪代码: local value = get_from_cache(key) if not value then value = query_db(sql) set_to_cache(value, timeout = 100) end return value 看上去没有问题,在单元测试情况下,也不会有异常。 但是,进行压力测试的时候,你会发现,每隔 100 秒,数据库的

  • 我试图把Kafka数据通过Storm在hdfs和Hive。我在和HortonWorks合作。因此,我有以下结构,正如在许多教程(http://henning.kropponline.de/2015/01/24/hive-streaming-with-storm/)中看到的那样(稍加修改): 我将kafka-spout数据直接发送到hdfs-bolt,当我只使用hdfs-bolt时,它是工作的。当我

  • 在我的storm拓扑(有2个喷口和1个bolt)中,其中一个kafka喷口使用者的偏移量正在前进,但MSG没有通过kafka喷口发送到bolt。我可以在storm ui中看到,对于那个特定的喷口,发出和传送的消息是0。所以,我的问题是为什么消费者在前进,我可以看到消费者从zookeeper客户端的抵消逐渐增加。