我们在处理流数据的时候,往往会有实时性要求。可是如果我们直接按照程序所在服务器的当前时间计算又不行,比如当上游日志数据延迟了,则所有的这部分数据都会被抛弃掉。所以一般我们在记录日志的时候,加上日志的时间戳。这样我们在进行流处理的时候,就可以把日志记录的时间拿出来,根据这个时间来决定流处理是不是要往下进行。而往往我们会以最早到达的日志作为时间参考点,如果下一个日志比这个时间点晚的太多,就可以抛弃掉。这样的目的就是不需要等待延迟太多的日志以牺牲小部分的数据完整性来保证实时性。而一般来说,在日志服务器端,往往如果日志延迟了就一起延迟,只有极少情况少部分日志延迟,这样在处理端大部分情况下数据的存在率还是比较高的。
当我们以日志的记录时间来检测延迟以保证实时性的时候,spark streaming的withWatermark函数则提供了这种功能。我们接着以上一篇介绍的event hub为数据源,来模拟这个操作。往event hub发送数据的格式和上一篇完全一样,类似下面的”EventHub;20180619 15:20:08“格式。
Send data on EventHub;20180619 15:20:08
Send data on EventHub;20180619 15:15:09
Send data on EventHub;20180619 15:07:09
Send data on EventHub;20180619 15:10:09
Send data on EventHub;20180619 15:23:09
Send data on EventHub;20180619 15:30:09
Send data on EventHub;20180619 15:25:10
Send data on EventHub;20180619 15:18:10
然后在spark streaming的代码里面,我们对后面的时间戳解析并用来作为withWatermark的时间。通过输出模式设定为update来查看它是怎么处理每条记录并保证实时性的。下面代码的前面和结尾部分可以参考上一篇,这里主要是关键的处理部分。
val streamingInputDF =
spark
.readStream // DataStreamReader
.format("eventhubs") // DataStreamReader
.options(eventHubsConf.toMap) // DataStreamReader
.load() // DataFrame
// split lines by whitespaces and explode the array as rows of 'word'
val df = streamingInputDF.select($"body".cast("string"))
.withColumn("_tmp", split($"body", ";"))
.select(
$"_tmp".getItem(0).as("name"),
$"_tmp".getItem(1).as("ptime")
).drop("_tmp")
.withColumn("posttime", to_timestamp($"ptime", "yyyyMMdd HH:mm:ss"))
.drop("ptime")
.withWatermark("posttime", "15 minutes")
.groupBy(
window($"posttime", "5 minutes", "5 minutes"),
$"name"
)
.count()
.writeStream
.outputMode("update")
.format("console")
.start()
df.awaitTermination()
Duration是一分钟,withWatermark的最大延迟是15分钟,时间窗口5分钟,滑动窗口也是5分钟。我们可以看到输出结果中,每分钟(batch)输出结果。而对于Batch:34,由于这个时间比目前最早到达的时间晚了超过15分钟,于是就直接被抛弃掉了。
-------------------------------------------
Batch: 33
-------------------------------------------
+--------------------+--------+-----+
| window| name|count|
+--------------------+--------+-----+
|[2018-06-19 14:45...|EventHub| 6|
+--------------------+--------+-----+
2018-06-19T07:07:59.458+0000: [GC (Allocation Failure) [PSYoungGen: 1273436K->16986K(1277440K)] 1746956K->490514K(5551104K), 0.0156202 secs] [Times: user=0.04 sys=0.00, real=0.02 secs]
-------------------------------------------
Batch: 34
-------------------------------------------
+------+----+-----+
|window|name|count|
+------+----+-----+
+------+----+-----+
-------------------------------------------
Batch: 35
-------------------------------------------
+--------------------+--------+-----+
| window| name|count|
+--------------------+--------+-----+
|[2018-06-19 14:45...|EventHub| 7|
+--------------------+--------+-----+