Trident is a high-level API built on top of Storm.
The TridentTuple interface is the smallest unit of a batch that Trident can operate on. Each TridentTuple consists of a list of values with predefined names (fields) and types. These values can be a byte, character, integer, long, float, double, boolean, or byte array. When constructing a TridentTopology, all operations are performed on tuples; these operations either add new fields to a tuple or replace old fields with new ones.
All the fields in a tuple can be accessed in the following ways:
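As a sketch of these access patterns (TupleAccessExamples and its readFields method are hypothetical names invented for the example; the getters are part of the storm.trident.tuple.TridentTuple interface), assuming a tuple with an integer field "a" and a string field "b":

import storm.trident.tuple.TridentTuple;

public class TupleAccessExamples {
    public static void readFields(TridentTuple tuple) {
        // access by position (0-based index into the tuple's fields)
        int a = tuple.getInteger(0);
        String b = tuple.getString(1);

        // access by field name
        int aByName = tuple.getIntegerByField("a");
        String bByName = tuple.getStringByField("b");

        // generic access, returning Object
        Object first = tuple.getValue(0);
        Object second = tuple.getValueByField("b");
    }
}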
A function contains the logic for transforming the original tuple. A function takes one tuple as input and emits zero or more tuples. The fields emitted by the function are appended to the input tuple. For example, if tuple1 = (A, B, C) and the function computes D, then the tuple passed downstream is tuple2 = (A, B, C, D).
import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class SumFunction extends BaseFunction {
    private static final long serialVersionUID = 5L;
    public void execute(TridentTuple tuple, TridentCollector collector) {
        int number1 = tuple.getInteger(0);
        int number2 = tuple.getInteger(1);
        int sum = number1 + number2;
        // emit the sum of the first two fields
        collector.emit(new Values(sum));
    }
}
dummyStream.each(new Fields("a", "b"), new SumFunction(), new Fields("sum"))
A filter decides, based on a condition, which tuples in the stream continue downstream.
import storm.trident.operation.BaseFilter;
import storm.trident.tuple.TridentTuple;

public class CheckEvenSumFilter extends BaseFilter {
    private static final long serialVersionUID = 7L;
    public boolean isKeep(TridentTuple tuple) {
        int number1 = tuple.getInteger(0);
        int number2 = tuple.getInteger(1);
        // keep only the tuples whose first two fields sum to an even number
        return (number1 + number2) % 2 == 0;
    }
}
dummyStream.each(new Fields("a", "b"), new CheckEvenSumFilter())
The project operation is equivalent to selecting specific fields (like a SQL select). For example, if the current tuple has the three fields x, y, and z, then after the following statement only x remains:
mystream.project(new Fields("x"))
Repartitioning operations distribute tuples across multiple tasks; a repartitioning operation by itself does not change the value of any tuple.
The shuffle operation distributes tuples evenly and randomly across multiple tasks. It is typically used when we want to spread the processing load over several tasks. The sample code below distributes the tuples across two tasks:
mystream.shuffle().each(new Fields("a","b"), new myFilter()).parallelismHint(2)
This operation lets us repartition the stream based on specific fields. For example, if we want all tuples with the same username to go to the same task, we can use username as the partitioning field:
mystream.partitionBy(new Fields("username")).each(new Fields("username","text"), new myFilter()).parallelismHint(2)
partitionBy computes the target partition as hash(fields) % (number of target partitions).
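As a rough illustration (this is not Storm's internal code; the class and method names are invented for the sketch), the formula can be written as:

import java.util.List;

public class PartitionByIllustration {
    // hash the selected field values, then take the remainder by the
    // number of target partitions to pick the destination partition
    public static int targetPartition(List<Object> selectedFieldValues, int numPartitions) {
        int hash = selectedFieldValues.hashCode();
        // normalize to a non-negative index
        return ((hash % numPartitions) + numPartitions) % numPartitions;
    }
}

Because the partition is derived purely from the hash of the chosen fields, tuples with equal field values always land on the same partition.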
This operation sends all the tuples to a single partition:
mystream.global().each(new Fields("a","b"), new myFilter()).parallelismHint(2)
This is a special kind of operation: instead of repartitioning the tuples, it copies every tuple and sends a replica to every partition:
mystream.broadcast().each(new Fields("a","b"), new myFilter()).parallelismHint(2)
This operation sends all the tuples belonging to the same batch to the same partition:
mystream.batchGlobal().each(new Fields("a", "b"), new myFilter()).parallelismHint(2)
If none of the partitioning schemes above fits, we can define our own partitioning rule by implementing the backtype.storm.grouping.CustomStreamGrouping interface. The following is an example that partitions by a country field:
import java.io.Serializable;
import java.util.List;
import java.util.Map;

import backtype.storm.generated.GlobalStreamId;
import backtype.storm.grouping.CustomStreamGrouping;
import backtype.storm.task.WorkerTopologyContext;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

public class CountryRepartition implements CustomStreamGrouping, Serializable {
    private static final long serialVersionUID = 1L;
    private static final Map<String, Integer> countries = ImmutableMap.of(
            "India", 0,
            "Japan", 1,
            "United State", 2,
            "China", 3,
            "Brazil", 4);
    private List<Integer> targetTasks;

    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
    }

    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        // map the country name to a fixed index, wrap it into the range of
        // available tasks, and translate the index into an actual task ID
        String country = (String) values.get(0);
        return ImmutableList.of(targetTasks.get(countries.get(country) % targetTasks.size()));
    }
}
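A usage sketch, assuming the Stream.partition(CustomStreamGrouping) overload is available in your Storm version for plugging in a custom grouping (the field names and myFilter are placeholders, as in the earlier examples):

mystream.partition(new CountryRepartition()).each(new Fields("country", "text"), new myFilter()).parallelismHint(2)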
Aggregators are used to perform aggregations over input batches or streams. For example, to count the total number of tuples in each input batch, we can use the Count aggregator. Unlike functions, implementations of the aggregator interfaces completely replace the input tuples with their output.
Trident has three kinds of aggregators:
- The partition aggregate
- The aggregate
- The persistence aggregate
This aggregation runs on each partition instead of on the whole batch. Its output completely replaces the input and consists of tuples with a single field. It is used as follows:
mystream.partitionAggregate(new Fields("x"), new Count(), new Fields("count"))
aggregate works on each batch: the tuples are first repartitioned (using a global operation by default), and the aggregation is then executed over the batch. An example:
mystream.aggregate(new Fields("x"), new Count(), new Fields("count"))
There are three aggregator interfaces, and all three can also be used with partitionAggregate:
The ReducerAggregator interface first performs a global repartitioning operation and then runs the aggregation function on each batch. The ReducerAggregator interface has the following methods:
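For reference, the interface (declared in storm.trident.operation; shown approximately from the Storm 0.9.x sources) is:

public interface ReducerAggregator<T> extends Serializable {
    T init();
    T reduce(T curr, TridentTuple tuple);
}

A Sum implementation of this interface looks like this: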
import storm.trident.operation.ReducerAggregator;
import storm.trident.tuple.TridentTuple;

public class Sum implements ReducerAggregator<Long> {
    private static final long serialVersionUID = 1L;
    // return the initial value, zero
    public Long init() {
        return 0L;
    }
    // iterate over the input tuples, accumulate the sum, and produce a single-field tuple
    public Long reduce(Long curr, TridentTuple tuple) {
        return curr + tuple.getLong(0);
    }
}
The Aggregator interface also first performs a global repartitioning operation and then runs the aggregation function on each batch. By definition, an Aggregator looks very similar to a ReducerAggregator. BaseAggregator contains the following methods:
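For reference, the underlying Aggregator interface (declared in storm.trident.operation; shown approximately) is:

public interface Aggregator<T> extends Operation {
    T init(Object batchId, TridentCollector collector);
    void aggregate(T val, TridentTuple tuple, TridentCollector collector);
    void complete(T val, TridentCollector collector);
}

The implementation below extends BaseAggregator, which provides empty defaults for the remaining Operation methods: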
import backtype.storm.tuple.Values;
import storm.trident.operation.BaseAggregator;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class SumAsAggregator extends BaseAggregator<SumAsAggregator.State> {
    private static final long serialVersionUID = 1L;
    // state class
    static class State {
        long count = 0;
    }
    // initialize the state
    public State init(Object batchId, TridentCollector collector) {
        return new State();
    }
    // add the current tuple's value into the running sum kept in the state's count variable
    public void aggregate(State state, TridentTuple tridentTuple, TridentCollector tridentCollector) {
        state.count = tridentTuple.getLong(0) + state.count;
    }
    // after all the tuples of the batch have been processed, emit a single-value tuple
    public void complete(State state, TridentCollector tridentCollector) {
        tridentCollector.emit(new Values(state.count));
    }
}
A CombinerAggregator first runs the aggregation on each partition of the batch, then performs a global repartitioning operation and reruns the aggregator over the partial results in the final partition to produce the expected output. Because only partial results travel over the network, a CombinerAggregator transfers less data than the other two interfaces, so its overall performance is better than Aggregator and ReducerAggregator. The interface has the following methods:
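For reference, the interface (declared in storm.trident.operation; shown approximately) is:

public interface CombinerAggregator<T> extends Serializable {
    T init(TridentTuple tuple);
    T combine(T val1, T val2);
    T zero();
}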
The following implementation uses these methods to compute a sum:
import clojure.lang.Numbers; // Clojure runtime class, available on Storm 0.9's classpath
import storm.trident.operation.CombinerAggregator;
import storm.trident.tuple.TridentTuple;

public class Sum implements CombinerAggregator<Number> {
    private static final long serialVersionUID = 1L;
    // seed the aggregation with the first field of the tuple
    public Number init(TridentTuple tridentTuple) {
        return (Number) tridentTuple.getValue(0);
    }
    // merge two partial results
    public Number combine(Number number1, Number number2) {
        return Numbers.add(number1, number2);
    }
    // the identity value, used when a partition contains no tuples
    public Number zero() {
        return 0;
    }
}
This method runs over all the tuples of the input stream and persists the aggregation result to a state source (the state source can be in-memory storage, Memcached, Cassandra, or another database). Example:
mystream.persistentAggregate(new MemoryMapState.Factory(), new Fields("select"), new Count(), new Fields("count"))
Trident provides an API for applying multiple aggregators to the same input stream, which is called chained aggregation; the outputs of the chained aggregators are merged into tuples containing all the aggregated fields. Example:
mystream.chainedAgg()
.partitionAggregate(
new Fields("b"),
new Average(),
new Fields("average"))
.partitionAggregate(
new Fields("b"),
new Sum(),
new Fields("sum"))
.chainEnd();
The groupBy operation does not involve any repartitioning by itself. It converts the input stream into a grouped stream, and its main purpose is to change the behavior of subsequent aggregation operations.
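A minimal usage sketch (the field x and the output field count are placeholders): grouping by x and then aggregating with Count yields one count per distinct value of x instead of one count per batch.

mystream.groupBy(new Fields("x")).aggregate(new Fields("x"), new Count(), new Fields("count"))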
In a non-transactional topology, a spout emits batches of tuples but makes no guarantee about what is in each batch. In terms of processing semantics, such a pipeline falls into one of two categories:
- At-most-once processing: tuples are not retried on failure, so they may be lost but are never processed twice.
- At-least-once processing: failed tuples are retried, so a tuple may be processed more than once, but none is lost.
Example: the following is an implementation of IBatchSpout:
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;

public class FakeTweetSpout implements IBatchSpout {
    private static final long serialVersionUID = 10L;
    private int batchSize;
    private HashMap<Long, List<List<Object>>> batchesMap = new HashMap<Long, List<List<Object>>>();

    public FakeTweetSpout(int batchSize) {
        this.batchSize = batchSize;
    }

    private static final Map<Integer, String> TWEET_MAP = new HashMap<Integer, String>();
    static {
        TWEET_MAP.put(0, "Adidas #FIFA World Cup Chant Challenge ");
        TWEET_MAP.put(1, "#FIFA worldcup");
        TWEET_MAP.put(2, "#FIFA worldcup");
        TWEET_MAP.put(3, "The Great Gatsby is such a good #movie ");
        TWEET_MAP.put(4, "#Movie top 10");
    }

    private static final Map<Integer, String> COUNTRY_MAP = new HashMap<Integer, String>();
    static {
        COUNTRY_MAP.put(0, "United State");
        COUNTRY_MAP.put(1, "Japan");
        COUNTRY_MAP.put(2, "India");
        COUNTRY_MAP.put(3, "China");
        COUNTRY_MAP.put(4, "Brazil");
    }

    // build one random (tweet, country) record
    private List<Object> recordGenerator() {
        final Random rand = new Random();
        int randomNumber = rand.nextInt(5);
        int randomNumber2 = rand.nextInt(5);
        return new Values(TWEET_MAP.get(randomNumber), COUNTRY_MAP.get(randomNumber2));
    }

    @Override
    public void ack(long batchId) {
        this.batchesMap.remove(batchId);
    }

    @Override
    public void close() {
        // close or release any connections opened in the open() method
    }

    @Override
    public void emitBatch(long batchId, TridentCollector collector) {
        List<List<Object>> batches = this.batchesMap.get(batchId);
        if (batches == null) {
            batches = new ArrayList<List<Object>>();
            for (int i = 0; i < this.batchSize; i++) {
                batches.add(this.recordGenerator());
            }
            this.batchesMap.put(batchId, batches);
        }
        for (List<Object> list : batches) {
            collector.emit(list);
        }
    }

    @Override
    public Map getComponentConfiguration() {
        // this method is used to define component-specific configuration
        return null;
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("text", "Country");
    }

    @Override
    public void open(Map arg0, TopologyContext arg1) {
        // this method is used to initialize variables and open connections to external resources
    }
}
The following pom.xml dependencies and repository are required to build the examples:
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>storm</groupId>
        <artifactId>storm</artifactId>
        <version>0.9.0.1</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
<repositories>
    <repository>
        <id>clojars.org</id>
        <url>http://clojars.org/repo</url>
    </repository>
</repositories>
import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFilter;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class TridentUtility {
    // takes a comma-separated string as input, splits it on commas, and emits one tuple per token
    public static final class Split extends BaseFunction {
        private static final long serialVersionUID = 2L;
        public void execute(TridentTuple tuple, TridentCollector collector) {
            String countries = tuple.getString(0);
            for (String word : countries.split(",")) {
                collector.emit(new Values(word));
            }
        }
    }

    // this class extends BaseFilter and keeps only the tuples whose text contains #FIFA
    public static class TweetFilter extends BaseFilter {
        private static final long serialVersionUID = 1L;
        public boolean isKeep(TridentTuple tuple) {
            return tuple.getString(0).contains("#FIFA");
        }
    }

    // a pass-through filter that prints each tuple, useful for debugging
    public static class Print extends BaseFilter {
        private static final long serialVersionUID = 1L;
        public boolean isKeep(TridentTuple tuple) {
            System.out.println(tuple);
            return true;
        }
    }
}
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;

public class TridentHelloWorldTopology {
    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setMaxSpoutPending(20);
        if (args.length == 0) {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("Count", conf, buildTopology());
        } else {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, buildTopology());
        }
    }

    public static StormTopology buildTopology() {
        FakeTweetSpout spout = new FakeTweetSpout(10);
        TridentTopology topology = new TridentTopology();
        topology.newStream("faketweetspout", spout)
                .shuffle()
                .each(new Fields("text", "Country"),
                        new TridentUtility.TweetFilter())
                .groupBy(new Fields("Country"))
                .aggregate(new Fields("Country"),
                        new Count(), new Fields("count"))
                .each(new Fields("count"),
                        new TridentUtility.Print())
                .parallelismHint(2);
        return topology.build();
    }
}
Trident provides abstractions for reading from and writing processing results to stateful sources. The state can be kept in the topology's memory, or it can be stored in an external source such as Memcached or Cassandra. The processing described so far does not guarantee that each tuple is processed exactly once; to achieve exactly-once processing, we need the mechanisms described below:
A transactional spout has the following characteristics:
- Batches for a given transaction ID are always the same; if a batch is replayed, it re-emits exactly the same set of tuples.
- Batches do not overlap: a tuple belongs to at most one batch.
- Every tuple appears in some batch; none is skipped.
A transactional topology relies on a transactional spout. Suppose some nodes of the distributed queue backing the spout go down: the spout then cannot reconstruct the exact same batch until those nodes recover, and in the meantime the whole pipeline processes nothing. As a result, if the input source (for example, a distributed queue) is not highly available, a transactional spout has poor fault tolerance. Fortunately, sources such as Kafka keep themselves highly available by replicating data across multiple nodes.
An opaque transactional topology overcomes the limitations of the transactional topology: it remains fault tolerant even when nodes of the data source go down. It has the following characteristic:
- Every tuple is successfully processed in exactly one batch; however, a tuple may fail to process in one batch and then succeed as part of a later batch.
To make this work, an opaque state stores a previous value in addition to the current value, so that the data in the database stays consistent even when a batch is replayed with different contents.
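Storm captures this idea in the storm.trident.state.OpaqueValue class; the sketch below is a simplified illustration of the per-key record, not the exact class definition:

// simplified sketch of what an opaque state keeps for each key
class OpaqueRecord<T> {
    long txid; // transaction ID of the batch that produced 'curr'
    T curr;    // value after applying the current batch
    T prev;    // value before the current batch; if the same batch is
               // replayed with different tuples, the new value is
               // recomputed starting from 'prev'
}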
Distributed RPC is used to query and retrieve results from a running Trident topology on the fly. Storm has a built-in distributed RPC server. The DRPC server receives RPC requests from clients and passes them to the topology; the topology processes the request and sends the result back to the DRPC server, which then returns the response to the client.
The DRPC server can be configured with the following property in the storm.yaml file:
drpc.servers:
# the IP address (or hostname) of the DRPC server goes here
- "nimbus-node"
Start the DRPC server by running the bin/storm drpc command on the nimbus-node machine.
Example: querying a running topology through DRPC:
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.DRPCClient;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.MapGet;
import storm.trident.testing.MemoryMapState;

public class DistributedRPC {
    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setMaxSpoutPending(20);
        LocalDRPC drpc = new LocalDRPC();
        if (args.length == 0) {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("CountryCount", conf, buildTopology(drpc));
            Thread.sleep(2000);
            for (int i = 0; i < 100; i++) {
                System.out.println(drpc.execute("Count", "Japan,India,Europe"));
                Thread.sleep(1000);
            }
        } else {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, buildTopology(null));
            Thread.sleep(2000);
            DRPCClient client = new DRPCClient("DRPC-Server", 1234);
            System.out.println(client.execute("Count", "Japan,India,Europe"));
        }
    }

    public static StormTopology buildTopology(LocalDRPC drpc) {
        FakeTweetSpout spout = new FakeTweetSpout(10);
        TridentTopology topology = new TridentTopology();
        TridentState countryCount = topology
                .newStream("spout1", spout)
                .shuffle()
                .each(new Fields("text", "Country"),
                        new TridentUtility.TweetFilter())
                .groupBy(new Fields("Country"))
                .persistentAggregate(new MemoryMapState.Factory(),
                        new Fields("Country"),
                        new Count(),
                        new Fields("count"))
                .parallelismHint(2);
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
        }
        topology.newDRPCStream("Count", drpc)
                .each(new Fields("args"),
                        new TridentUtility.Split(),
                        new Fields("Country"))
                .stateQuery(countryCount,
                        new Fields("Country"),
                        new MapGet(),
                        new Fields("count"))
                .each(new Fields("count"),
                        new FilterNull());
        return topology.build();
    }
}
The persistentAggregate() method converts the input stream into a TridentState object. In this example, the countryCount variable holds the current count for each country.