由于 Continuous Views 是随着时间连续地和增量地更新的,所以在更新 Continuous Views 的结果时,PipelineDB有能力考虑当前时间。包含WHERE子句和与当前时间相关的时间组件的查询称为sliding-window 查询。滑动WHERE子句筛选或接受的事件集会随着时间不断变化。
sliding WHERE子句 两个重要组成部分:
clock_timestamp ( ):一个内置函数,它总是返回当前的时间戳。
arrival_timestamp:所有传入事件的一个特殊属性,包含PipelineDB接收它们的时间,就像描述在Arrival Ordering的一样。
但是,没有必要显式地添加引用这些值的WHERE子句。PipelineDB在内部执行此操作,并且只需要在Continuous Views 的定义中指定sw存储参数。
尽管sliding windows 是SQL数据库的新概念,但是PipelineDB没有使用任何新的或专有的窗口语法。相反,PipelineDB 使用标准PostgreSQL 9.5语法。
PipelineDB允许用户在定义滑动窗口连续视图时手动构造滑动窗口WHERE子句。
这个连续视图上的SELECT结果只包含最后一分钟内看到的特定用户。也就是说,重复的SELECT将包含不同的行,即使连续视图没有显式更新。
每次计算clock_timestamp() - interval '1 minute' 时,它将返回与过去1分钟相对应的时间戳。如果给定事件的arrival_timestamp 在过去大于1分钟,则添加arrival_timestamp 和 > 意味着该谓词将计算为true。由于每次读取新事件时都会评估谓词,因此这有效地给了我们一个1分钟宽的 sliding window 。
实验:
CREATE FOREIGN TABLE stream (x integer, y integer) SERVER pipelinedb;
CREATE VIEW v_sw_test_01 WITH (sw = '1 minute') AS SELECT user_id::integer FROM stream;
写一个python脚本,向stream中插入数据。
import psycopg2
conn = psycopg2.connect("host=localhost user=mytest")
for i in xrange(51,100):
SQL = "INSERT INTO stream VALUES (%s,2);" % i
cur = conn.cursor()
cur.execute(SQL)
conn.commit()
time.sleep(23)
边执行脚本,边查看v_sw_test_01视图中的数据,发现当前存在的数据,只是最近一分钟插入的数据。
mytest=# select * from recent_users;
x
----
77
78
79
(3 rows)
过一会儿
mytest=# select * from recent_users;
x
----
78
79
(2 rows)
再过一会儿
mytest=# select * from recent_users;
x
----
78
79
80
(3 rows)
实际上,在内部,PipelineDB将将此查询重写为以下内容:
CREATE VIEW v_sw_test_01 AS
SELECT x::integer FROM stream
WHERE (arrival_timestamp > clock_timestamp() - interval '1 minute');
============================================================
Sliding Aggregates
Sliding-window 查询还与聚合函数一起工作。滑动聚合通过尽可能多地聚集它们的输入而工作,但不丢失需要知道如何随着时间的推移从窗口中移除信息的粒度。这种部分聚合对用户来说是透明的,只有完全聚集的结果才会在滑动窗口聚集中可见。
CREATE VIEW v_count_sw_test_ WITH (sw = '1 minute') AS SELECT COUNT(*) FROM stream;
每次在这个 Continuous Views 上运行SELECT时,它返回的计数将只是最后一分钟内看到的事件的计数。例如,如果事件停止进入,则每次在 Continuous Views 上运行SELECT时,计数都会减少。
CREATE VIEW sensor_temps WITH (sw = '5 minutes') AS SELECT sensor::integer, AVG(temp::numeric) FROM sensor_stream GROUP BY sensor;
5分钟平均温度是多少
CREATE VIEW uniques WITH (sw = '30 days') AS SELECT COUNT(DISTINCT user::integer) FROM user_stream;
在过去的30天里,我们看到了多少个不重复的用户
============================================================
Temporal Invalidation
显然, continuous views 中的滑动窗口行在一定时间之后变得无效,因为它们已经太老而不能包含在 continuous views 的结果中。这样的行必须是垃圾收集,这可能以两种方式发生:
Background invalidation:类似于PostgreSQL的自动清空器的后台进程定期运行并从sliding-window continuous views中物理删除任何过期的行。
Read-time invalidation:当使用SELECT读取 continuous view 时,在生成结果的同时动态地丢弃任何太旧而不能包括在结果中的数据。这确保即使仍然存在无效行,它们实际上也不包含在任何查询结果中。
============================================================
step_factor
在内部,尽可能聚合支持 sliding-window 查询的物化表。但是,不能将行聚合到与查询的最终输出相同的粒度级别,因为当数据超出窗口时,必须从聚合结果中删除数据。
例如,按小时聚合的 sliding-window 查询实际上可能具有磁盘上的分钟级聚合数据,因此只有最后60分钟包含在返回给读取器的最终聚合结果中。这些用于 sliding-window 查询的内部更细粒度的聚合级别称为“步骤”。“覆盖”视图被放置在这些步骤聚合之上,以便在读取时执行最终聚合。
步骤聚合是决定 sliding-window 查询读取性能的重要因素,因为每个最终滑动窗口聚合组在内部由多个步骤组成。每个滑动窗口聚合组将具有的步骤数量可以通过step_factor参数进行调整:
step_factor
一个介于1和50之间的整数,将 sliding-window 步长的大小指定为由sw给出的窗口大小的百分比。如果使用较小的step_factor,则在数据离开窗口时提供更大的粒度,代价是磁盘上的物化表更大。更大的step_factor将减少磁盘上的物化表大小,但代价是减少窗口外粒度。
CREATE VIEW v_sw_test_03 WITH (sw = '1 hour', step_factor = 50) AS SELECT COUNT(*) FROM stream;