PipelineDB的基本抽象称为连续视图。连续视图与常规视图非常相似,不同之处在于它从流和表的组合中选择作为输入,并在向这些输入写入新数据时进行增量实时更新。
流数据一旦被使用它的连续视图读取,便会被丢弃,原始的数据不会存储在任何地方。对于一个连续视图而言,惟一持久化的数据是通过从SELECT * FROM current_view返回的数据。因此,我们可以将连续视图看作是一个非常高吞吐量的、实时的物化视图(Materialized View)。
连续视图被定义为传统的PostgreSQL视图,操作参数设置为物化。下面是创建连续视图的语法:
CREATE VIEW name [WITH (action=materialize [, ...])] AS query;
只要选择了一个流,PipelineDB就会用一个物化动作来解释CREATE VIEW语句。
其中的查询(query)是一个PostgreSQL选择语句的子集:
SELECT [ DISTINCT [ ON ( expression [, ...] ) ] ]
expression [ [ AS ] output_name ] [, ...]
[ FROM from_item [, ...] ]
[ WHERE condition ]
[ GROUP BY expression [, ...] ];
其中,from_item可以是以下内容:
stream_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
table_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
from_item [ NATURAL ] join_type from_item [ ON join_condition ]
流类似于表,会有连续的数据行被写入。
expression:一个PostgreSQL表达式或分组规则。
output_name:用于命名表达式的可选标识符。
condition:计算结果为布尔型结果的任何表达式,表示任何不满足此条件的行都将从输出中删除。
要从系统中删除连续视图,则使用DROP view命令。它的语法很简单:
DROP VIEW name;
这将从系统中删除连续视图及其所有相关资源。
要删除连续视图的所有数据而不删除连续视图本身,可以使用函数:
SELECT truncate_continuous_view('name');
这个命令将有效地删除连续视图的所有行,与PostgreSQL的TRUNCATE命令是一致的。
要查看系统中当前的连续视图及其定义,可以运行以下查询:
SELECT * FROM pipelinedb.views;
由于连续视图与常规视图非常相似,因此从连续视图中检索数据只需对其执行SELECT即可。任何SELECT语句在连续视图上都是有效的,允许对不断更新的内容执行进一步分析。
一个常见的PipelineDB模式是在聚合分组中包含一个基于时间顺序的列,并删除由该列决定的不再需要的旧数据。PipelineDB还提供了,通过在连续视图上指定TTL规则的方式删除过期数据。
TTL过期行为可以通过ttl与ttl_column存储参数进行配置。过期数据由回收进程处理,该进程将删除ttl_column值大于ttl指定的时间范围的数据。创建指定TTL规则的连续视图语句如下:
CREATE VIEW v_ttl WITH (ttl = '1 month', ttl_column = 'minute') AS
SELECT minute(arrival_timestamp), COUNT(*) FROM some_stream GROUP BY minute;
回收进程会删除时间早于minute列对应值一个月的数据行。请注意,TTL行为是对回收进程的一个提示,因此不能保证在数据将过期时准确地进行物理删除行。
为了保证不会读取过期数据行,应该在创建连续视图时,带上WHERE子句用来在读取时排除过期数据行。
TTL可以通过pipelinedb从连续视图中添加、修改和删除:
pipelinedb.set_ttl ( cv_name, ttl, ttl_column )
用给定参数更新指定的连续视图TTL,其中,cv_name是视图名称,ttl是用字符串表示的时间间隔,如,‘1 day’,ttl_column是对应的基于时间顺序的列名称。
有效的删除指定的连续视图中的TTL,只需将ttl与ttl_column参数都传递为NULL。注意,不能在滑动窗口连续视图上修改TTL,也不能从滑动窗口连续视图中删除TTL。
因为连续视图会不断地处理输入的流数据,所以激活和停止数据处理,而不必完全关闭视图的功能是很有必要的。如果一个连续视图增加了额外的系统负载或开始抛出错误,那么暂时停止该视图的连续处理直到问题得到解决,就可以利用到这个功能。
该级别的控制是由激活和禁用功能提供的,这是所谓的激活与停止。当连续视图处于活动状态时,会积极地从输入流中读取数据,并相应增量的更新结果。相反,不活动的连续视图不会从其输入流中读取数据,也不会更新结果。当连续视图处于非活动状态时,连续视图本身仍然是可读的,只是没有被更新而已。
continuous_view_or_transform为连续视图或转换名称:
SELECT pipelinedb.activate('continuous_view_or_transform');
SELECT pipelinedb.deactivate('continuous_view_or_transform');
当连续查询视图处于非活动状态时,该连续查询将永远不会读取它在处于非活动状态时写入其输入流的任何事件,即使在它被重新激活后,仍然会将这些数据丢弃。
对于一个连续视图,由PipelineDB持久化的唯一数据是SELECT * FROM continuous_view_or_transform的返回结果集。比如,下方的这个连续视图,只会在PipelineDB中存储一行(只有几个字节),即使它在一段时间内读取了一万亿个事件:
CREATE VIEW avg_of_forever AS SELECT AVG(x) FROM one_trillion_events_stream;
计算每个URL每天看到的不同用户数量:
CREATE VIEW uniques AS
SELECT date_trunc('day', arrival_timestamp) AS day,
referrer, COUNT(DISTINCT user_id)
FROM users_stream GROUP BY day, referrer;
计算以分钟为单位的数据点的线性回归:
CREATE VIEW lreg AS
SELECT date_trunc('minute', arrival_timestamp) AS minute,
regr_slope(y, x) AS mx,
regr_intercept(y, x) AS b
FROM datapoints_stream GROUP BY minute;
计算在过去五分钟内,有统计的数据量:
CREATE VIEW imps AS
SELECT COUNT(*) FROM imps_stream
WHERE (arrival_timestamp > clock_timestamp() - interval '5 minutes');
服务器请求延迟为90、95和99的百分位数:
CREATE VIEW latency AS
SELECT percentile_cont(array[90, 95, 99]) WITHIN GROUP (ORDER BY latency)
FROM latency_stream;