当前位置: 首页 > 知识库问答 >
问题:

Apache Storm spout停止从spout发出消息

单嘉泽
2023-03-14
public class Controller implements IRichSpout {

    SpoutOutputCollector _collector;
    Calendar LAST_RUN = null;
    List<ControllerMessage> msgList;

    /**
     * It is to open the spout
     */
      public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        msgList= new ArrayList<ControllerMessage>();

        MongoIndexingHandler mongoIndexingHandler = new MongoIndexingHandler();
        mongoIndexingHandler.createMongoIndexes();

      }

      /**
       * It executes the next tuple
       */



    @Override
    public void nextTuple() {
           Map<String, Object> logMap = new HashMap<>();
            logMap.put("BEGIN", new Date());

        try {
            TriggerHandler thandler = new TriggerHandler();
            if (msgList.size() == 0) {
                List<ControllerMessage> mList = thandler.getControllerMessage(new Date());
                msgList = mList;
            }

            if (msgList.size() > 0) {
                ControllerMessage message = msgList.get(0);
                if(thandler.fire(message.getFireTime())) {
                    Util.log(message, "CONTROLLER_LOGS", message.getTime(), new Date());
                    msgList.remove(0);
                    _collector.emit(new Values(message));
                }

            }
            else{
                Utils.sleep(1000);
            }

        } catch (Exception e) {
            _collector.reportError(e);

            Util.exLog(e, "EXECUTOR_ERROR", new Date(), "nextTuple()",Controller.class);
        } 
    }

      /**
       * It acknowledges the messages
       */
      @Override
      public void ack(Object id) {

      }
      /**
       * It tells failed messages
       */
      @Override
      public void fail(Object id) {

      }
     /**
      * It declares the message name
      */
      @Override
      public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("SPOUT_MESSAGE"));
      }

    @Override
    public void activate() {

    }

    @Override
    public void close() {

    }

    @Override
    public void deactivate() {

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }


}
public class DiagnosticTopology {

    public static void main(String[] args) throws Exception {
        int gSize = (null != args && args.length > 0) ? Integer.parseInt(args[0]) : 2;
        int sSize = (null != args && args.length > 1) ? Integer.parseInt(args[1]) : 128;
        int sMSize = (null != args && args.length > 2) ? Integer.parseInt(args[2]) : 16;
        int aGSize = (null != args && args.length > 3) ? Integer.parseInt(args[3]) : 16;
        int rSize = (null != args && args.length > 4) ? Integer.parseInt(args[4]) : 64;
        int rMSize = (null != args && args.length > 5) ? Integer.parseInt(args[5]) : 16;
        int dMSize = (null != args && args.length > 6) ? Integer.parseInt(args[6]) : 8;
        int wSize = (null != args && args.length > 7) ? Integer.parseInt(args[7]) : 16;
        String topologyName = (null != args && args.length > 8) ? args[8] : "DIAGNOSTIC";

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("controller", new Controller(), 1);
        builder.setBolt("generator", new GeneratorBolt(), gSize).shuffleGrouping("controller");
        builder.setBolt("scraping", new ScrapingBolt(), sSize).shuffleGrouping("generator");
        builder.setBolt("smongo", new MongoBolt(), sMSize).shuffleGrouping("scraping");
        builder.setBolt("aggregation", new AggregationBolt(), aGSize).shuffleGrouping("scraping");
        builder.setBolt("rule", new RuleBolt(), rSize).shuffleGrouping("smongo");
        builder.setBolt("rmongo", new RMongoBolt(), rMSize).shuffleGrouping("rule");
        builder.setBolt("dstatus", new DeviceStatusBolt(), dMSize).shuffleGrouping("rule");

        builder.setSpout("trigger", new TriggerSpout(), 1);
        builder.setBolt("job", new JobTriggerBolt(), 4).shuffleGrouping("trigger");

        Config conf = new Config();
        conf.setDebug(false);
        conf.setNumWorkers(wSize);

        StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());
    }
}

暂时还没有答案

 类似资料:
  • 目前,我正在开发一个用于处理原始机器测量数据的storm拓扑。然而,我遇到了无法解释的问题与喷口。 我正在Azure HDInsight上运行一个简单的storm拓扑,用Java编写。事件是从eventhub中读取的,我使用microsoft eventhub spout(版本0.9)。这个eventhub有8个分区,这意味着我也需要EventhUbspout的8个实例。 但是,当我运行拓扑几个小

  • 我正在使用最新版本的google-cloud-pubsub,并且正在经历一个据称已经修复的bug。 我正在使用这个版本和其中的代码示例:https://pypi.org/project/google-cloud-pubsub/ 问题:因此,在我运行呼叫订阅者的订阅者工作者大约4-5小时后,它停止接收消息。 对如何修复它有什么建议吗?

  • 因此,我用于让bot问候新用户的代码停止工作,我不知道为什么或如何使用这是im用于欢迎活动本身的代码```module.exports=(client)=>{const channelId='757493821251649608'//welcome channel const targetChannelId='757521186929246219'//rules and info }``` 这就是

  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认

  • 我的应用程序使用来自RabbitMQ的一些消息并对其进行处理。我有大约10个队列,每个队列最多有10个消费者(线程)。我有5次预回迁。我正在Heroku中使用CloudAMQP插件(RabbitMQ作为服务)运行安装程序。 我使用默认心跳和连接超时设置(60秒)运行。 我的java应用程序是一个使用sping-Rabbit库的Spring Boot应用程序。 版本: 问题是对于一个特定队列的消费者