我正在尝试向我的storm群集提交字数拓扑。我使用Eclipse创建了一个jar,但它显示了异常。
谁能告诉我该怎么办。我在这里附上我的代码和例外。
喷口创作-
public class WordReader implements IRichSpout {
private SpoutOutputCollector collecter;
private BufferedReader bufferedreader;
private FileReader filereader;
private Boolean completed=false;
private TopologyContext context;
private final static Logger logger=LoggerFactory.getLogger(WordReader.class);
@Override
public void ack(Object msgId) {
// TODO Auto-generated method stub
System.out.println("Ok"+msgId);
}
@Override
public void activate() {
// TODO Auto-generated method stub
logger.info("Activating Storm");
}
@Override
public void fail(Object msgId) {
// TODO Auto-generated method stub
System.out.println("Fail"+msgId);
}
@Override
public void nextTuple() {
// TODO Auto-generated method stub
if(completed)
{
try
{
Thread.sleep(1000);
}
catch(InterruptedException e)
{
System.out.println("String is Interrupted");
}
}
String line;
bufferedreader=new BufferedReader(filereader);
try {
while((line=bufferedreader.readLine())!= null)
{
this.collecter.emit(new Values(line));
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
finally
{
completed=true;
}
}
@Override
public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {
// TODO Auto-generated method stub
this.context=context;
try {
this.filereader=new FileReader(map.get("InputFile").toString());
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
throw new RuntimeException("Error reading file");
}
this.collecter=collector;}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
declarer.declare(new Fields("line"));
}
}
螺栓代码-
public class WordNormalizer implements IRichBolt{
private OutputCollector collecter;
@Override
public void cleanup() {
// TODO Auto-generated method stub
}
@Override
public void execute(Tuple input) {
// TODO Auto-generated method stub
String sentence=input.getStringByField("line");
String[] words=sentence.split(" ");
for(String word:words)
{
word=word.trim();
if(!word.isEmpty())
{
word=word.toLowerCase();
ArrayList a=new ArrayList();
a.add(input);
this.collecter.emit(a,new Values(word));
}
collecter.ack(input);
}
}
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.collecter=collecter;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
declarer.declare(new Fields("word"));
}
}
螺栓计数频率-
public class WordCountBolt implements IRichBolt {
private OutputCollector collector;
private Integer id;
private String name;
private Map<String, Integer> counter;
@Override
public void cleanup() {
// TODO Auto-generated method stub
for(Map.Entry<String, Integer> entry : counter.entrySet())
{
System.out.println(entry.getKey()+" "+entry.getValue());
}
}
@Override
public void execute(Tuple input) {
// TODO Auto-generated method stub
String str=input.getStringByField("word");
if(!counter.containsKey(str))
{
counter.put(str, 1);
}
else
{
Integer i=counter.get(str)+1;
counter.put(str, i);
}
}
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
// TODO Auto-generated method stub
this.counter=new HashMap<String, Integer>();
this.collector=collector;
this.name=context.getThisComponentId();
this.id=context.getThisTaskId();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {}
}
主类创建拓扑-
public class StormMain {
public static void main(String[] args)
{
//Configuration
Config conf = new Config();
conf.put("InputFile",args[0]);
conf.setDebug(false);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader",new WordReader());
builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
builder.setBolt("word-counter", new WordCountBolt()).shuffleGrouping("word-normalizer");
//Topology run
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING,1);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("TopologyMain",conf,builder.createTopology());
//Thread.sleep(1000);
//cluster.killTopology("TopologyMain");
cluster.shutdown();
}
}
编辑异常
这是我得到的一个例外:
org.apache.storm.zookeeper.server.NIOServerCnxn - caught end of stream exception
org.apache.storm.zookeeper.server.ServerCnxn$EndOfStreamException: Unable to read additional data from client sessionid 0x14cb812ae720003, likely client has closed socket
at org.apache.storm.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228) ~[storm-core-0.9.3.jar:0.9.3]
at org.apache.storm.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208) [storm-core-0.9.3.jar:0.9.3]
at java.lang.Thread.run(Unknown Source) [na:1.7.0_71]
6056 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /0:0:0:0:0:0:0:1:55127 which had sessionid 0x14cb812ae720003
6057 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x14cb812ae720005
6076 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Session: 0x14cb812ae720005 closed
6076 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /127.0.0.1:55133 which had sessionid 0x14cb812ae720005
6076 [main-EventThread] INFO org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
6076 [main] INFO backtype.storm.daemon.supervisor - Shutting down supervisor 3b4d74c2-9fa3-4b8d-beb8-419063c95c02
6077 [Thread-3] INFO backtype.storm.event - Event manager interrupted
6077 [Thread-4] INFO backtype.storm.event - Event manager interrupted
6078 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x14cb812ae720007
6097 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /127.0.0.1:55139 which had sessionid 0x14cb812ae720007
6097 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Session: 0x14cb812ae720007 closed
6097 [main-EventThread] INFO org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
6098 [main] INFO backtype.storm.daemon.supervisor - Shutting down supervisor e94ee8a8-f38f-4ba4-a48f-4427a7c8d30d
6098 [Thread-5] INFO backtype.storm.event - Event manager interrupted
6098 [Thread-6] INFO backtype.storm.event - Event manager interrupted
6099 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x14cb812ae720009
6117 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Session: 0x14cb812ae720009 closed
6117 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /0:0:0:0:0:0:0:1:55145 which had sessionid 0x14cb812ae720009
6118 [main-EventThread] INFO org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
6118 [main] INFO backtype.storm.testing - Shutting down in process zookeeper
6118 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxnFactory - NIOServerCnxn factory exited run method
6119 [main] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - shutting down
6119 [main] INFO org.apache.storm.zookeeper.server.SessionTrackerImpl - Shutting down
6119 [main] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Shutting down
6119 [main] INFO org.apache.storm.zookeeper.server.SyncRequestProcessor - Shutting down
6119 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - PrepRequestProcessor exited loop!
6119 [SyncThread:0] INFO org.apache.storm.zookeeper.server.SyncRequestProcessor - SyncRequestProcessor exited!
6119 [main] INFO org.apache.storm.zookeeper.server.FinalRequestProcessor - shutdown of request processor complete
6120 [main] INFO backtype.storm.testing - Done shutting down in process zookeeper
6120 [main] INFO backtype.storm.testing - Deleting temporary path C:\Users\Rishi\AppData\Local\Temp\8335008e-119b-4ae3-a557-2839d573a579
6128 [main] INFO backtype.storm.testing - Deleting temporary path C:\Users\Rishi\AppData\Local\Temp\8c53a710-6448-441e-bf01-734b80f9b989
6130 [main] INFO backtype.storm.testing - Unable to delete file: C:\Users\Rishi\AppData\Local\Temp\8c53a710-6448-441e-bf01-734b80f9b989\version-2\log.1
6130 [main] INFO backtype.storm.testing - Deleting temporary path C:\Users\Rishi\AppData\Local\Temp\1ea20791-599d-483d-9ffd-37445005684c
6136 [main] INFO backtype.storm.testing - Deleting temporary path C:\Users\Rishi\AppData\Local\Temp\ef85048c-77e3-4392-8fc1-41bb4547ab53
8027 [SessionTracker] INFO org.apache.storm.zookeeper.server.SessionTrackerImpl - SessionTrackerImpl exited loop!
我们必须使用StormSubmitter来提交拓扑,而不是使用LocalCluster。。。。。我使用了它,它工作正常,现在我在Storm UI上有了我的拓扑结构
cluster.submitTopology("TopologyMain",conf,builder.createTopology());
换成--
StormSubmitter.submitTopology("TopologyMain",conf,builder.createTopology());
只需增加线程Hibernate的时间长度。
Thread.sleep(10000);
设为10000,然后检查。这是因为你的地图任务花费了很多时间。
在运行程序时遇到了一个非常类似的问题:
[SessionTracker] INFO org.apache.storm.zookeeper.server.SessionTrackerImpl - SessionTrackerImpl exited loop!
通过删除
cluster.shutdown();
因为它不允许动物园管理员和暴风雪联系。
我正在尝试使用Eclipse在Linux中运行Storm启动示例。我收到以下错误和函数从未被调用。 错误: 我的拓扑类: 我正在虚拟机环境中工作,所以不知道这是否是由于安装了Zookeeper。有什么想法吗?
问题内容: 我在使用IDE向生产集群提交拓扑时遇到了一个问题,而如果我在命令行中使用command 执行同样的事情,它的运行就像天堂一样。我从githublink看到了同样的例子。 为了提交拓扑,我正在使用这些行集 请建议我这是否是运行的正确方法? 问题答案: 很好找到解决方案。当我们运行“ storm jar”时,它将在提交的jar中触发storm.jar的属性标志。因此,如果我们要以编程方式提
我读了很多和Storm有关的网站。但我仍然无法将拓扑结构完美地映射到Storm集群中。 请帮助我理解这一点。 在Storm集群中有这样的术语 null null null 所有这些都要用Storm集群来映射。我已经在一个项目里工作了。所以我知道拓扑结构。
当我站在上面提交拓扑时,我在storm jar中遇到了问题 我写 打出来的 没有找到Storm指令 我想知道怎么了? Storm罐之间的区别是什么。。。。和 都在提交拓扑?
我一直在使用IDE(Eclipse)测试Storm拓扑的远程提交。我成功地将简单的storm拓扑上传到了远程storm集群,但奇怪的是,当我检查storm UI以确定远程提交的拓扑是否正常工作时,我在UI中看到了just_acker bolt,但其他bolt和spout却不在那里。之后,我从命令行手动提交了拓扑,并再次检查了Storm UI,它正常工作,没有问题。我一直在找原因,但没有找到。我在下
这是我尝试运行的基本wordcount拓扑。但我收到的错误是“信息组织”。阿帕奇。暴风雨动物园管理员。服务器SessionTrackerImpl-SessionTrackerImpl已退出循环!'。有人能帮我吗?? 当我移除集群时。shutdown(),在我按下cntrl c之前,tweets会一直出现。wordcount也不会显示##