我们是新来的风暴。我们不知道如何创建拓扑,请帮助我们应对风暴。我们尝试了“ Windows上的狂风暴雨”一文中给出的示例wordcount c =
topology。但是我们无法理解如何给出输入,以及风暴用户界面中输入存在的位置以及输出存在的位置。
输入和输出在Storm UI中不存在。在Storm
UI中,您看不到发出的元组,处理时间,集群配置和集群的运行状况。要查看输出和输入,请使用记录器机制,然后检查其中存在的每个工作日志文件风暴软件包的日志文件夹。要在Storm中创建拓扑,您需要两件事:一个喷嘴和一个螺栓。请在下面的示例代码中找到:-
SampleSpout.java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
public class SampleSpout implements IRichSpout{
SpoutOutputCollector collector;
int i=0;
List<Object> tupleList;
@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
// TODO Auto-generated method stub
}
@Override
public void close() {
// TODO Auto-generated method stub
}
@Override
public void activate() {
// TODO Auto-generated method stub
}
@Override
public void deactivate() {
// TODO Auto-generated method stub
}
@Override
public void nextTuple() {
tupleList=new ArrayList<Object>();
tupleList.add("storm"+i);
tupleList.add(i);
collector.emit(tupleList,i);
i++;
}
@Override
public void ack(Object msgId) {
// TODO Auto-generated method stub
}
@Override
public void fail(Object msgId) {
// TODO Auto-generated method stub
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word","count"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}
}
SampleBolt.java
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
public class SampleBolt implements IBasicBolt {
private static Logger log = LoggerFactory.getLogger(SampleBolt.class);
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
}
@Override
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}
@Override
public void prepare(Map stormConf, TopologyContext context) {
// TODO Auto-generated method stub
}
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
log.info(input.getValues().toString()+"output values");
}
@Override
public void cleanup() {
// TODO Auto-generated method stub
}
}
SampleTopology.java
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
public class SampleTopology {
/**
* @param args
*/
public static void main(String[] args) {
TopologyBuilder topology=new TopologyBuilder();
topology.setSpout("sampleSpout",new SampleSpout());
topology.setBolt("sampleBolt",new SampleBolt()).shuffleGrouping("sampleSpout");
Config conf = new Config();
conf.setDebug(true);
LocalCluster cluster=new LocalCluster();
cluster.submitTopology("test", conf, topology.createTopology());
}
}
我正在尝试使用Eclipse在Linux中运行Storm启动示例。我收到以下错误和函数从未被调用。 错误: 我的拓扑类: 我正在虚拟机环境中工作,所以不知道这是否是由于安装了Zookeeper。有什么想法吗?
现在我想在一个污点中使用Drools,它在LocalCluster中正常工作,但是当我把它放在生产集群中时,它有错误。污点是: 我使用官方文件创建了kiesession。误差为: 也许有些东西没有初始化。但当blot执行时,我创建了一个新的kieservice。有人能帮我吗 谢啦!
我刚来暴风,所以温柔点:-) 什么是实现这一目标的最佳方式?
关于拓扑结构的说明: 喷口连续向读取螺栓发送元组。 读取bolt过程并将结果发送给下一个bolt等等。 在R bolt中处理tuple1之后还是在readbolt发送tuple1写入bolt之后?
我对storm使用Deubug是新手 我强制在 但是,当我完成了对拓扑的搜索时,找不到调试的结果在哪里 我注意到在,! 为什么它看不懂我的变化?
8台机器一直在使用。每一个都有22个核心和512 GB的RAM。但是,我们的代码运行得真的很慢。传输600万个数据需要10分钟才能完成。 60个文件中的10 MB在一秒钟内传输到HDFS。我们正在努力优化我们的代码,但很明显我们做了一些非常错误的事情。 对于蜂巢表,我们有64个桶。 在HDFS喷口;.setmaxextending(50000); 在蜂巢喷口选项;.WithTxNsperBatch