Apache Storm spout 停止发出消息

public class Controller implements IRichSpout {

    SpoutOutputCollector _collector;
    Calendar LAST_RUN = null;
    List<ControllerMessage> msgList;

     * It is to open the spout
      public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        msgList= new ArrayList<ControllerMessage>();

        MongoIndexingHandler mongoIndexingHandler = new MongoIndexingHandler();


       * It executes the next tuple

    public void nextTuple() {
           Map<String, Object> logMap = new HashMap<>();
            logMap.put("BEGIN", new Date());

        try {
            TriggerHandler thandler = new TriggerHandler();
            if (msgList.size() == 0) {
                List<ControllerMessage> mList = thandler.getControllerMessage(new Date());
                msgList = mList;

            if (msgList.size() > 0) {
                ControllerMessage message = msgList.get(0);
                if(thandler.fire(message.getFireTime())) {
                    Util.log(message, "CONTROLLER_LOGS", message.getTime(), new Date());
                    _collector.emit(new Values(message));


        } catch (Exception e) {

            Util.exLog(e, "EXECUTOR_ERROR", new Date(), "nextTuple()",Controller.class);

       * It acknowledges the messages
      public void ack(Object id) {

       * It tells failed messages
      public void fail(Object id) {

      * It declares the message name
      public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("SPOUT_MESSAGE"));

    public void activate() {


    public void close() {


    public void deactivate() {


    public Map<String, Object> getComponentConfiguration() {
        return null;

public class DiagnosticTopology {

    public static void main(String[] args) throws Exception {
        int gSize = (null != args && args.length > 0) ? Integer.parseInt(args[0]) : 2;
        int sSize = (null != args && args.length > 1) ? Integer.parseInt(args[1]) : 128;
        int sMSize = (null != args && args.length > 2) ? Integer.parseInt(args[2]) : 16;
        int aGSize = (null != args && args.length > 3) ? Integer.parseInt(args[3]) : 16;
        int rSize = (null != args && args.length > 4) ? Integer.parseInt(args[4]) : 64;
        int rMSize = (null != args && args.length > 5) ? Integer.parseInt(args[5]) : 16;
        int dMSize = (null != args && args.length > 6) ? Integer.parseInt(args[6]) : 8;
        int wSize = (null != args && args.length > 7) ? Integer.parseInt(args[7]) : 16;
        String topologyName = (null != args && args.length > 8) ? args[8] : "DIAGNOSTIC";

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("controller", new Controller(), 1);
        builder.setBolt("generator", new GeneratorBolt(), gSize).shuffleGrouping("controller");
        builder.setBolt("scraping", new ScrapingBolt(), sSize).shuffleGrouping("generator");
        builder.setBolt("smongo", new MongoBolt(), sMSize).shuffleGrouping("scraping");
        builder.setBolt("aggregation", new AggregationBolt(), aGSize).shuffleGrouping("scraping");
        builder.setBolt("rule", new RuleBolt(), rSize).shuffleGrouping("smongo");
        builder.setBolt("rmongo", new RMongoBolt(), rMSize).shuffleGrouping("rule");
        builder.setBolt("dstatus", new DeviceStatusBolt(), dMSize).shuffleGrouping("rule");

        builder.setSpout("trigger", new TriggerSpout(), 1);
        builder.setBolt("job", new JobTriggerBolt(), 4).shuffleGrouping("trigger");

        Config conf = new Config();

        StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());


