9.3 核心思想

优质
小牛编辑
127浏览
2023-12-01

Kafka Streams 是一个处理和分析 Kafka 系统中的数据的客户端库。
它建立在重要的流处理概念之上,例如能够恰当地区分 event time 和 Processing time 、支持 window 操作以及简单有效、支持实时查询的应用程序状态管理。

Kafka Streams 的入门门槛很低。我们可以在单节点环境上快速实现一个小规模的验证性的程序,只要程序能在多节点的集群环境成功运行即可部署到高负载的生产环境。
Kafka Streams 通过利用 Kafka 的并行模型实现对相同应用程序的多个实例的负载平衡,这对于用户来说是透明的。

Kafka Streams 的一些亮点:

  • 被设计为简单且轻量级的客户端库,可以轻松嵌入到任何Java应用程序中,也可轻松地与用户的任何流式应用程序的打包,部署和操作工具互相集成。
  • 除了使用 Apache Kafka 作为内部消息传递层之外, Kafka Streams 没有外部依赖关系; 值得注意的是,它使用 Kafka 的 partition 模型来实现横向扩展处理,并同时保证强有力的有序性。
  • 支持容错的本地状态,它支持非常快速和高效的有状态操作,如窗口连接和聚合。
  • 支持 exactly-once 处理语义,以确保每个记录只被处理一次,即使错误发生在 Streams 客户端或 Kafka Broker 中也仅处理一次。
  • 采用one-record-at-a-time processing的方式,以实现毫秒级的处理延迟,并支持对延迟的记录使用基于 event-time 的窗口操作
  • 提供必要的流处理元操作,包括高度抽象的流 DSL 语言底层流处理器的 API

以下是 Kafka Stream 的几个重要概念。

流处理拓扑结构

  • 流(Stream)是 Kafka Stream 的一个非常重要的抽象概念,代表一个无界的、持续更新的数据集。 Stream 是一个有序、可重演、容错并且不可变的数据集,它的数据 是以 key-value 的方式定义的。
  • 流处理程序(stream processing application) 是指所有应用了 Kafka Streams library 的程序。 流处理程序通过一个以上的 处理器拓扑结构(processor topology) 定义计算逻辑,其中 处理器拓扑结构 是一个连接到流(边界)的流处理器(节点)。
  • 流处理器(stream processor) 是处理器拓扑结构的一个节点;它代表一个处理步骤:从拓扑结构中的前置流处理器接收输入数据并按逻辑转换数据,随后向拓扑结构的后续流处理器提供一个或者多个结果数据。

拓扑结构中有两种特殊的处理器:

  • Source Processor : Source Processor 是一种没有前置节点的特殊流处理器。它从一个或者多个 Kafka Topic 消费数据并产出一个输入流给到拓扑结构的后续处理节点。
  • Sink Processor : sink processor 是一种特殊的流处理器,没有处理器需要依赖于它。 它从前置流处理器接收数据并传输给指定的 Kafka Topic 。

注意:一个正常的处理器节点在处理记录的同时是可以访问其他远程系统。因此,它的处理结果既可以写入到其他远程系统,也可以回流到 Kafka 系统中。

Kafka Streams 提供两种定义流处理拓扑结构的方式: Kafka Streams DSL 提供了一些常用的、开箱即用的数据转换操作,比如: mapfilterjoinaggregations ;而底层的 Processor API 则允许开发者定义和连接自定义的处理器,并且可以与 state stores 交互。

处理器拓扑结构仅仅是对流处理代码的抽象。在程序运行时,逻辑拓扑结构会实例化并在应用程序中复制以进行并行处理。(详细信息可参考 Stream Partitions and Tasks )。

Time

流处理中很关键的一点是 时间(time) 的概念,以及它的模型设计、如何被整合到系统中。
比如有些操作(如 窗口(windowing) ) 就是基于时间边界进行定义的。

流处理中关于时间的一些常见概念:

  • Event time : 事件或者数据记录产生的时间点,即事件在“源头”发生时的原始时间点。 举个例子: 如果是汽车GPS传感器产生的地理位置变化的事件,则 Event time 就是GPS传感器捕获到位置发生变更的时间。
  • Processing time : 数据被流处理程序加工的时间,也就是数据被消费的时间。处理事件的时间会比时间产生的原始时间晚几毫秒、几个小时甚至是几天。举个例子: 假设一个分析应用程序从汽车传感器读取和处理地理位置数据,并将结果呈现给车队管理仪表板。在这种情况下,分析应用程序的 processing-time 可能比 event time 晚几毫秒或几秒(例如,基于 Apache Kafka 和 Kafka Stream 的实时管道)或者晚几个小时(例如,基于 Apache Hadoop 或 Apache Spark 的批处理管道)。
  • Ingestion time : 事件或者数据记录被 Kafka Broker 保存到 topic partition 的时间点。与 Event time 的不同之处在于 Ingestion time 的时间戳是在记录被 Kafka Broker 添加到目标 Topic 的时候产生的,而不是在记录的源头产生的。与 Processing time 的区别在于处理时间是流处理应用程序开始处理记录的时间。 例如: 如果记录没有被处理,就没有 Processing time 的概念,但是 ingestion time 是存在的。

选用 event-time 还是 ingestion-time 是通过 Kafka (不是 Kafka Streams)来配置的。从 Kafka 0.10.x 开始,时间戳是自动嵌入到 Kafka 的消息中。至于这些时间戳是 event-time 还是 ingestion-time 取决于 Kafka 的配置。这些配置在 Broker 层面 和 Topic 层面都可以进行设置。 Kafka Streams 中默认的时间戳抽取器会原样获取这些嵌入的时间戳。因此,应用程序中时间的语义取决于生效的嵌入时间戳相关的 Kafka 配置。

Kafka Streams 通过 TimestampExtractor 接口来给每条记录分配时间戳。
每条记录的时间戳描述了关于流处理与 time 相关的信息,并且被诸如 window 之类的 time-dependent 的操作所使用。
因此,这些 time 仅在新纪录到达 processor 的时候才有用。
我们将应用程序中的以数据驱动的 time 称为 stream time 以区别于程序运行时的 wall-clock time
不同的 TimestampExtractor 的具体实现将为 stream time 定义提供不同的语义。
例如,基于数据的实际内容来检索或计算时间戳,比如嵌入时间戳字段以提供 event time 语义,以及返回当前的 wall-clock time 以便为 stream time 提供 processing time 语义。
因此开发者可以基于自己的业务需要来实施不同的 time 概念。

最后,当 Kafka Streams 应用程序向 Kafka 写记录时,程序也会给这些新记录分配时间戳。时间戳的分配方式取决于上下文:

  • 当通过处理一些输入记录来生成新的输出记录时,例如,在 process() 函数调用中触发的 context.forward() ,输出记录的时间戳是直接从输入记录的时间戳中继承而来的。
  • 当新的输出记录是通过 Punctuator#punctuate() 之类的周期性函数产生的,输出记录时间戳被定义为当前流任务的内部时间(通过context.timestamp() 函数生成)。
  • 对于聚合操作,聚合结果的时间戳将是触发聚合更新的最新到达的输入记录的时间戳。

States

有些流处理程序不需要 state ,这意味着这些程序处理的消息与其他消息互相独立。
但是,能够维护状态使很多复杂的流处理应用的实现成为可能:我们可以连接输入流,分组和聚合数据。大量这种基于状态的操作由 Kafka Streams DSL 提供。

Kafka Streams 提供一种能够被流处理应用程序用来保存和查询数据状态的功能,称为 state stores
这对实现有状态的操作提供了很大的帮助。
Kafka Streams 中的每个任务都嵌入了一个或多个 state store ,可以通过api进行访问,以及存储和查询处理所需的数据。
这些 state store 可以是以持久化的 key-value 存储,也可以是保存在内存中的 hashmap ,或者是其他方便的数据结构。
Kafka Streams 为本地的 state store 提供容错和自动恢复。

Kafka Streams 允许创建 state store 的流处理应用程序的外部方法、线程、进程或应用程序对 state store 进行直接的只读查询。 这是通过称为交互式查询( Interactive Queries )的功能提供的。 所有 store 都是命名的,交互式查询只公开底层实现的读取操作。

Processing Guarantees

在流处理领域,最常被问到的问题是:“即使在处理过程中遇到了一些故障,流处理系统是否保证每个记录只处理一次?”
不能保证 "exactly-once" 处理方式对于许多不能容忍任何数据丢失或数据重复的应用程序来说是一种破坏,在这种情况下,除了流处理管道之外,通常还会使用面向批处理的框架,也就是所谓的 Lambda 架构。

在0.11.0.0之前, Kafka 仅提供 "at-least-once" 的传递保证,因此任何利用它作为后端存储的流处理系统都不能保证端到端 "exactly-once" 语义。
实际上,即使对于那些声称支持 "exactly-once" 语义的流处理系统,只要他们将 Kafka 系统作为读/写 的源/目标,他们的应用程序实际上并不能保证在整个流水线中不会产生重复。

自从0.11.0.0版本发布以来,Kafka 允许 Producer 以一种事务性的和幂等的方式向不同的 topic partition 发送消息提供强有力的支持,而 Kafka Streams 则通过利用这些特性来增加了端到端的 "exactly-once" 处理语义。
更具体地说,它保证对于从 Kafka topics 读取的任何记录的处理结果将在 Kafka topic 输出结果中反映一次,在 state stores 中也仅进行一次状态操作。
需要注意的是,Kafka Streams 的端到端一次性语义与其他流处理框架的主要区别在于,Kafka Streams 与底层的 Kafka 存储系统紧密集成,并确保输入 topics offset 的提交,state stores 的更新和写入输出 topics 的原子性,而不是将 Kafka 视为可能有副作用的外部系统。
要详细了解如何在 Kafka Streams 内完成此操作,建议读者阅读 KIP-129

在运行 Kafka 流应用程序时,为了实现 exactly-once 语义,用户需要设置 processing.guarantee 参数的值为 exactly_once (默认值为 at_least_once )。更多细节请参考 Kafka Streams Configs 部分.