This article walks through the simplest way to build an Apache Storm program in about a minute and take a first look at how Storm works. With that foundation, digging into Storm's deeper mechanics should be easier.
The example uses FirstSpout to generate data, Bolt01 to split sentences into words, and Bolt02 to count the words.
Create a Maven project in IDEA: File -> New -> Project.
Add the following to the POM file. Comment out <scope>provided</scope> when running locally; uncomment it when submitting to a Storm cluster.
<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.0.1</version>
        <!--<scope>provided</scope>-->
    </dependency>
</dependencies>
Create a class TestMain and add a main method:
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;

public class TestMain {
    public static void main(String[] args) {
        // Build the topology
        TopologyBuilder builder = new TopologyBuilder();
        // Wire the components; shuffleGrouping distributes tuples randomly,
        // and its argument is the id of the upstream component
        builder.setSpout("firstSpout", new FirstSpout());
        builder.setBolt("bolt01", new Bolt01()).shuffleGrouping("firstSpout");
        builder.setBolt("bolt02", new Bolt02()).shuffleGrouping("bolt01");
        // Launch the topology
        Config conf = new Config();
        StormTopology topology = builder.createTopology();
        if (args != null && args.length > 0) {
            try {
                // Submit to a Storm cluster
                StormSubmitter.submitTopology(args[0], conf, topology);
            } catch (Exception e) {
                e.printStackTrace();
            }
        } else {
            // Run locally
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("storm-test", conf, topology);
        }
    }
}
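The wiring above runs every component with a single executor. As a minimal sketch (the worker and executor counts below are arbitrary values chosen only for illustration), the same topology can also carry parallelism hints and a few basic Config options:

Config conf = new Config();
conf.setDebug(true);     // log every emitted tuple (verbose; local testing only)
conf.setNumWorkers(2);   // number of worker processes when running on a cluster

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("firstSpout", new FirstSpout(), 1);
// run Bolt01 with 2 executors; shuffleGrouping still distributes tuples randomly
builder.setBolt("bolt01", new Bolt01(), 2).shuffleGrouping("firstSpout");
// fieldsGrouping routes tuples with the same "bNumber" value to the same Bolt02 task,
// which matters once a counting bolt runs with more than one executor
builder.setBolt("bolt02", new Bolt02(), 2).fieldsGrouping("bolt01", new Fields("bNumber"));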
Create a class FirstSpout to generate the source data:
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class FirstSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private int num = 0;

    // Called once when the spout is initialized
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
    }

    // Business logic: produce the source data
    public void nextTuple() {
        try {
            num++;
            this.collector.emit(new Values(num, "First Storm Project"));
            System.out.println("spout send: " + num);
            Thread.sleep(1000); // emit one tuple per second
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // Declare the fields of the tuples emitted downstream
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("number", "message"));
    }
}
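The spout above emits unanchored tuples, so Storm does not track whether they are fully processed. As a rough sketch only (ack-based reliability is outside what this article's code does), emitting with a message id is what lets Storm call ack or fail back on the spout:

// inside nextTuple(): pass a message id as the second argument
this.collector.emit(new Values(num, "First Storm Project"), num);

// BaseRichSpout lets you override these callbacks:
@Override
public void ack(Object msgId) {
    System.out.println("tuple fully processed, id=" + msgId);
}

@Override
public void fail(Object msgId) {
    System.out.println("tuple failed, id=" + msgId); // e.g. re-emit it here
}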
Create a class Bolt01 for the business logic; in this article it splits each sentence into words:
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class Bolt01 extends BaseRichBolt {
    private OutputCollector collector;

    // Called before the bolt starts processing
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        collector = outputCollector;
    }

    // Processes each incoming tuple
    public void execute(Tuple tuple) {
        // Read the incoming data
        String msg = tuple.getStringByField("message");
        System.out.println("Bolt01 accepts:" + tuple.getInteger(0) + " " + msg);
        if (msg != null && msg.length() > 0) {
            // Split the sentence into words
            String[] words = msg.split(" ");
            // Emit each word to the next bolt
            for (String word : words) {
                this.collector.emit(new Values(word));
            }
        }
    }

    // Declare the fields of the tuples emitted downstream; each tuple carries one word
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("bNumber"));
    }
}
A Bolt can read the data emitted by the previous component in two common ways; taking String as an example:
tuple.getString(0); // get the first value by position
tuple.getStringByField("message"); // "message" is the field name declared by the upstream component
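Beyond these two String accessors, Tuple offers a few other lookups; a small sketch using the fields declared in this article:

Integer number = tuple.getIntegerByField("number"); // typed access by field name
Object first = tuple.getValue(0);                   // untyped access by position
List<Object> all = tuple.getValues();               // every value in the tuple
boolean hasMsg = tuple.contains("message");         // check whether a field exists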
Create a class Bolt02 for the business logic; in this article it counts the words:
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class Bolt02 extends BaseRichBolt {
    private OutputCollector collector;
    private int sum = 0;

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        collector = outputCollector;
    }

    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("bNumber");
        // Count the words
        if (word != null && word.length() > 0) {
            sum++;
        }
        System.out.println("words statistics:" + sum);
    }

    // Bolt02 is the last component and emits nothing, so no fields are declared
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
}
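Bolt02 above only keeps a running total of all words. If you want a count per distinct word instead, a minimal variation of execute could keep a HashMap (a sketch only, using the same "bNumber" field):

// requires java.util.HashMap and java.util.Map
private Map<String, Integer> counts = new HashMap<String, Integer>();

public void execute(Tuple tuple) {
    String word = tuple.getStringByField("bNumber");
    if (word != null && word.length() > 0) {
        Integer count = counts.get(word);
        counts.put(word, count == null ? 1 : count + 1);
        System.out.println(word + ": " + counts.get(word));
    }
}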
Run the main method directly; the console output looks like this:
spout send: 1
Bolt01 accepts:1 First Storm Project
words statistics:1
words statistics:2
words statistics:3
spout send: 2
Bolt01 accepts:2 First Storm Project
words statistics:4
words statistics:5
words statistics:6
spout send: 3
Bolt01 accepts:3 First Storm Project
words statistics:7
words statistics:8
words statistics:9
spout send: 4
Bolt01 accepts:4 First Storm Project
words statistics:10
words statistics:11
words statistics:12
Package the program by running mvn package, which produces storm-test-1.0-SNAPSHOT.jar.
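For this example a plain jar is enough, because storm-core is marked provided and the cluster supplies it. If your topology ever pulls in extra dependencies, one common approach (an assumption here, not something this project requires) is to bundle them with the maven-shade-plugin in the POM:

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>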
Upload the jar to the server.
Submit it with the following command:
storm jar storm-test-1.0-SNAPSHOT.jar com.weichai.TestMain storm-test
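After submission the topology keeps running until it is explicitly stopped. The Storm CLI can list and kill it; the name below is the one passed in as args[0]:
storm list
storm kill storm-test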
This article showed a simple way to build a Storm program; it did not touch on multiple workers, tasks, or ack-based reliability. With it as a foundation, exploring more of Storm's parameters should be easier.