5.3.2.2-Storm-Java-API

优质
小牛编辑
136浏览
2023-12-01
方法名概述参数返回值示例
open(Map conf, TopologyContext context,SpoutOutputCollector collector)该方法将在所有任务开始前被执行,一般作数据的初始化操作conf为拓扑提交时配置的变量,context为拓扑执行的上下文环境,collector用于在组件间传递数据
nextTuple()方法在相同的循环中被周期性的调用,用于处理实际的业务逻辑。
ack(Object msgId)当发送出的数据被正确执行后,会自动调用该方法。msgId为被正确执行的元组id
fail(Object msgId)当发送出的数据没有并正确执行(比如丢失了),会自动调用该方法。msgId为被未正确执行的元组id
public void open(Map conf, TopologyContext context,SpoutOutputCollector collector) {
try {
this.context = context;
//通过配置文件确定数据源
this.fileReader = new FileReader(conf.get("wordsFile").toString());
} catch (FileNotFoundException e) {
throw new RuntimeException("Error reading file
["+conf.get("wordFile")+"]");
}
this.collector = collector;
}
public void nextTuple() {
String str;
BufferedReader reader = new BufferedReader(fileReader);
try{
//从数据源获取数据,并将该数据发往下一个处理组件
while((str = reader.readLine()) != null){
this.collector.emit(new Values(str));
}
}catch(Exception e){
throw new RuntimeException("Error reading tuple",e);
}
}
public void ack(Object msgId) {
System.out.println("OK:"+msgId);
}
public void fail(Object msgId) {
System.out.println("FAIL:"+msgId);
}

1. Blot 相关API

方法名概述参数返回值示例
prepare(Map conf, TopologyContext context,OutputCollector collector)该方法将在所有任务开始前被执行,一般作数据的初始化操作conf为拓扑提交时配置的变量,context为拓扑执行的上下文环境,collector用于在组件间传递数据
execute(Tuple input)方法在相同的循环中被周期性的调用,用于处理实际的业务逻辑。
declareOutputFields(OutputFieldsDeclarer declarer)声明该组件处理后流中有哪些字段。declarer为声明类
fail(Object msgId)当发送出的数据没有并正确执行(比如丢失了),会自动调用该方法。msgId为被未正确执行的元组id
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}
public void execute(Tuple input) {
String sentence = input.getString(0);
//解析上游接收的数据元组
String[] words = sentence.split(" ");
//将数据切割后逐一处理
for(String word : words){
word = word.trim();
if(!word.isEmpty()){
word = word.toLowerCase();
//向下游发送数据
List a = new ArrayList();
a.add(input);
collector.emit(a,new Values(word));
}
}
// 通知上游组件该元组已正确执行
collector.ack(input);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
//声明该组件输出流中的字段为“word”
declarer.declare(new Fields("word"));
}

3. Topology相关API

方法名概述参数返回值示例
TopologyBuilder()用于组建实际集群运行的topologyTopologyBuilder对象
Config()用于建立包含topology配置的Config对象,该配置在运行时会被与集群的配置合并并且通过prepare方法发送到所有结点。Config对象
LocalCluster()通过构造该对象,可在本地环境中模拟Storm的运行环境进行代码功能测试。LocalCluster对象
StormSubmitter.submitTopologyWithProgressBar(name, conf, builder)通过构造该对象,实现向集群提交topology的功能。name为topology的名称,conf为配置信息,builder为设计的topology
public static void main(String[] args) throws InterruptedException {
    //TopologyBuilder对象初始化
……
//建立包含topology配置的Config对象
Config conf = new Config();
conf.put("wordsFile", args[0]);
conf.setDebug(true);
//….
}
public static void main(String[] args) throws InterruptedException {
    //TopologyBuilder对象初始化
……
//建立包含topology配置的Config对象
    ……
//创建本地运行模式以模拟测试
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
Thread.sleep(2000);
cluster.shutdown();.
}
public static void main(String[] args) throws InterruptedException {
    //TopologyBuilder对象初始化
……
//建立包含topology配置的Config对象
    ……
//向集群提交拓扑
StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
}

4. Grouping相关API

方法名概述参数返回值示例
shuffleGrouping(String name)Shuffle 分组是最常用的分组方式。它使用一个参数(源组件),源组件会发射元组到一个随机选择的 bolt 并确保每个消费者会收到等数量的元组。name为上游需要分组的组件名称
fieldsGrouping(String name, Fields fields)Fields分组允许你基于元组的一个或多个域来控制元组怎样被发送到 bolts。 它确保一个联合域中给定的值集合总是会被送到相同的bolt。Name为上游需要分组的组件名称,fields为字段集合。
allGrouping(String name)All 分组发送每个元组的一份单独拷贝到接收 bolt 的所有实例上。这种分组被用来向 bolts发送信号。name为上游需要分组的组件名称
directGrouping(String name)这是一个由源决定哪个组件将接收元组的分组。Name为上游需要分组的组件名称
public static void main(String[] args) throws InterruptedException {    
……
builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
……
}
public static void main(String[] args) throws InterruptedException {    
……
builder.setBolt("word-counter", new WordCounter(),2).fieldsGrouping("word-normalizer", new Fields("word"));
……
}
public static void main(String[] args) throws InterruptedException {    
……
builder.setBolt("word-counter", new WordCounter(),2).allGrouping("signals-spout","signals")
……
}
public static void main(String[] args) throws InterruptedException {    
……
builder.setBolt("word-counter", new WordCounter(),2).directGrouping("word-normalizer");
……
}

5. DPRC 拓扑 API

方法名概述参数返回值示例
LinearDRPCTopologyBuilder(String name)构建的topology创建DRPCSpouts---它连接DRPC服务器并且发送数据到topology的剩余部分---topology还包装bolts,这使得结果可以从最后一个bolt返回。所有添加到LinearDRPCTopologyBuilder上的bolts被顺序执行。name为对外暴漏的drpc标识LinearDRPCTopologyBuilder对象
public static void main(String[] args) {
LocalDRPC drpc = new LocalDRPC();
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("add");
builder.addBolt(new AdderBolt(),2);
Config conf = new Config();
conf.setDebug(true);
LocalCluster cluster = new LocalCluster();
//构建drpc拓扑
cluster.submitTopology("drpc-adder-topology", conf,
builder.createLocalTopology(drpc));
//执行drpc,获取结果
String result = drpc.execute("add", "1+-1");
//判断结果是否正确
checkResult(result,0);
result = drpc.execute("add", "1+1+5+10");
checkResult(result,17);
cluster.shutdown();
drpc.shutdown();
}

6. storm-kafka接口

方法名概述参数返回值
kafkaSpout(SpoutConfig kafkaConfig)使用KafkaConfig来配置一些与kafka自身相关的选项。kafkaConfig为kafka的一些配置
public static void main(String[] args){
    …
     SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, “topic1”);
     kafkaConfig.zkRoot = offsetZkRoot;
     kafkaConfig.id = offsetZkId;
     KafkaSpout spout = new KafkaSpout(kafkaConfig);
…
}

参考资料