Trident API 概述
"Stream" 是 Trident 中的核心数据模型, 它被当做一系列的 batch 来处理.在 Storm 集群的节点之间, 一个 stream 被划分成很多 partition (分区), 对流的 operation (操作)是在每个 partition 上并行进行的.
注:
- "Stream" 是 Trident 中的核心数据模型:有些地方也说是 TridentTuple , 没有个标准的说法.
- 一个 stream 被划分成很多 partition : partition 是 stream 的一个子集, 里面可能有多个 batch , 一个 batch 也可能位于不同的 partition 上.
Trident 有 5 类操作:
- Partition-local operations , 对每个 partition 的局部操作, 不产生网络传输
- Repartitioning operations: 对 stream (数据流)的重新划分(仅仅是划分, 但不改变内容), 产生网络传输
- 作为 operation (操作)的一部分进行网络传输的 Aggregation operations (聚合操作).
- Operations on grouped streams (作用在分组流上的操作)
- Merges 和 joins 操作
Partition-local operations
Partition-local operations (分区本地操作)不涉及网络传输, 并且独立地应用于每个 batch partition (批处理分区).
Functions
一个 function 收到一个输入 tuple 后可以输出 0 或多个 tuple , 输出 tuple 的字段被追加到接收到的输入 tuple 后面.如果对某个 tuple 执行 function 后没有输出 tuple, 则该 tuple 被 filter(过滤), 否则, 就会为每个输出 tuple 复制一份输入 tuple 的副本.假设有如下的 function :
public class MyFunction extends BaseFunction {
public void execute(TridentTuple tuple, TridentCollector collector) {
for(int i=0; i < tuple.getInteger(0); i++) {
collector.emit(new Values(i));
}
}
}
假设有个叫 "mystream" 的 stream (流), 该流中有如下 tuple ( tuple 的字段为["a", "b", "c"] ):
[1, 2, 3]
[4, 1, 6]
[3, 0, 8]
如果您运行下面的代码:
mystream.each(new Fields("b"), new MyFunction(), new Fields("d")))
则 resulting tuples (输出 tuple )中的字段为 ["a", "b", "c", "d"], 如下所示:
[1, 2, 3, 0]
[1, 2, 3, 1]
[4, 1, 6, 0]
Filters
Filters 收到一个输入 tuple , 并决定是否保留该 tuple .假设nin拥有这个 filters:
public class MyFilter extends BaseFilter {
public boolean isKeep(TridentTuple tuple) {
return tuple.getInteger(0) == 1 && tuple.getInteger(1) == 2;
}
}
现在, 假设您有如下这些 tuple , 包含字段 ["a", "b", "c"]:
[1, 2, 3]
[2, 1, 1]
[2, 3, 4]
如果您运行如下代码:
mystream.filter(new MyFilter())
则得到的 resulting tuples (结果 tuples)为:
[1, 2, 3]
map and flatMap
map
返回一个 stream , 它包含将给定的 mapping function (映射函数)应用到 stream 的 tuples 的结果. 这个可以用来对 tuples 应用 one-one transformation (一一变换).
例如, 如果有一个 stream of words (单词流), 并且您想将其转换为 stream of upper case words (大写字母的流), 你可以定义一个 mapping function (映射函数)如下,
public class UpperCase extends MapFunction {
@Override
public Values execute(TridentTuple input) {
return new Values(input.getString(0).toUpperCase());
}
}
然后可以将 mapping function (映射函数)应用于 stream 以产生 stream of uppercase words (大写字的流).
mystream.map(new UpperCase())
flatMap
类似于 map
, 但具有将 one-to-many transformation (一对多变换)应用于 values of the stream (流的值)的效果, 然后将所得到的元素 flattening (平坦化)为新的 stream .
例如, 如果有 stream of sentences (句子流), 并且您想将其转换成 stream of words (单词流), 你可以定义一个 flatMap 函数如下,
public class Split extends FlatMapFunction {
@Override
public Iterable<Values> execute(TridentTuple input) {
List<Values> valuesList = new ArrayList<>();
for (String word : input.getString(0).split(" ")) {
valuesList.add(new Values(word));
}
return valuesList;
}
}
然后可以将 flatMap 函数应用于 stream of sentences (句子流)以产生一个 stream of words (单词流),
mystream.flatMap(new Split())
当然这些操作可以被 chained (链接), 因此可以从如下的 stream of sentences (句子流)中获得 stream of uppercase words (大写字的流),
mystream.flatMap(new Split()).map(new UpperCase())
如果不将 output fields (输出字段)作为 parameter (参数)传递, 则 map 和 flatMap 会将 input fields (输入字段)保留为 output fields (输出字段).
如果要使用 MapFunction 或 FlatMapFunction 使用 new output fields (新的输出字段)替换 old fields (旧字段), 您可以使用附加的 Fields 参数调用 map/flatMap , 如下所示,
mystream.map(new UpperCase(), new Fields("uppercased"))
Output stream (输出流)只有一个 output field (输出字段) "uppercased" , 而不管以前的流有什么输出字段. 同样的事情适用于 flatMap, 所以以下是有效的,
mystream.flatMap(new Split(), new Fields("word"))
peek
peek
可用于在每个 trident tuple 流过 stream 时对其执行 additional action (附加操作). 这可能对于在流经 pipeline 中 certain point (某一点)的元组来 debugging (调试) tuples 是有用的.
例如, 下面的代码将打印在将这些单词转换为 groupBy
之前将单词转换为大写的结果
mystream.flatMap(new Split()).map(new UpperCase())
.peek(new Consumer() {
@Override
public void accept(TridentTuple input) {
System.out.println(input.getString(0));
}
})
.groupBy(new Fields("word"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
min and minBy
min
和 minBy
operations (操作)在 trident stream 中的 a batch of tuples (一批元组)的每个 partition (分区)上返回 minimum value (最小值).
假设 trident stream 包含字段 ["device-id", "count"] 和 partitions of tuples (元组的以下分区)
Partition 0:
[123, 2]
[113, 54]
[23, 28]
[237, 37]
[12, 23]
[62, 17]
[98, 42]
Partition 1:
[64, 18]
[72, 54]
[2, 28]
[742, 71]
[98, 45]
[62, 12]
[19, 174]
Partition 2:
[27, 94]
[82, 23]
[9, 86]
[53, 71]
[74, 37]
[51, 49]
[37, 98]
minBy
operation (操作)可以应用在上面的 stream of tuples (元组流)中, 如下所示, 这导致在每个 partition (分区)中以最小值 count
field (字段)发出 tuples .
mystream.minBy(new Fields("count"))
上述代码在上述 partitions (分区)上的结果是:
Partition 0:
[123, 2]
Partition 1:
[62, 12]
Partition 2:
[82, 23]
您可以在 Stream 上查看其他 min
和 minBy
操作
public <T> Stream minBy(String inputFieldName, Comparator<T> comparator)
public Stream min(Comparator<TridentTuple> comparator)
下面的示例显示了如何使用这些 API 来使用 tuple 上的 respective Comparators (相应比较器)来找到 minimum (最小值).
FixedBatchSpout spout = new FixedBatchSpout(allFields, 10, Vehicle.generateVehicles(20));
TridentTopology topology = new TridentTopology();
Stream vehiclesStream = topology.newStream("spout1", spout).
each(allFields, new Debug("##### vehicles"));
Stream slowVehiclesStream =
vehiclesStream
.min(new SpeedComparator()) // Comparator w.r.t speed on received tuple.
.each(vehicleField, new Debug("#### slowest vehicle"));
vehiclesStream
.minBy(Vehicle.FIELD_NAME, new EfficiencyComparator()) // Comparator w.r.t efficiency on received tuple.
.each(vehicleField, new Debug("#### least efficient vehicle"));
这些 API 的示例应用程序可以位于 TridentMinMaxOfDevicesTopology 和 TridentMinMaxOfVehiclesTopology .
max and maxBy
max
和 maxBy
operations (操作)在 trident stream 中的一 batch of tuples (批元组)的每个 partition (分区)上返回 maximum (最大值).
假设 trident stream 包含上述部分所述的字段 ["device-id", "count"] .
max
和 maxBy
operations (操作)可以应用于上面的 stream of tuples (元组流), 如下所示, 这导致每个分区的最大值为 count
字段的元组.
mystream.maxBy(new Fields("count"))
上述代码在上述 partitions (分区)上的结果是:
Partition 0:
[113, 54]
Partition 1:
[19, 174]
Partition 2:
[37, 98]
您可以在 Stream 上查看其他 max
和 maxBy
函数
public <T> Stream maxBy(String inputFieldName, Comparator<T> comparator)
public Stream max(Comparator<TridentTuple> comparator)
下面的示例显示了如何使用这些 API 来使用元组上的 respective Comparators (相应比较器)来找到 maximum (最大值).
FixedBatchSpout spout = new FixedBatchSpout(allFields, 10, Vehicle.generateVehicles(20));
TridentTopology topology = new TridentTopology();
Stream vehiclesStream = topology.newStream("spout1", spout).
each(allFields, new Debug("##### vehicles"));
vehiclesStream
.max(new SpeedComparator()) // Comparator w.r.t speed on received tuple.
.each(vehicleField, new Debug("#### fastest vehicle"))
.project(driverField)
.each(driverField, new Debug("##### fastest driver"));
vehiclesStream
.maxBy(Vehicle.FIELD_NAME, new EfficiencyComparator()) // Comparator w.r.t efficiency on received tuple.
.each(vehicleField, new Debug("#### most efficient vehicle"));
这些 API 的示例应用程序可以位于 TridentMinMaxOfDevicesTopology 和 TridentMinMaxOfVehiclesTopology
Windowing
Trident streams 可以 batches (批处理)同一个 windowing (窗口)的元组, 并将 aggregated result (聚合结果)发送到下一个 operation (操作). 有 2 种支持的 windowing (窗口)是基于 processing time (处理时间)或 tuples count (元组数): 1. Tumbling window 2. Sliding window
Tumbling window
基于 processing time (处理时间)或 count (计数), 元组在 single window (单个窗口)中分组. 任何 tuple (元组)只属于其中一个 windows (窗口).
/**
* Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
*/
public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
Fields inputFields, Aggregator aggregator, Fields functionFields);
/**
* Returns a stream of tuples which are aggregated results of a window that tumbles at duration of {@code windowDuration}
*/
public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory,
Fields inputFields, Aggregator aggregator, Fields functionFields);
Sliding window
每个 sliding interval (滑动间隔), Tuples (元组)被分组在 windows (窗口)和 window slides 中.元组可以属于多个 window (窗口).
/**
* Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples
* and slides the window after {@code slideCount}.
*/
public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory,
Fields inputFields, Aggregator aggregator, Fields functionFields);
/**
* Returns a stream of tuples which are aggregated results of a window which slides at duration of {@code slidingInterval}
* and completes a window at {@code windowDuration}
*/
public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingInterval,
WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);
tumbling 和 sliding windows 的示例可以在 这里 被找到.
通用 windowing API
以下是通用的 windowing API, 它为任何支持的 windowing 配置提供了 WindowConfig
.
public Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields,
Aggregator aggregator, Fields functionFields)
windowConfig
可以是下面的任何一个.
SlidingCountWindow.of(int windowCount, int slidingCount)
SlidingDurationWindow.of(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingDuration)
TumblingCountWindow.of(int windowLength)
TumblingDurationWindow.of(BaseWindowedBolt.Duration windowLength)
Trident windowing APIs 需要 WindowsStoreFactory
来存储接收的 tuples 和 aggregated values (聚合值). 目前, HBase 的基本实现由 HBaseWindowsStoreFactory
提供. 可以进一步扩展以解决各自的用途. 使用 HBaseWindowStoreFactory
进行 windowing 的例子可以在下面看到.
// window-state table should already be created with cf:tuples column
HBaseWindowsStoreFactory windowStoreFactory = new HBaseWindowsStoreFactory(new HashMap<String, Object>(), "window-state", "cf".getBytes("UTF-8"), "tuples".getBytes("UTF-8"));
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
new Values("how many apples can you eat"), new Values("to be or not to be the person"));
spout.setCycle(true);
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"),
new Split(), new Fields("word"))
.window(TumblingCountWindow.of(1000), windowStoreFactory, new Fields("word"), new CountAsAggregator(), new Fields("count"))
.peek(new Consumer() {
@Override
public void accept(TridentTuple input) {
LOG.info("Received tuple: [{}]", input);
}
});
StormTopology stormTopology = topology.build();
可以在 这里 中找到本节中所有上述 API 的详细说明.
示例应用程序
这些 API 的示例应用程序位于 TridentHBaseWindowingStoreTopology 和 TridentWindowingInmemoryStoreTopology
partitionAggregate
partitionAggregate 在每个 batch of tuples (批量元组) partition 上执行一个 function 操作(实际上是聚合操作), 但它又不同于上面的 functions 操作, partitionAggregate 的输出 tuple 将会取代收到的输入 tuple , 如下面的例子:
mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
假设 input stream 包括字段 ["a", "b"] , 并有下面的 partitions of tuples (元组 partitions ):
Partition 0:
["a", 1]
["b", 2]
Partition 1:
["a", 3]
["c", 8]
Partition 2:
["e", 1]
["d", 9]
["d", 10]
则这段代码的 output stream 包含如下 tuple , 且只有一个 "sum" 的字段:
Partition 0:
[3]
Partition 1:
[11]
Partition 2:
[20]
上面代码中的 new Sum() 实际上是一个 aggregator (聚合器), 定义一个聚合器有三种不同的接口:CombinerAggregator, ReducerAggregator 和 Aggregator .
下面是 CombinerAggregator 接口:
public interface CombinerAggregator<T> extends Serializable {
T init(TridentTuple tuple);
T combine(T val1, T val2);
T zero();
}
一个 CombinerAggregator 仅输出一个 tuple (该 tuple 也只有一个字段).每收到一个输入 tuple, CombinerAggregator 就会执行 init() 方法(该方法返回一个初始值), 并且用 combine() 方法汇总这些值, 直到剩下一个值为止(聚合值).如果 partition 中没有 tuple, CombinerAggregator 会发送 zero() 的返回值.下面是聚合器 Count 的实现:
public class Count implements CombinerAggregator<Long> {
public Long init(TridentTuple tuple) {
return 1L;
}
public Long combine(Long val1, Long val2) {
return val1 + val2;
}
public Long zero() {
return 0L;
}
}
当使用 aggregate() 方法代替 partitionAggregate() 方法时, 就能看到 CombinerAggregation 带来的好处.这种情况下, Trident 会自动优化计算:先做局部聚合操作, 然后再通过网络传输 tuple 进行全局聚合.
ReducerAggregator 接口如下:
public interface ReducerAggregator<T> extends Serializable {
T init();
T reduce(T curr, TridentTuple tuple);
}
ReducerAggregator 使用 init() 方法产生一个初始值, 对于每个输入 tuple , 依次迭代这个初始值, 最终产生一个单值输出 tuple .下面示例了如何将 Count 定义为 ReducerAggregator:
public class Count implements ReducerAggregator<Long> {
public Long init() {
return 0L;
}
public Long reduce(Long curr, TridentTuple tuple) {
return curr + 1;
}
}
ReducerAggregator 也可以与 persistentAggregate 一起使用, 稍后你会看到的.
用于 performing aggregations (执行聚合)的最通用的接口是 Aggregator , 如下所示:
public interface Aggregator<T> extends Operation {
T init(Object batchId, TridentCollector collector);
void aggregate(T state, TridentTuple tuple, TridentCollector collector);
void complete(T state, TridentCollector collector);
}
Aggregator 可以输出任意数量的 tuple , 且这些 tuple 的字段也可以有多个.执行过程中的任何时候都可以输出 tuple (三个方法的参数中都有 collector ). Aggregator 的执行方式如下:
- 处理每个 batch 之前调用一次 init() 方法, 该方法的返回值是一个对象, 代表 aggregation 的状态, 并且会传递给下面的 aggregate() 和 complete() 方法.
- 每个收到一个该 batch 中的输入 tuple 就会调用一次 aggregate , 该方法中可以 update the state (更新状态)(第一点中 init() 方法的返回值)并 optionally emit tuples (可选地发出元组).
- 当该 batch partition 中的所有 tuple 都被 aggregate() 方法处理完之后调用 complete 方法.
注:理解 batch, partition 之间的区别将会更好的理解上面的几个方法.
下面的代码将 Count 作为 Aggregator 实现:
public class CountAgg extends BaseAggregator<CountState> {
static class CountState {
long count = 0;
}
public CountState init(Object batchId, TridentCollector collector) {
return new CountState();
}
public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
state.count+=1;
}
public void complete(CountState state, TridentCollector collector) {
collector.emit(new Values(state.count));
}
}
有时需要同时执行 multiple aggregators (多个聚合)操作, 这个可以使用 chaining (链式)操作完成:
mystream.chainedAgg()
.partitionAggregate(new Count(), new Fields("count"))
.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
.chainEnd()
这段代码将会对每个 partition 执行 Count 和 Sum aggregators (聚合器), 并输出一个tuple 字段 ["count", "sum"].
stateQuery and partitionPersist
stateQuery 和 partitionPersist 分别 query (查询)和 update (更新) sources of state (状态源). 您可以在 Trident state doc 上阅读有关如何使用它们.
projection
经 Stream 中的 project 方法处理后的 tuple 仅保持指定字段(相当于过滤字段).如果你有一个包含字段 ["a", "b", "c", "d"] 的 stream , 执行下面代码:
mystream.project(new Fields("b", "d"))
则 output stream 将仅包含 ["b", "d"] 字段.
Repartitioning operations
Repartitioning operations (重新分区操作)运行一个函数来 change how the tuples are partitioned across tasks (更改元组在任务之间的分区). number of partitions (分区的数量)也可以由于 repartitioning (重新分区)而改变(例如, 如果并行提示在 repartioning (重新分配)后更大). Repartitioning (重新分区)需要 network transfer (网络传输). 以下是 repartitioning functions (重新分区功能):
- shuffle: 随机将 tuple 均匀地分发到目标 partition 里.
- broadcast: 每个 tuple 被复制到所有的目标 partition 里, 在 DRPC 中有用 — 你可以在每个 partition 上使用 stateQuery .
- partitionBy: 对每个 tuple 选择 partition 的方法是:(该 tuple 指定字段的 hash 值) mod (目标 partition 的个数), 该方法确保指定字段相同的 tuple 能够被发送到同一个 partition .(但同一个 partition 里可能有字段不同的 tuple ).
- global: 所有的 tuple 都被发送到同一个 partition .
- batchGlobal: 确保同一个 batch 中的 tuple 被发送到相同的 partition 中.
- partition: 此方法采用实现 org.apache.storm.grouping.CustomStreamGrouping 的自定义分区函数.
Aggregation operations
Trident 中有 aggregate() 和 persistentAggregate() 方法对流进行聚合操作. aggregate() 在每个 batch 上独立的执行, persistemAggregate() 对所有 batch 中的所有 tuple 进行聚合, 并将结果存入 state 源中.
aggregate() 对 Stream 做全局聚合, 当使用 ReduceAggregator 或者 Aggregator 聚合器时, 流先被重新划分成一个大分区(仅有一个 partition ), 然后对这个 partition 做聚合操作;另外, 当使用 CombinerAggregator 时, Trident 首先对每个 partition 局部聚合, 然后将所有这些 partition 重新划分到一个 partition 中, 完成全局聚合.相比而言, CombinerAggregator 更高效, 推荐使用.
下面的例子使用 aggregate() 对一个 batch 操作得到一个全局的 count:
mystream.aggregate(new Count(), new Fields("count"))
同在 partitionAggregate 中一样, aggregate 中的聚合器也可以使用链式用法.但是, 如果你将一个 CombinerAggregator 链到一个非 CombinerAggregator 后面, Trident 就不能做局部聚合优化.
关于 persistentAggregate 的用法请参见 Trident state doc 一文.
Operations on grouped streams
groupBy 操作先对流中的指定字段做 partitionBy 操作, 让指定字段相同的 tuple 能被发送到同一个 partition 里.然后在每个 partition 里根据指定字段值对该分区里的 tuple 进行分组.下面演示了 groupBy 操作的过程:
如果你在一个 grouped stream 上做聚合操作, 聚合操作将会在每个 group (分组)内进行, 而不是整个 batch 上. GroupStream 类中也有 persistentAggregate 方法, 该方法聚合的结果将会存储在一个 key 值为分组字段(即 groupBy 中指定的字段)的 MapState 中, 这些还是在 Trident state doc 一文中讲解.
和普通的 stream 一样, groupstream 上的聚合操作也可以使用 chained (链式语法).
Merges and joins
最后一部分 API 内容是关于将几个 stream 汇总到一起, 最简单的汇总方法是将他们合并成一个 stream , 这个可以通过 TridentTopology 中的 merge 方法完成, 就像这样:
topology.merge(stream1, stream2, stream3);
Trident 将把新的 merged stream 的 output fields 命名为第一个 stream 的 output fields (输出字段).
另一种 combine streams (汇总方法)是使用 join (连接, 类似于 sql 中的连接操作, 需要有限的输入).所以, 它们对于 infinite streams (无限流)是没有意义的. Joins in Trident 仅适用于从 spout 发出的每个 small batch 中.
以下是包含字段 ["key", "val1", "val2"] 的 stream 和包含 ["x", "val1"] 的另一个 stream 之间的 join 示例:
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));
使用 "key" 和 "x" 作为每个相应流的连接字段将 stream1 和 stream2 join 在一起.然后, Trident 要求命名 new stream 的所有 output fields , 因为 input streams 可能具有 overlapping field names (重叠的字段名称).从 join 发出的 tuples 将包含:
- list of join fields (连接字段列表).在这种情况下, "key" 对应于 stream1 的 "key" , stream2 对应于 "x" .
- 接下来, 按照 streams 如何传递到 join 方法的顺序, 所有流中的所有 non-join fields (非连接字段)的列表.在这种情况下, "a" 和 "b" 对应于来自 stream1 的 "val1" 和 "val2" , "c" 对应于来自 stream2 的 "val1" .
当来自不同 spouts 的 stream 之间发生 join 时, 这些 spouts 将与它们如何 emit batches (发出批次)同步.也就是说, 一批处理将包括 tuples from each spout (每个 spout 的元组).
你可能会想知道 - 你如何做一些像 "windowed join" 这样的事情, 其中从 join 的一边的 tuples 连接 join 另一边的最后一个小时的 tuples .
为此, 您将使用 partitionPersist 和 stateQuery .join 一端的元组的最后一小时将被存储并在 source of state (状态源)中旋转, 并由 join 字段键入.然后 stateQuery 将通过连接字段进行查找以执行 "join".