概念
本页列出了Storm 的主要概念, 以及可以获取到更多信息的资源链接, 概念如下:
- Topologies(拓扑)
- Streams(流)
- Spouts
- Bolts
- Stream groupings(流分组)
- Reliability(可靠性)
- Tasks
- Workers
Topologies(拓扑)
实时应用程序的逻辑被封装在 Storm topology(拓扑)中. Storm topology(拓扑)类似于 MapReduce 作业. 两者之间关键的区别是 MapReduce 作业最终会完成, 而 topology(拓扑)任务会永远运行(除非 kill 掉它). 一个拓扑是 Spout 和 Bolt 通过 stream groupings 连接起来的有向无环图.这些概念会在下面的段落中具体描述.
相关资料:
- TopologyBuilder: Java中使用这个类来构建 topology(拓扑)
- 如何在生产集群上运行 topologies(拓扑)
- 如何使用 local 模式: 学习如何用本地模式开发和测试 topology(拓扑)
Streams(流)
stream 是 Storm 中的核心概念.一个 stream 是一个无界的、以分布式方式并行创建和处理的 Tuple 序列. stream 以一个 schema 来定义, 这个 schema 用来命名 stream tuple(元组)中的字段.默认情况下 Tuple 可以包含 integers, longs, shorts, bytes, strings, doubles, floats, booleans, and byte arrays 等数据类型.你也可以定义自己的 serializers, 以至于可以在 Tuple 中使用自定义的类型.
每一个流在声明的时候会赋予一个 ID. 由于只包含一个 stream 的 Spout 和 Bolt 比较常见, OutputFieldsDeclarer 有更方便的方法可以定义一个单一的 stream 而不用指定ID. 这个 stream 被赋予一个默认的 ID, "default".
相关资料:
- Tuple: stream 由一系列连续的 Tuple 组成
- OutputFieldsDeclarer: 用于声明 streams 和它的 schemas.
- Serialization: Storm Tuple 的动态类型,和自定义 serializations 的相关信息
Spouts
Spout 是一个 topology(拓扑)中 streams 的源头. 通常 Spout 会从外部数据源读取 Tuple,然后把他们发送到拓扑中(如 Kestel 队列, 或者 Twitter API). Spout 可以是 可靠的 或 不可靠的. 可靠的 Spout 在 Storm 处理失败的时候能够重放 Tuple, 不可靠的 Spout 一旦把一个 Tuple 发送出去就撒手不管了.
Spout 可以发送多个流. 可以使用 OutputFieldsDeclarer 的 declareStream 方法定义多个流, 在 SpoutOutputCollector 对象的 emit 方法中指定要发送到的 stream .
Spout 中的最主要的方法是 nextTuple
. nextTuple
要么向 topology(拓扑)中发送一个新的 Tuple, 要么在没有 Tuple 需要发送的情况下直接返回. 对于任何 Spout 实现, nextTuple
方法都必须非阻塞的, 因为 Storm 在一个线程中调用所有的 Spout 方法.
Spout 的另外几个重要的方法是 ack
和 fail
. 这些方法在 Storm 检测到 Spout 发送出去的 Tuple 被成功处理或者处理失败的时候调用. ack
和fail
只会在可靠的 Spout 中调用. 更多相关信息, 请参见 the Javadoc.
相关资料:
- IRichSpout: 创建 Spout 时必须实现的接口
- Guaranteeing message processing
Bolts
拓扑中所有的业务处理都在 Bolts 中完成. Bolt 可以做很多事情,过滤, 函数, 聚合, 关联, 与数据库交互等.
Bolt 可以做简单 stream 转换. 复杂的 stream 转换一般需要多个步骤,因此也就要多个 Bolt 协同工作. 如, 转换一个 tweets stream 为一个 trending images stream 需要两个步骤:一个 Bolt 做每个图片被收藏 的滚动计数,同时一个或者多个 Bolt 输出被收藏 Top X 的图片 (你可以使用更具弹性的方式处理这个 stream 转换, 用3个 Bolt 而不是先前的2个 Bolt ).
Bolt 可以发送多个 stream. 可以使用 OutputFieldsDeclarer 的 declareStream
方法定义多个 streams, 并且在使用 OutputCollector emit
方法的时候指定要发送的 stream.
当你声明一个 Bolt 的 input stream,你总是会订阅其他组件特定的 stream .如果你想要订阅其他组件所有的 streams,你必须一个个的订阅. InputDeclarer 有语法可以订阅默认 stream-id 的 stream,代码:declarer.shuffleGrouping ("1")
,意思是: 订阅组件 “1” 的默认 stream, 等价于 declarer.shuffleGrouping("1", DEFAULT_STREAM_ID)
.
Bolt 中最主要的方法是 execute
方法, 当有一个新 Tuple 输入的时候会进入这个方法. Bolt 使用OutputCollector 对象发送新的 Tuple. Bolt 必须在每一个 Tuple 处理完以后调用 OutputCollector
上的 ack
方法, Storm 就会知道 tuple 什么时候完成 (最终可以确定 调用源 Spout Tuple 是没有问题的). 当处理一个输入的 Tuple:会基于这个 Tuple 产生零个或者多个 Tuple 发送出去,当所有的tuple 完成后,会调用 acking. Storm 提供了 IBasicBolt 接口会自动执行 acking .
最好在 Bolt 中启动新的线程异步处理 tuples. OutputCollector 是线程安全的, 并且可以在任何时刻调用.
相关资料:
- IRichBolt: Bolts 的通用接口
- IBasicBolt: 一个可以使用过滤或者一些简单功能的 Bolt 的接口
- OutputCollector: Bolts 使用这个类的实例发送 Tuple 到他们的输出流
- Guaranteeing message processing
Stream groupings
topology(拓扑)定义中有一部分是为每一个 bolt 指定输入的 streams . stream grouping 定义了stream 如何在 Bolts tasks 之间分区.
Storm 中一共有8个内置的 Stream Grouping. 可聂以通过实现 CustomStreamGrouping 接口来自定义 Stream groupings.
- Shuffle grouping: Tuple 随机的分发到 Bolt Task, 每个 Bolt 获取到等量的 Tuple.
- Fields grouping: streams 通过 grouping 指定的字段来分区. 例如流通过 "user-id" 字段分区, 具有相同 "user-id" 的 Tuple 会发送到同一个task, 不同 "user-id" 的 Tuple 可能会流入到不同的 tasks.
- Partial Key grouping: stream 通过 grouping 中指定的 field 来分组, 与 Fields Grouping 相似. 但是对于 2 个下游的 Bolt 来说是负载均衡的, 可以在输入数据不平均的情况下提供更好的优化. 以下地址 This paper 更好的解释了它是如何工作的及它的优势.
- All grouping: stream 在所有的 Bolt Tasks之间复制. 这个 Grouping 小心使用.
- Global grouping: 整个 stream 会进入 Bolt 其中一个任务.特别指出, 它会进入 id 最小的 task.
- None grouping: 这个 grouping , 你不需要关心 stream 如何分组. 当前, None grouping 和 Shuffle grouping 等价. 同时, Storm 将使用 None grouping 的 bolts 和上游订阅的 bolt和spout 运行在同一个线程 (when possible).
- Direct grouping: 这是一种特殊的 grouping 方式. stream 用这个方式 group 意味着由这个 Tuple 的 生产者 来决定哪个 消费者 来接收它. Direct grouping 只能被用于 direct streams . 被发射到 direct stream 的 tuple 必须使用 emitDirect(int, int, java.util.List) 方法来发送. Bolt 可以使用 TopologyContext 或者通过保持对OutputCollector(返回 Tuple 被发送到的目标 task id) 中的
emit
方法输出的跟踪,获取到它的所有消费者的 ID . - Local or shuffle grouping: 如果目标 Bolt 有多个 task 和 streams源 在同一个 woker 进程中, Tuple 只会 shuffle 到相同 worker 的任务.否则, 就和 shuffle goruping 一样.
相关资料:
- TopologyBuilder: 使用这个类来定义一个拓扑
- InputDeclarer: 当在
TopologyBuilder
上调用setBolt
方法的时候返回这个对象, 用于声明一个 Bolt 的 input streams 以及这些 streams 如何分组.
可靠性
Storm 保障每一个 Spout 的 Tuple 都会被 topology(拓扑)处理.通过跟踪 tuples tree,每个 spout tuple 都会触发 tree , 确保 tuples tree 成功完成. 每一个拓扑都有一个关联的“message timeout”. 如果 Storm 检测到一个 Spout Tuple 没有在这个超时时间内被处理完成, 则判定这个 Tuple 失败, 稍后重新执行.
要利用这个可靠性的功能, 当在 Tuple tree 中创建一个新的 edge ,必须告诉Storm,并且在一个单独的 tuple 完成时也要通知 Storm. 以上操作在 Bolt 用于发送 Tuple 的 OutputCollector 对象中完成这个操作. Anchoring(锚点)在 emit
方法中完成, 使用 ack
方法来声明你已经成功完成了一个 Tuple 的处理.
更详细的说明, 请参阅 保证消息容错.
Tasks
每个 Spout 或者 Bolt 都以跨集群的多个 Task 方式执行. 每个 Task 对应一个 execution 的线程, stream groupings 定义如何从一个 Task 发送 Tuple 到另一个 Task. 可以在 TopologyBuilder 的setSpout
和 setBolt
方法中为每个 Spout 或者 Bolt 设置并行度,.
Workers
Topologies (拓扑)在一个或者跨多个 worker 执行. 每个 Worker 进程是一个物理的 JVM, 执行 topology(拓扑) Tasks 中的一个子集. 例如, 如果一个拓扑的并行度是 300, 共有 50 个 Worker 在运行, 每个 Worker 会分配到 6 个 Task(作为 Worker 中的线程). Storm 会尽量把所有 Task 均匀的分配到所有的 Worker 上.
相关资料:
- Config.TOPOLOGY_WORKERS: 这个配置项设置用于运行 topology(拓扑)的 worker 数量.