当前位置: 首页 > 知识库问答 >
问题:

将字数拓扑提交到我的storm集群,使用Eclipse创建jar,但它显示异常

宋瀚海
2023-03-14

我正在尝试向我的storm群集提交字数拓扑。我使用Eclipse创建了一个jar,但它显示了异常。

谁能告诉我该怎么办。我在这里附上我的代码和例外。

喷口创作-

public class WordReader implements IRichSpout {
    private SpoutOutputCollector collecter;
    private BufferedReader bufferedreader;
    private FileReader filereader;
    private Boolean completed=false;
    private TopologyContext context;
    private final static Logger logger=LoggerFactory.getLogger(WordReader.class);
    @Override
    public void ack(Object msgId) {
        // TODO Auto-generated method stub
        System.out.println("Ok"+msgId);
    }
    @Override
    public void activate() {
        // TODO Auto-generated method stub
        logger.info("Activating Storm");        
    }
    @Override
    public void fail(Object msgId) {
        // TODO Auto-generated method stub
        System.out.println("Fail"+msgId);   
    }
    @Override
    public void nextTuple() {
        // TODO Auto-generated method stub
        if(completed)
        {
            try
            {
                Thread.sleep(1000);
            }
            catch(InterruptedException e)
            {
                System.out.println("String is Interrupted");
            }
        }
        String line;
        bufferedreader=new BufferedReader(filereader);
        try {
            while((line=bufferedreader.readLine())!= null)
            {
                this.collecter.emit(new Values(line));
            }
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        finally
        {
            completed=true;
        }   
    }
    @Override
    public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {
        // TODO Auto-generated method stub
        this.context=context;
        try {
            this.filereader=new FileReader(map.get("InputFile").toString());
        } catch (FileNotFoundException e) {
            // TODO Auto-generated catch block
            throw new RuntimeException("Error reading file");
        }
            this.collecter=collector;}
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub
        declarer.declare(new Fields("line"));
    }
}

螺栓代码-

public class WordNormalizer implements IRichBolt{
    private OutputCollector collecter;
    @Override
    public void cleanup() {
        // TODO Auto-generated method stub
    }
    @Override
    public void execute(Tuple input) {
        // TODO Auto-generated method stub
        String sentence=input.getStringByField("line");
        String[] words=sentence.split(" ");
        for(String word:words)
        {
            word=word.trim();
            if(!word.isEmpty())
            {
                word=word.toLowerCase();
                ArrayList a=new ArrayList();
                a.add(input);
                this.collecter.emit(a,new Values(word));
            }
            collecter.ack(input);
        }   
    }
    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collecter=collecter;
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub  
        declarer.declare(new Fields("word"));
    }
}

螺栓计数频率-

public class WordCountBolt implements IRichBolt {
    private OutputCollector collector;
    private Integer id;
    private String name;
    private Map<String, Integer> counter;
    @Override
    public void cleanup() {
        // TODO Auto-generated method stub
        for(Map.Entry<String, Integer> entry : counter.entrySet())
        {
            System.out.println(entry.getKey()+" "+entry.getValue());
        }

    }
    @Override
    public void execute(Tuple input) {
        // TODO Auto-generated method stub
        String str=input.getStringByField("word");
        if(!counter.containsKey(str))
        {
            counter.put(str, 1);
        }
        else
        {
            Integer i=counter.get(str)+1;
            counter.put(str, i);
        }
    }
    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        // TODO Auto-generated method stub
        this.counter=new HashMap<String, Integer>();
        this.collector=collector;
        this.name=context.getThisComponentId();
        this.id=context.getThisTaskId();
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}
}

主类创建拓扑-

public class StormMain {
    public static void main(String[] args)
    {
        //Configuration
        Config conf = new Config();
        conf.put("InputFile",args[0]);
        conf.setDebug(false);
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader",new WordReader());
        builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
        builder.setBolt("word-counter", new WordCountBolt()).shuffleGrouping("word-normalizer");
        //Topology run
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING,1);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("TopologyMain",conf,builder.createTopology());
        //Thread.sleep(1000);
        //cluster.killTopology("TopologyMain");
        cluster.shutdown();
    }
}

编辑异常

这是我得到的一个例外:

     org.apache.storm.zookeeper.server.NIOServerCnxn - caught end of stream exception
org.apache.storm.zookeeper.server.ServerCnxn$EndOfStreamException: Unable to read additional data from client sessionid 0x14cb812ae720003, likely client has closed socket
    at org.apache.storm.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228) ~[storm-core-0.9.3.jar:0.9.3]
    at org.apache.storm.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208) [storm-core-0.9.3.jar:0.9.3]
    at java.lang.Thread.run(Unknown Source) [na:1.7.0_71]
6056 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /0:0:0:0:0:0:0:1:55127 which had sessionid 0x14cb812ae720003
6057 [ProcessThread(sid:0 cport:-1):] INFO  org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x14cb812ae720005
6076 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Session: 0x14cb812ae720005 closed
6076 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /127.0.0.1:55133 which had sessionid 0x14cb812ae720005
6076 [main-EventThread] INFO  org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
6076 [main] INFO  backtype.storm.daemon.supervisor - Shutting down supervisor 3b4d74c2-9fa3-4b8d-beb8-419063c95c02
6077 [Thread-3] INFO  backtype.storm.event - Event manager interrupted
6077 [Thread-4] INFO  backtype.storm.event - Event manager interrupted
6078 [ProcessThread(sid:0 cport:-1):] INFO  org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x14cb812ae720007
6097 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /127.0.0.1:55139 which had sessionid 0x14cb812ae720007
6097 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Session: 0x14cb812ae720007 closed
6097 [main-EventThread] INFO  org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
6098 [main] INFO  backtype.storm.daemon.supervisor - Shutting down supervisor e94ee8a8-f38f-4ba4-a48f-4427a7c8d30d
6098 [Thread-5] INFO  backtype.storm.event - Event manager interrupted
6098 [Thread-6] INFO  backtype.storm.event - Event manager interrupted
6099 [ProcessThread(sid:0 cport:-1):] INFO  org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x14cb812ae720009
6117 [main] INFO  org.apache.storm.zookeeper.ZooKeeper - Session: 0x14cb812ae720009 closed
6117 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /0:0:0:0:0:0:0:1:55145 which had sessionid 0x14cb812ae720009
6118 [main-EventThread] INFO  org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
6118 [main] INFO  backtype.storm.testing - Shutting down in process zookeeper
6118 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  org.apache.storm.zookeeper.server.NIOServerCnxnFactory - NIOServerCnxn factory exited run method
6119 [main] INFO  org.apache.storm.zookeeper.server.ZooKeeperServer - shutting down
6119 [main] INFO  org.apache.storm.zookeeper.server.SessionTrackerImpl - Shutting down
6119 [main] INFO  org.apache.storm.zookeeper.server.PrepRequestProcessor - Shutting down
6119 [main] INFO  org.apache.storm.zookeeper.server.SyncRequestProcessor - Shutting down
6119 [ProcessThread(sid:0 cport:-1):] INFO  org.apache.storm.zookeeper.server.PrepRequestProcessor - PrepRequestProcessor exited loop!
6119 [SyncThread:0] INFO  org.apache.storm.zookeeper.server.SyncRequestProcessor - SyncRequestProcessor exited!
6119 [main] INFO  org.apache.storm.zookeeper.server.FinalRequestProcessor - shutdown of request processor complete
6120 [main] INFO  backtype.storm.testing - Done shutting down in process zookeeper
6120 [main] INFO  backtype.storm.testing - Deleting temporary path C:\Users\Rishi\AppData\Local\Temp\8335008e-119b-4ae3-a557-2839d573a579
6128 [main] INFO  backtype.storm.testing - Deleting temporary path C:\Users\Rishi\AppData\Local\Temp\8c53a710-6448-441e-bf01-734b80f9b989
6130 [main] INFO  backtype.storm.testing - Unable to delete file: C:\Users\Rishi\AppData\Local\Temp\8c53a710-6448-441e-bf01-734b80f9b989\version-2\log.1
6130 [main] INFO  backtype.storm.testing - Deleting temporary path C:\Users\Rishi\AppData\Local\Temp\1ea20791-599d-483d-9ffd-37445005684c
6136 [main] INFO  backtype.storm.testing - Deleting temporary path C:\Users\Rishi\AppData\Local\Temp\ef85048c-77e3-4392-8fc1-41bb4547ab53
8027 [SessionTracker] INFO  org.apache.storm.zookeeper.server.SessionTrackerImpl - SessionTrackerImpl exited loop!

共有3个答案

孔志强
2023-03-14

我们必须使用StormSubmitter来提交拓扑,而不是使用LocalCluster。。。。。我使用了它,它工作正常,现在我在Storm UI上有了我的拓扑结构

cluster.submitTopology("TopologyMain",conf,builder.createTopology());

换成--

StormSubmitter.submitTopology("TopologyMain",conf,builder.createTopology());
李烨
2023-03-14

只需增加线程Hibernate的时间长度。

Thread.sleep(10000);

设为10000,然后检查。这是因为你的地图任务花费了很多时间。

段干靖
2023-03-14

在运行程序时遇到了一个非常类似的问题:

[SessionTracker] INFO  org.apache.storm.zookeeper.server.SessionTrackerImpl - SessionTrackerImpl exited loop!

通过删除

cluster.shutdown();    

因为它不允许动物园管理员和暴风雪联系。

 类似资料:
  • 我正在尝试使用Eclipse在Linux中运行Storm启动示例。我收到以下错误和函数从未被调用。 错误: 我的拓扑类: 我正在虚拟机环境中工作,所以不知道这是否是由于安装了Zookeeper。有什么想法吗?

  • 问题内容: 我在使用IDE向生产集群提交拓扑时遇到了一个问题,而如果我在命令行中使用command 执行同样的事情,它的运行就像天堂一样。我从githublink看到了同样的例子。 为了提交拓扑,我正在使用这些行集 请建议我这是否是运行的正确方法? 问题答案: 很好找到解决方案。当我们运行“ storm jar”时,它将在提交的jar中触发storm.jar的属性标志。因此,如果我们要以编程方式提

  • 我读了很多和Storm有关的网站。但我仍然无法将拓扑结构完美地映射到Storm集群中。 请帮助我理解这一点。 在Storm集群中有这样的术语 null null null 所有这些都要用Storm集群来映射。我已经在一个项目里工作了。所以我知道拓扑结构。

  • 当我站在上面提交拓扑时,我在storm jar中遇到了问题 我写 打出来的 没有找到Storm指令 我想知道怎么了? Storm罐之间的区别是什么。。。。和 都在提交拓扑?

  • 我一直在使用IDE(Eclipse)测试Storm拓扑的远程提交。我成功地将简单的storm拓扑上传到了远程storm集群,但奇怪的是,当我检查storm UI以确定远程提交的拓扑是否正常工作时,我在UI中看到了just_acker bolt,但其他bolt和spout却不在那里。之后,我从命令行手动提交了拓扑,并再次检查了Storm UI,它正常工作,没有问题。我一直在找原因,但没有找到。我在下

  • 这是我尝试运行的基本wordcount拓扑。但我收到的错误是“信息组织”。阿帕奇。暴风雨动物园管理员。服务器SessionTrackerImpl-SessionTrackerImpl已退出循环!'。有人能帮我吗?? 当我移除集群时。shutdown(),在我按下cntrl c之前,tweets会一直出现。wordcount也不会显示##