5.3.2.2-Storm-Java-API
优质
小牛编辑
136浏览
2023-12-01
方法名 | 概述 | 参数 | 返回值 | 示例 |
---|---|---|---|---|
open(Map conf, TopologyContext context,SpoutOutputCollector collector) | 该方法将在所有任务开始前被执行,一般作数据的初始化操作 | conf为拓扑提交时配置的变量,context为拓扑执行的上下文环境,collector用于在组件间传递数据 | 无 | |
nextTuple() | 方法在相同的循环中被周期性的调用,用于处理实际的业务逻辑。 | 无 | 无 | |
ack(Object msgId) | 当发送出的数据被正确执行后,会自动调用该方法。 | msgId为被正确执行的元组id | 无 | |
fail(Object msgId) | 当发送出的数据没有并正确执行(比如丢失了),会自动调用该方法。 | msgId为被未正确执行的元组id | 无 |
public void open(Map conf, TopologyContext context,SpoutOutputCollector collector) {
try {
this.context = context;
//通过配置文件确定数据源
this.fileReader = new FileReader(conf.get("wordsFile").toString());
} catch (FileNotFoundException e) {
throw new RuntimeException("Error reading file
["+conf.get("wordFile")+"]");
}
this.collector = collector;
}
public void nextTuple() {
String str;
BufferedReader reader = new BufferedReader(fileReader);
try{
//从数据源获取数据,并将该数据发往下一个处理组件
while((str = reader.readLine()) != null){
this.collector.emit(new Values(str));
}
}catch(Exception e){
throw new RuntimeException("Error reading tuple",e);
}
}
public void ack(Object msgId) {
System.out.println("OK:"+msgId);
}
public void fail(Object msgId) {
System.out.println("FAIL:"+msgId);
}
1. Blot 相关API
方法名 | 概述 | 参数 | 返回值 | 示例 |
---|---|---|---|---|
prepare(Map conf, TopologyContext context,OutputCollector collector) | 该方法将在所有任务开始前被执行,一般作数据的初始化操作 | conf为拓扑提交时配置的变量,context为拓扑执行的上下文环境,collector用于在组件间传递数据 | 无 | |
execute(Tuple input) | 方法在相同的循环中被周期性的调用,用于处理实际的业务逻辑。 | 无 | 无 | |
declareOutputFields(OutputFieldsDeclarer declarer) | 声明该组件处理后流中有哪些字段。 | declarer为声明类 | 无 | |
fail(Object msgId) | 当发送出的数据没有并正确执行(比如丢失了),会自动调用该方法。 | msgId为被未正确执行的元组id | 无 |
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}
public void execute(Tuple input) {
String sentence = input.getString(0);
//解析上游接收的数据元组
String[] words = sentence.split(" ");
//将数据切割后逐一处理
for(String word : words){
word = word.trim();
if(!word.isEmpty()){
word = word.toLowerCase();
//向下游发送数据
List a = new ArrayList();
a.add(input);
collector.emit(a,new Values(word));
}
}
// 通知上游组件该元组已正确执行
collector.ack(input);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
//声明该组件输出流中的字段为“word”
declarer.declare(new Fields("word"));
}
3. Topology相关API
方法名 | 概述 | 参数 | 返回值 | 示例 |
---|---|---|---|---|
TopologyBuilder() | 用于组建实际集群运行的topology | 无 | TopologyBuilder对象 | |
Config() | 用于建立包含topology配置的Config对象,该配置在运行时会被与集群的配置合并并且通过prepare方法发送到所有结点。 | 无 | Config对象 | |
LocalCluster() | 通过构造该对象,可在本地环境中模拟Storm的运行环境进行代码功能测试。 | 无 | LocalCluster对象 | |
StormSubmitter.submitTopologyWithProgressBar(name, conf, builder) | 通过构造该对象,实现向集群提交topology的功能。 | name为topology的名称,conf为配置信息,builder为设计的topology | 无 |
public static void main(String[] args) throws InterruptedException {
//TopologyBuilder对象初始化
……
//建立包含topology配置的Config对象
Config conf = new Config();
conf.put("wordsFile", args[0]);
conf.setDebug(true);
//….
}
public static void main(String[] args) throws InterruptedException {
//TopologyBuilder对象初始化
……
//建立包含topology配置的Config对象
……
//创建本地运行模式以模拟测试
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
Thread.sleep(2000);
cluster.shutdown();.
}
public static void main(String[] args) throws InterruptedException {
//TopologyBuilder对象初始化
……
//建立包含topology配置的Config对象
……
//向集群提交拓扑
StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
}
4. Grouping相关API
方法名 | 概述 | 参数 | 返回值 | 示例 |
---|---|---|---|---|
shuffleGrouping(String name) | Shuffle 分组是最常用的分组方式。它使用一个参数(源组件),源组件会发射元组到一个随机选择的 bolt 并确保每个消费者会收到等数量的元组。 | name为上游需要分组的组件名称 | 无 | |
fieldsGrouping(String name, Fields fields) | Fields分组允许你基于元组的一个或多个域来控制元组怎样被发送到 bolts。 它确保一个联合域中给定的值集合总是会被送到相同的bolt。 | Name为上游需要分组的组件名称,fields为字段集合。 | 无 | |
allGrouping(String name) | All 分组发送每个元组的一份单独拷贝到接收 bolt 的所有实例上。这种分组被用来向 bolts发送信号。 | name为上游需要分组的组件名称 | 无 | |
directGrouping(String name) | 这是一个由源决定哪个组件将接收元组的分组。 | Name为上游需要分组的组件名称 | 无 |
public static void main(String[] args) throws InterruptedException {
……
builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
……
}
public static void main(String[] args) throws InterruptedException {
……
builder.setBolt("word-counter", new WordCounter(),2).fieldsGrouping("word-normalizer", new Fields("word"));
……
}
public static void main(String[] args) throws InterruptedException {
……
builder.setBolt("word-counter", new WordCounter(),2).allGrouping("signals-spout","signals")
……
}
public static void main(String[] args) throws InterruptedException {
……
builder.setBolt("word-counter", new WordCounter(),2).directGrouping("word-normalizer");
……
}
5. DPRC 拓扑 API
方法名 | 概述 | 参数 | 返回值 | 示例 |
---|---|---|---|---|
LinearDRPCTopologyBuilder(String name) | 构建的topology创建DRPCSpouts---它连接DRPC服务器并且发送数据到topology的剩余部分---topology还包装bolts,这使得结果可以从最后一个bolt返回。所有添加到LinearDRPCTopologyBuilder上的bolts被顺序执行。 | name为对外暴漏的drpc标识 | LinearDRPCTopologyBuilder对象 |
public static void main(String[] args) {
LocalDRPC drpc = new LocalDRPC();
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("add");
builder.addBolt(new AdderBolt(),2);
Config conf = new Config();
conf.setDebug(true);
LocalCluster cluster = new LocalCluster();
//构建drpc拓扑
cluster.submitTopology("drpc-adder-topology", conf,
builder.createLocalTopology(drpc));
//执行drpc,获取结果
String result = drpc.execute("add", "1+-1");
//判断结果是否正确
checkResult(result,0);
result = drpc.execute("add", "1+1+5+10");
checkResult(result,17);
cluster.shutdown();
drpc.shutdown();
}
6. storm-kafka接口
方法名 | 概述 | 参数 | 返回值 |
---|---|---|---|
kafkaSpout(SpoutConfig kafkaConfig) | 使用KafkaConfig来配置一些与kafka自身相关的选项。 | kafkaConfig为kafka的一些配置 | 无 |
public static void main(String[] args){
…
SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, “topic1”);
kafkaConfig.zkRoot = offsetZkRoot;
kafkaConfig.id = offsetZkId;
KafkaSpout spout = new KafkaSpout(kafkaConfig);
…
}