Structured Streaming

严峰
2023-12-01

Structured Streaming

Structured Streaming 是 Spark Streaming 的进化版, 如果了解了 Spark 的各⽅⾯的进化过程, 有助于理

解 Structured Streaming 的使命和作⽤

\1. Spark 的 API 进化过程

\2. Spark 的序列化进化过程

\3. Spark Streaming 和 Structured Streaming

Spark 编程模型的进化过程

⽬标

Spark 的进化过程中, ⼀个⾮常重要的组成部分就是编程模型的进化, 通过编程模型可以看得出来内在的问题和

解决⽅案

过程

\1. 编程模型 RDD 的优点和缺陷

\2. 编程模型 DataFrame 的优点和缺陷

\3. 编程模型 Dataset 的优点和缺陷

分析编程模型
RDDrdd.flatMap(.split(" "))<br /> .map((, 1)) <br /> .reduceByKey(_ + _) <br /> .collect <br /> 针对⾃定义数据对象进⾏处理, 可以处理任意类型的对象, ⽐较符合⾯向对象 <br /> RDD ⽆法感知到数据的结构, ⽆法针对数据结构进⾏编程
DataFramespark.read<br />.csv("...").<br />where( "age"=!="")<br />groupBy("age")<br />.show()<br />1. DataFrame 保留有数据的元信息, API 针对数据的结构进⾏处理, 例如说可以根据数据的某⼀列进⾏排序或者分组<br />2. DataFrame 在执⾏的时候会经过 Catalyst 进⾏优化, 并且序列化更加⾼效, 性能会更好<br />3. DataFrame 只能处理结构化的数据, ⽆法处理⾮结构化的数据, 因为 DataFrame 的内部使⽤ Row 对象保存数据<br />4. Spark 为 DataFrame 设计了新的数据读写框架, 更加强⼤, ⽀持的数据源众多
DataSetspark.read<br />.csv("...")<br />.as[Person]<br />.where(.age!= "")<br />.groupByKey(.age)<br />.count()<br />.show()<br />1. Dataset 结合了 RDD 和 DataFrame 的特点,从 API 上即可以处理结构化数据, 也可以处理⾮结构化数据<br />2. Dataset 和 DataFrame 其实是⼀个东⻄, 所以 DataFrame 的性能优势, 在 Dataset 上也有

总结

RDD 的优点

\1. ⾯向对象的操作⽅式

\2. 可以处理任何类型的数据

RDD 的缺点

\1. 运⾏速度⽐较慢, 执⾏过程没有优化

\2. API ⽐较僵硬, 对结构化数据的访问和操作没有优化

DataFrame 的优点

\1. 针对结构化数据⾼度优化, 可以通过列名访问和转换数据

\2. 增加 Catalyst 优化器, 执⾏过程是优化的, 避免了因为开发者的原因影响效率

DataFrame 的缺点

\1. 只能操作结构化数据

\2. 只有⽆类型的 API , 也就是只能针对列和 SQL 操作数据, API 依然僵硬

Dataset 的优点

\1. 结合了 RDD 和 DataFrame 的 API , 既可以操作结构化数据, 也可以操作⾮结构化数据

\1. 既有有类型的 API 也有⽆类型的 API , 灵活选择

Spark 的 序列化 的进化过程

⽬标和问题

⽬标

Spark 中的序列化过程决定了数据如何存储, 是性能优化⼀个⾮常重要的着眼点, Spark 的进化并不只是针对

编程模型提供的 API , 在⼤数据处理中, 也必须要考虑性能

问题

\1. 序列化和反序列化是什么 ?

\2. Spark 中什么地⽅⽤到序列化和反序列化 ?

\3. RDD 的序列化和反序列化如何实现 ?

\4. Dataset 的序列化和反序列化如何实现 ?

什么是序列化和序列化?

序列化是什么

\1. 序列化的作⽤就是可以将对象的内容变成⼆进制, 存⼊⽂件中保存

\2. 反序列化指的是将保存下来的⼆进制对象数据恢复成对象

序列化对对象的要求

\1. 对象必须实现 Serializable 接⼝

\2. 对象中的所有属性必须都要可以被序列化, 如果出现⽆法被序列化的属性, 则序列化失败

限制

\1. 对象被序列化后, ⽣成的⼆进制⽂件中, 包含了很多环境信息, 如对象头, 对象中的属性字段等, 所以内容相

对较⼤

\2. 因为数据量⼤, 所以序列化和反序列化的过程⽐较慢

序列化的应⽤场景

\1. 持久化对象数据

\1. ⽹络中不能传输 Java 对象, 只能将其序列化后传输⼆进制数据

在 Spark 中的序列化和反序列化的应⽤场景

Task 分发

Task 是⼀个对象, 想在⽹络中传输对象就必须要先序列化

RDD 缓存

val rdd1 = rdd.flatMap(_.split(" "))
 .map((_, 1))
 .reduceByKey(_ + _)
rdd1.cache
rdd1.collect

RDD 中处理的是对象, 例如说字符串, Person 对象等

如果缓存 RDD 中的数据, 就需要缓存这些对象

对象是不能存在⽂件中的, 必须要将对象序列化后, 将⼆进制数据存⼊⽂件

⼴播变量

⼴播变量会分发到不同的机器上, 这个过程中需要使⽤⽹络, 对象在⽹络中传输就必须先被序列化

Shuffle 过程

Shuffle 过程是由 Reducer 从 Mapper 中拉取数据, 这⾥⾯涉及到两个需要序列化对象的原因

RDD 中的数据对象需要在 Mapper 端落盘缓存, 等待拉取

Mapper 和 Reducer 要传输数据对象

Spark Streaming 的 Receiver

Spark Streaming 中获取数据的组件叫做 Receiver , 获取到的数据也是对象形式, 在获取到以后需要落盘

暂存, 就需要对数据对象进⾏序列化

算⼦引⽤外部对象

class userserializable(i: Int)
rdd.map(i => new Unserializable(i))
 .collect
 .foreach(println)

在 Map 算⼦的函数中, 传⼊了⼀个 Unserializable 的对象

Map 算⼦的函数是会在整个集群中运⾏的, 那 Unserializable 对象就需要跟随 Map 算⼦的函数被传

输到不同的节点上

如果 Unserializable 不能被序列化, 则会报错

RDD 的序列化

RDD 的序列化

RDD 的序列化只能使⽤ Java 序列化器, 或者 Kryo 序列化器

为什么?

RDD 中存放的是数据对象, 要保留所有的数据就必须要对对象的元信息进⾏保存, 例如对象头之类的 保存⼀
整个对象, 内存占⽤和效率会⽐较低⼀些

Kryo 是什么

Kryo 是 Spark 引⼊的⼀个外部的序列化⼯具, 可以增快 RDD 的运⾏速度 因为 Kryo 序列化后的对象

更⼩, 序列化和反序列化的速度⾮常快 在 RDD 中使⽤ Kryo 的过程如下

val conf = new SparkConf()
 .setMaster("local[2]")
 .setAppName("KyroTest")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[Person]))
val sc = new SparkContext(conf)
rdd.map(arr => Person(arr(0), arr(1), arr(2)))

DataFrame 和 Dataset 中的序列化

历史的问题

RDD 中⽆法感知数据的组成, ⽆法感知数据结构, 只能以对象的形式处理数据

DataFrame 和 Dataset 的特点

DataFrame 和 Dataset 是为结构化数据优化的

在 DataFrame 和 Dataset 中, 数据和数据的 Schema 是分开存储的

spark.read
 .csv("...")
 .where($"name" =!= "")
 .groupBy($"name")
 .map(row: Row => row)
 .show()

DataFrame 中没有数据对象这个概念, 所有的数据都以⾏的形式存在于 Row 对象中, Row 中记录了每

⾏数据的结构, 包括列名, 类型等

Dataset 中上层可以提供有类型的 API , ⽤以操作数据, 但是在内部, ⽆论是什么类型的数据对象 Dataset 都使

⽤⼀个叫做 InternalRow 的类型的对象存储数据

val dataset: Dataset[Person] = spark.read.csv(...).as[Person]

总结

\1. 当需要将对象缓存下来的时候, 或者在⽹络中传输的时候, 要把对象转成⼆进制, 在使⽤的时候再将⼆进制转为对象, 这个过程叫做序列化和反序列化

\2. 在 Spark 中有很多场景需要存储对象, 或者在⽹络中传输对象

\1. Task 分发的时候, 需要将任务序列化, 分发到不同的 Executor 中执⾏

\2. 缓存 RDD 的时候, 需要保存 RDD 中的数据

\3. ⼴播变量的时候, 需要将变量序列化, 在集群中⼴播

\4. RDD 的 Shuffle 过程中 Map 和 Reducer 之间需要交换数据

\5. 算⼦中如果引⼊了外部的变量, 这个外部的变量也需要被序列化

\3. RDD 因为不保留数据的元信息, 所以必须要序列化整个对象, 常⻅的⽅式是 Java 的序列化器, 和 Kyro 序列化器

\4. Dataset 和 DataFrame 中保留数据的元信息, 所以可以不再使⽤ Java 的序列化器和 Kyro 序列化器, 使⽤Spark 特有的序列化协议, ⽣成 Unsafe InternalRow ⽤以保存数据, 这样不仅能减少数据量, 也能减少序列化和反序列化的开销, 其速度⼤概能达到 RDD 的序列化的 20 倍左右

Spark Streaming 和 Structured Streaming

理解 Spark Streaming 和 Structured Streaming 之间的区别, 是⾮常必要的, 从这点上可以理解Structured Streaming 的过去和产⽣契机

\1. Spark Streaming 时代

\2. Structured Streaming 时代

\3. Spark Streaming 和 Structured Streaming

Spark Streaming 时代

Spark Streaming 其实就是 RDD 的 API 的流式⼯具, 其本质还是 RDD, 存储和执⾏过程依然类似 RDD

Structured Streaming 时代

Structured Streaming 其实就是 Dataset 的 API 的流式⼯具, API 和 Dataset 保持⾼度⼀致

Spark Streaming 和 Structured Streaming

Structured Streaming 相⽐于 Spark Streaming 的进步就类似于 Dataset 相⽐于 RDD 的进步
 另外还有⼀点, Structured Streaming 已经⽀持了连续流模型, 也就是类似于 Flink 那样的实时流, ⽽不是⼩批量, 但在使⽤的时候仍然有限制, ⼤部分情况还是应该采⽤⼩批量模式
 在 2.2.0 以后 Structured Streaming 被标注为稳定版本, 意味着以后的 Spark 流式开发不应该在采⽤Spark Streaming 了

Structured Streaming ⼊⻔

了解 Structured Streaming 的编程模型, 理解 Structured Streaming 是什么,

步骤

\1. 需求梳理

\2. Structured Streaming 代码实现

\3. 运⾏

\4. 验证结果

需求梳理

1、编写⼀个流式计算的应⽤, 不断的接收外部系统的消息

2、对消息中的单词进⾏词频统计

3、统计全局的结果

整体结构

\1. Socket Server 等待 Structured Streaming 程序连接

\2. Structured Streaming 程序启动, 连接 Socket Server , 等待 Socket Server 发送数据

\3. Socket Server 发送数据, Structured Streaming 程序接收数据

\4. Structured Streaming 程序接收到数据后处理数据

\5. 数据处理后, ⽣成对应的结果集, 在控制台打印

开发步骤及实施

Socket server 使⽤ Netcat nc 来实现

Structured Streaming 程序使⽤ IDEA 实现, 在 IDEA 中本地运⾏

\1. 编写代码

\2. 启动 nc 发送 Socket 消息

\3. 运⾏代码接收 Socket 消息统计词频

代码实现

object SocketProcessor {
 def main(args: Array[String]): Unit = {
 // 1. 创建 SparkSession
 val spark = SparkSession.builder()
 .master("local[6]")
 .appName("socket_processor")
 .getOrCreate()
 // 调整 Log 级别, 避免过多的 Log 影响视线
 spark.sparkContext.setLogLevel("ERROR") 
 import spark.implicits._
 // 2. 读取外部数据源, 并转为 Dataset[String]
 val source = spark.readStream
 .format("socket")
 .option("host", "127.0.0.1")
 .option("port", 9999)
 .load()
 .as[String]//默认 readStream 会返回 DataFrame, 但是词频统计更适合使⽤ Dataset 的有类型API 
 // 3. 统计词频
 val words = source.flatMap(_.split(" "))
 .map((_, 1))
 .groupByKey(_._1)
 .count()
 // 4. 输出结果
 words.writeStream
 .outputMode(OutputMode.Complete()) //统计全局结果, ⽽不是⼀个批次 
 .format("console") // 将结果输出到控制台 
 .start() // 开始运⾏流式应⽤ 
 .awaitTermination() // 阻塞主线程, 在⼦线程中不断获取数据 
 }
}

错误提示:

ERROR StreamMetadata: Error writing stream metadata StreamMetadata
需要配置本机的Hadoop环境变量,内部缺少dll⽂件。

总结

1、Structured Streaming 中的编程步骤依然是先读, 后处理, 最后落地
2、Structured Streaming 中的编程模型依然是 DataFrame 和 Dataset
3、Structured Streaming 中依然是有外部数据源读写框架的, 叫做 readStream 和 writeStream
4、Structured Streaming 和 SparkSQL ⼏乎没有区别, 唯⼀的区别是, readStream 读出来的是流,writeStream 是将流输出, ⽽ SparkSQL 中的批处理使⽤ read 和 write

结果输出

1、在虚拟机 leetom01 中运⾏ nc -lk 9999

2、IDEA结果输出如下

-------------------------------------------
Batch: 4
-------------------------------------------
+------+--------+
| value|count(1)|
+------+--------+
| hello| 5|
| spark| 3|
| world| 1|
|hadoop| 1|
+------+--------+

Stuctured Streaming 的体系和结构

了解 Structured Streaming 的体系结构和核⼼原理, 有两点好处, ⼀是需要了解原理才好进⾏性能调优, ⼆是了解原理后, 才能理解代码执⾏流程, 从⽽更好的记忆, 也做到知其然更知其所以然

步骤

\1. WordCount 的执⾏原理

\2. Structured Streaming 的体系结构

⽆限扩展的表格

问题

\1. 了解 Dataset 这个计算模型和流式计算的关系

\2. 如何使⽤ Dataset 处理流式数据?

\3. WordCount 案例的执⾏过程和原理

Dataset 和流式计算

可以理解为 Spark 中的 Dataset 有两种, ⼀种是处理静态批量数据的 Dataset , ⼀种是处理动态实时流的

Dataset , 这两种 Dataset 之间的区别如下

流式的 Dataset 使⽤ readStream 读取外部数据源创建, 使⽤ writeStream 写⼊外部存储

批式的 Dataset 使⽤ read 读取外部数据源创建, 使⽤ write 写⼊外部存储

如何使⽤ Dataset 这个编程模型表示流式计算?

1、可以把流式的数据想象成⼀个不断增⻓, ⽆限⽆界的表
2、⽆论是否有界, 全都使⽤ Dataset 这⼀套 API
3、通过这样的做法, 就能完全保证流和批的处理使⽤完全相同的代码, 减少这两种处理⽅式的差异

WordCount 的原理

整个计算过程⼤致上分为如下三个部分
 1、Source, 读取数据源
 2、Query, 在流式数据上的查询
 3、Result, 结果集⽣成
整个的过程如下
 1、随着时间段的流动, 对外部数据进⾏批次的划分
 2、在逻辑上, 将缓存所有的数据, ⽣成⼀张⽆限扩展的表, 在这张表上进⾏查询
 3、根据要⽣成的结果类型, 来选择是否⽣成基于整个数据集的结果

总结

Dataset 不仅可以表达流式数据的处理, 也可以表达批量数据的处理

Dataset 之所以可以表达流式数据的处理, 因为 Dataset 可以模拟⼀张⽆限扩展的表, 外部的数据会不断的 流⼊到其中

体系结构

在 Structured Streaming 中负责整体流程和执⾏的驱动引擎叫做 StreamExecution

StreamExecution 如何⼯作?

StreamExecution 分为三个重要的部分

Source , 从外部数据源读取数据

LogicalPlan , 逻辑计划, 在流上的查询计划

Sink , 对接外部系统, 写⼊结果

总结

StreamExecution 是整个 Structured Streaming 的核⼼, 负责在流上的查询

StreamExecution 中三个重要的组成部分, 分别是 Source 负责读取每个批量的数据, Sink 负责将结果写

⼊外部数据源, LogicalPlan 负责针对每个⼩批量⽣成执⾏计划

StreamExecution 中使⽤ StateStore 来进⾏状态的维护

Streaming -> Source

流式计算⼀般就是通过数据源读取数据, 经过⼀系列处理再落地到某个地⽅, 先了解⼀下如何读取数据, 可以整合哪些数据源

从 HDFS 中读取数据

案例分析

以上两种场景有两个共同的特点

会产⽣⼤量⼩⽂件在 HDFS 上

数据需要处理

实现步骤

\1. 案例结构

\2. 产⽣⼩⽂件并推送到 HDFS

\3. 流式计算统计 HDFS 上的⼩⽂件

\4. 运⾏和总结

案例流程

\1. 编写 Python ⼩程序, 在某个⽬录⽣成⼤量⼩⽂件

Python 是解释型语⾔, 其程序可以直接使⽤命令运⾏⽆需编译, 所以适合编写快速使⽤的程序, 很多时候也使⽤ Python 代替 Shell

使⽤ Python 程序创建新的⽂件, 并且固定的⽣成⼀段 JSON ⽂本写⼊⽂件

在真实的环境中, 数据也是⼀样的不断产⽣并且被放⼊ HDFS 中, 但是在真实场景下, 可能是 Flume 把⼩

⽂件不断上传到 HDFS 中, 也可能是 Sqoop 增量更新不断在某个⽬录中上传⼩⽂件

\2. 使⽤ Structured Streaming 汇总数据

HDFS 中的数据是不断的产⽣的, 所以也是流式的数据

数据集是 JSON 格式, 要有解析 JSON 的能⼒

因为数据是重复的, 要对全局的流数据进⾏汇总和去重, 其实真实场景下的数据清洗⼤部分情况下也是要去重的

\3. 使⽤控制台展示数据

最终的数据结果以表的形式呈现

使⽤控制台展示数据意味着不需要在修改展示数据的代码, 将 Sink 部分的内容放在下⼀个⼤章节去说明

真实的⼯作中, 可能数据是要落地到 MySQL , HBase , HDFS 这样的存储系统中

总结

整个案例运⾏的逻辑是

\1. Python 程序产⽣数据到 HDFS 中

\2. Structured Streaming 从 HDFS 中获取数据

\3. Structured Streaming 处理数据

\4. 将数据展示在控制台

整个案例的编写步骤

\1. Python 程序

\2. Structured Streaming 程序

\3. 运⾏

产⽣⼩⽂件并推送到 HDFS

随便在任⼀⽬录中创建⽂件 files.py , 编写以下内容

import os
for index in range(100):
 content = """
 {"name":"Michael"}
 {"name":"Andy", "age":30}
 {"name":"Justin", "age":19}
 """
 file_name = "/export/dataset/text{0}.json".format(index)
 with open(file_name, "w") as file: 
 file.write(content)
 os.system("/export/servers/hadoop/bin/hdfs dfs -mkdir -p /dataset/dataset/")
 os.system("/export/servers/hadoop/bin/hdfs dfs -put {0} /dataset/dataset/".format(file_name))

流式计算统计 HDFS 上的⼩⽂件

val spark = SparkSession.builder()
 .appName("hdfs_source")
 .master("local[6]")
 .getOrCreate()
spark.sparkContext.setLogLevel("WARN")
val userSchema = new StructType()
 .add("name", "string")
 .add("age", "integer")
val source = spark
 .readStream // 指明读取的是⼀个流式的 Dataset
 .schema(userSchema) // 指定读取到的数据的 Schema
 .json("hdfs://leetom:8020/dataset/dataset") // 指定⽬录位置, 以及数据格式
val result = source.distinct()
result.writeStream
 .outputMode(OutputMode.Update())
 .format("console")
 .start()
 .awaitTermination()

运⾏和流程总结

步骤

\1. 运⾏ Python 程序

\2. 运⾏ Spark 程序

\3. 总结

运⾏ Python 程序

\1. 上传 Python 源码⽂件到服务器中

\2. 运⾏ Python 脚本

# 进⼊ Python ⽂件被上传的位置
cd ~
# 创建放置⽣成⽂件的⽬录
mkdir -p /export/dataset
# 运⾏程序
python gen_files.py

运⾏ Spark 程序 (⾃⾏测试)

\1. 使⽤ Maven 打包

\2. 上传⾄服务器

\3. 运⾏ Spark 程序

# 进⼊保存 Jar 包的⽂件夹
cd ~
# 运⾏流程序
spark-submit --class cn.qf.structured.HDFSSource ./original-streaming-0.0.1.jar

总结

从 Kafka 中读取数据

步骤

\1. Structured Streaming 整合 Kafka

\2. 读取 JSON 格式的内容

\3. 读取多个 Topic 的数据

Kafka 和 Structured Streaming 整合的结构

分析

`Structured Streaming` 中使⽤ `Source` 对接外部系统, 对接 `Kafka` 的 `Source` 叫做
`KafkaSource` `KafkaSource` 中会使⽤ `KafkaSourceRDD` 来映射外部 `Kafka` 的 `Topic`,
两者的 `Partition` ⼀⼀对应

结论

Structured Streaming 会并⾏的从 Kafka 中获取数据

Structured Streaming 读取 Kafka 消息的三种⽅式

Earliest 从每个 Kafka 分区最开始处开始获取

Assign ⼿动指定每个 Kafka 分区中的 Offset

Latest 不再处理之前的消息, 只获取流计算启动后新产⽣的数据

需求介绍

有⼀个智能家居品牌叫做 Nest , 他们主要有两款产品, ⼀个是恒温器, ⼀个是摄像头

恒温器的主要作⽤是通过感应器识别家⾥什么时候有⼈, 摄像头主要作⽤是通过学习算法来识别出现在摄像头中的⼈是否是家⾥⼈, 如果不是则报警

所以这两个设备都需要统计⼀个指标, 就是家⾥什么时候有⼈, 此需求就是针对这个设备的⼀部分数据, 来统计家⾥什么时候有⼈

Kafka⽣产者的数据格式

{
 "devices": {
 "cameras": {
 "device_id": "awJo6rH",
 "last_event": {
 "has_sound": true,
 "has_motion": true,
 "has_person": true,
 "start_time": "2016-12-29T00:00:00.000Z",
 "end_time": "2016-12-29T18:42:00.000Z"
 }
 }
 }
}

使⽤ Structured Streaming 来过滤出来家⾥有⼈的数据

把数据转换为 时间 → 是否有⼈ 这样类似的形式

代码实现

创建 Topic 并输⼊数据到 Topic

  1. 使⽤命令创建 Topic

bin/kafka-topics.sh --create --topic streaming-test --replication-factor 1 --
partitions 3 --zookeeper leetom:2181
  1. 开启 Producer

bin/kafka-console-producer.sh --broker-list 
​
leetom01:9092,leetom02:9092,leetom03:9092 --topic streaming-test
  1. 把 JSON 转为单⾏输⼊

{"devices":{"cameras":{"device_id":"awJo6rH","last_event": {"has_sound":true,"has_motion":true,"has_person":true,"start_time":"2016-12-
29T00:00:00.000Z","end_time":"2016-12-29T18:42:00.000Z"}}}}

Spark代码

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
 .master("local[6]")
 .appName("kafka integration")
 .getOrCreate()
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types._
val source = spark
 .readStream
 .format("kafka")
 .option("kafka.bootstrap.servers", "leetom01:9092,leetom02:9092,leetom03:9092")
 .option("subscribe", "streaming-test")
 .option("startingOffsets", "earliest")
 .load()
val eventType = new StructType()
 .add("has_sound", BooleanType, nullable = true)
 .add("has_motion", BooleanType, nullable = true)
 .add("has_person", BooleanType, nullable = true)
 .add("start_time", DateType, nullable = true)
 .add("end_time", DateType, nullable = true)
val camerasType = new StructType()
 .add("device_id", StringType, nullable = true)
 .add("last_event", eventType, nullable = true)
val devicesType = new StructType()
 .add("cameras", camerasType, nullable = true)
val schema = new StructType()
 .add("devices", devicesType, nullable = true)
val jsonOptions = Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss.sss'Z'")
import org.apache.spark.sql.functions._
import spark.implicits._
val result = source.selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value")
 .select(from_json('value, schema, jsonOptions).alias("parsed_value"))
 .selectExpr("parsed_value.devices.cameras.last_event.has_person as has_person",
 "parsed_value.devices.cameras.last_event.start_time as start_time")
 .filter('has_person === true)
 .groupBy('has_person, 'start_time)
 .count()
result.writeStream
 .outputMode(OutputMode.Complete())
 .format("console")
 .start()
 .awaitTermination()

测试

\1. 进⼊服务器中, 启动 Kafka

\2. 启动 Kafka 的 Producer

bin/kafka-console-producer.sh --broker-list
leetom01:9092,leetom02:9092,leetom03:9092 --topic streaming-test

\3. 启动代码进⾏测试

因为需要和 Kafka 整合, 所以在启动的时候需要加载和 Kafka 整合的包 spark-sql-kafka-0-10

<dependency>
 <groupId>org.apache.spark</groupId>
 <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
 <version>2.2.0</version>
</dependency>

Streaming -> Sink

\1. HDFS Sink

\2. Kafka Sink

\3. MySQL Sink

\4. ⾃定义 Sink

\5. Tiggers

\6. 错误恢复和容错语义

HDFS Sink

案例需求

从 Kafka 接收数据, 从给定的数据集中, 裁剪部分列, 落地于 HDFS

实现步骤

\1. 从 Kafka 读取数据, ⽣成源数据集

\1. 连接 Kafka ⽣成 DataFrame

\2. 从 DataFrame 中取出表示 Kafka 消息内容的 value 列并转为 String 类型

\2. 对源数据集选择列

\1. 解析 CSV 格式的数据

\2. ⽣成正确类型的结果集

\3. 落地 HDFS

代码实现

// 1. 创建 SparkSession 
​
val spark = SparkSession.builder() 
​
 .appName("hdfs_sink") 
​
 .master("local[6]") 
​
 .getOrCreate() 
​
import spark.implicits._ 
​
// 2. 读取 Kafka 数据 
​
val source: Dataset[String] = spark.readStream 
​
 .format("kafka") 
​
 .option("kafka.bootstrap.servers", "leetom01:9092") 
​
 .option("subscribe", "streaming_test") 
​
 .option("startingOffsets", "latest")
​
.load() 
​
 .selectExpr("CAST(value AS STRING) as value") 
​
 .as[String] 
​
// 1::Toy Story (1995)::Animation|Children's|Comedy 
​
// 3. 处理 CSV, Dataset(String), Dataset(id, name, category) 
​
val result = source.map(item => { 
​
 val arr = item.split("::") 
​
 (arr(0).toInt, arr(1).toString, arr(2).toString) 
​
}).as[(Int, String, String)].toDF("id", "name", "category") 
​
// 4. 落地到 HDFS 中 
​
result.writeStream 
​
 .format("parquet") 
​
 .option("path", "output/streaming/moives/") 
​
 .option("checkpointLocation", "checkpoint") 
​
 .start() 
​
 .awaitTermination()
​

Kafka Sink

案例需求

从 Kafka 中获取数据, 简单处理, 再次放⼊ Kafka

实现步骤

\1. 从 Kafka 读取数据, ⽣成源数据集

\1. 连接 Kafka ⽣成 DataFrame

\2. 从 DataFrame 中取出表示 Kafka 消息内容的 value 列并转为 String 类型

\2. 对源数据集选择列

\1. 解析 CSV 格式的数据

\2. ⽣成正确类型的结果集

\3. 再次落地 Kafka

代码实现

// 1. 创建 SparkSession
 val spark = SparkSession.builder()
 .appName("hdfs_sink")
 .master("local[6]")
 .getOrCreate()
 import spark.implicits._
 // 2. 读取 Kafka 数据
 val source: Dataset[String] = spark.readStream
 .format("kafka")
 .option("kafka.bootstrap.servers", "leetom01:9092")
 .option("subscribe", "streaming_test_2")
 .option("startingOffsets", "earliest")
 .load()
 .selectExpr("CAST(value AS STRING) as value")
 .as[String]
 // 1::Toy Story (1995)::Animation|Children's|Comedy
 // 3. 处理 CSV, Dataset(String), Dataset(id, name, category)
 val result = source.map(item => {
 val arr = item.split("::")
 (arr(0).toInt, arr(1).toString, arr(2).toString)
 }).as[(Int, String, String)].toDF("id", "name", "category")
 // 4. 落地到 Kafka 中
 result.writeStream
 .format("kafka")
 .outputMode(OutputMode.Append())
 .option("checkpointLocation", "checkpoint")
 .option("kafka.bootstrap.servers", "node1:9092")
 .option("topic", "streaming_test_2")
 .start()
 .awaitTermination()

MySQL Writer(Foreach)

需求分析

从 Kafka 中获取数据, 处理后放⼊ MySQL

实现步骤

\1. 创建 DataFrame 表示 Kafka 数据源

\2. 在源 DataFrame 中选择三列数据

\3. 创建 ForeachWriter 接收每⼀个批次的数据落地 MySQL

\4. Foreach 落地数据

object _03SinkMysqlTest {
 def main(args: Array[String]): Unit = {
 val spark = SparkSession.builder().appName("kafka").master("local[2]")
 .getOrCreate()
 spark.sparkContext.setLogLevel("WARN")
 import spark.implicits._
 // 获取KafkaSource源
 val source = spark.readStream
 .format("socket")
 .option("host", "leetom01")
 .option("port", 9999)
 .load()
 .as[String]
 val df = source.map(t => {
 val arr = t.split(" ")
 (arr(0), arr(1))
 }).as[(String, String)].toDF("id", "name")
 // 将结果写⼊Mysql
 df.writeStream
 .foreach(new MySQLWriter)
 .start()
 .awaitTermination()
 }
}
/**
* 继承抽象类,实现抽象⽅法
*/
class MySQLWriter extends ForeachWriter[Row]{
 // 定义connection配置变量
 private val driver = "com.mysql.jdbc.Driver"
 private var connection:Connection = _
 private val url = "jdbc:mysql://localhost:3306/test"
 private var statm :Statement = _
 // 打开链接
 override def open(partitionId: Long, version: Long): Boolean = {
 Class.forName(driver)
 connection = DriverManager.getConnection(url,"root","123456")
 statm = connection.createStatement()
 true
 }
 // 写⼊数据
 override def process(value: Row): Unit = {
 statm.executeUpdate(s"insert into test1 values('${value.get(0)}',${value.get(1)})")
 }
 // 关闭连接
 override def close(errorOrNull: Throwable): Unit = {
 connection.close()
 }
}

Trigger

实现步骤

\1. 微批次处理

\2. 连续流处理

微批次处理

什么是微批次 :并不是真正的流, ⽽是缓存⼀个批次周期的数据, 后处理这⼀批次的数据

例⼦:

result.writeStream
 .outputMode(OutputMode.Complete())
 .format("console")
 .start()
 .awaitTermination()

连续流处理

微批次会将收到的数据按照批次划分为不同的 DataFrame , 后执⾏ DataFrame , 所以其数据的处理延迟取决

于每个 DataFrame 的处理速度, 最快也只能在⼀个 DataFrame 结束后⽴刻执⾏下⼀个, 最快可以达到

100ms 左右的端到端延迟

⽽连续流处理可以做到⼤约 1ms 的端到端数据处理延迟

连续流处理可以达到 at-least-once 的容错语义

从 Spark 2.3 版本开始⽀持连续流处理, 我们所采⽤的 2.2 版本还没有这个特性, 并且这个特性截⽌到2.4 依然是实验性质, 不建议在⽣产环境中使⽤

例⼦:

result.writeStream
 .outputMode(OutputMode.Complete())
 .format("console")
 .trigger(Trigger.Continuous("1 second"))
 .start()
 .awaitTermination()

注意:有限制

只⽀持 Map 类的有类型操作

只⽀持普通的的 SQL 类操作, 不⽀持聚合

Source 只⽀持 Kafka

Sink 只⽀持 Kafka , Console , Memory

val spark = SparkSession.builder()
 .appName("triggers")
 .master("local[6]")
 .getOrCreate()
 spark.sparkContext.setLogLevel("WARN")
 // timestamp, value
 val source = spark.readStream
 .format("rate")
 .load()
 // 简单处理
 //
 val result = source
 // 落地
 source.writeStream
 .format("console")
 .outputMode(OutputMode.Append())
 // 不写trigger的话 以最快的速度输出,不等待
 .trigger(Trigger.Once()) // 只处理⼀次
 .trigger(Trigger.ProcessingTime("20 seconds")) // 20秒⼀次
 .start()
 .awaitTermination()

错误恢复和容错语义

三种容错语义

at-most-once

在数据从 Source 到 Sink 的过程中, 出错了, Sink 可能没收到数据, 但是不会收到两次, 叫做 at-most-once

⼀般错误恢复的时候, 不重复计算, 则是 at-most-once

at-least-once

在数据从 Source 到 Sink 的过程中, 出错了, Sink ⼀定会收到数据, 但是可能收到两次, 叫做 at-least-once

⼀般错误恢复的时候, 重复计算可能完成也可能未完成的计算, 则是 at-least-once

exactly-once

在数据从 Source 到 Sink 的过程中, 虽然出错了, Sink ⼀定恰好收到应该收到的数据, ⼀条不重复也⼀条都不

少, 即是 exactly-once

想做到 exactly-once 是⾮常困难的

Sink 的容错

读取 WAL offsetlog 恢复出最新的 offsets

当 StreamExecution 找到 Source 获取数据的时候, 会将数据的起始放在 WAL offsetlog 中, 当出错要恢复的时候, 就可以从中获取当前处理批次的数据起始, 例如 Kafka 的 Offset

读取 batchCommitLog 决定是否需要重做最近⼀个批次

当 Sink 处理完批次的数据写⼊时, 会将当前的批次 ID 存⼊ batchCommitLog , 当出错的时候就可以从中取出进⾏到哪⼀个批次了, 和 WAL 对⽐即可得知当前批次是否处理完

如果有必要的话, 当前批次数据重做

如果上次执⾏在 (5) 结束前即失效, 那么本次执⾏⾥ Sink 应该完整写出计算结果

如果上次执⾏在 (5) 结束后才失效, 那么本次执⾏⾥ Sink 可以重新写出计算结果 (覆盖上次结果), 也可以跳过写出计算结果(因为上次执⾏已经完整写出过计算结果了)

这样即可保证每次执⾏的计算结果, 在 Sink 这个层⾯, 是 不重不丢 的, 即使中间发⽣过失效和恢复, 所以Structured Streaming 可以做到 exactly-once

容错所需要的存储

存储

offsetlog 和 batchCommitLog 关乎于错误恢复

offsetlog 和 batchCommitLog 需要存储在可靠的空间⾥

offsetlog 和 batchCommitLog 存储在 Checkpoint 中

WAL 其实也存在于 Checkpoint 中

指定 Checkpoint

只有指定了 Checkpoint 路径的时候, 对应的容错功能才可以开启

source.writeStream
 .format("console")
 .outputMode(OutputMode.Append())
 .option("checkpointLocation", "path/to/HDFS/dir")// 指定存储
 .format("memory")

需要的外部⽀持

如果要做到 exactly-once , 只是 Structured Streaming 能做到还不⾏, 还需要 Source 和 Sink 系统的⽀持

Source 需要⽀持数据重放

当有必要的时候, Structured Streaming 需要根据 start 和 end offset 从 Source 系统中再次获取数

据, 这叫做重放

Sink 需要⽀持幂等写⼊

如果需要重做整个批次的时候, Sink 要⽀持给定的 ID 写⼊数据, 这叫幂等写⼊, ⼀个 ID 对应⼀条数据进⾏

写⼊, 如果前⾯已经写⼊, 则替换或者丢弃, 不能重复

 类似资料:

相关阅读

相关文章

相关问答