在使用pipelineDB之前先介绍几个术语,以便后面更好的理解
stream,数据写入流中,流中的数据不落盘。流通常会写入到continue view或continue transform,用于存储实时计算的结果(continue view),或者定义对流数据进行处理的规则(continue transform)。
continue view,经过实时计算后的结果,存入continue view,这个视图非常类似于常规的视图,这里可以把CV(continue view)当做高吞吐量的实时雾化视图。
continue transform,定义对流数据进行处理的规则,当流中的数据记录满足continue transform QUERY定义的条件时,触发规则。和CV不同,因为不会存储数据,不支持聚合,输出的结果可以当做其他流的流入口,也可以写到外部数据存储
滑窗 continue view,属于continue view的一种。
由于CV会随着时间的推移不断更新,因此当更新CV的时候,PipelineDB可以考虑获取到当前的时间。包含一段时间内的WHERE子句的查询称为滑动窗口查询。滑动WHERE子句过滤条件会随着时间的推移而不断变化。
例如:定义滑窗为1小时,那么这个视图就是最近一小时的统计,为了得到这个统计值,必须实时老化一小时前的数据,保持统计是最近一小时的。
实际上pipelinedb内部通过定义比滑窗更小粒度窗口的实时统计,把窗口切成更小的窗口,查询时对小粒度窗口进行汇聚产生大窗口的数据。
例如定义的窗口为1小时,那么可以按分钟的粒度进行统计,查询时,汇聚最近的60个窗口的数据,得到小时的窗口数据。
接下来,我们分别介绍一下如果使用上面介绍的功能
写入流的原发和写入表的语法类似
以下是官网的一些例子,stream比较好理解
写入单个事件,也就是单条记录
INSERT INTO stream (x, y, z) VALUES (0, 1, 2);
INSERT INTO json_stream (payload) VALUES (
'{"key": "value", "arr": [92, 12, 100, 200], "obj": { "nested": "value" } }'
);
批量写入到流
INSERT INTO stream (x, y, z) VALUES (0, 1, 2), (3, 4, 5), (6, 7, 8)
(9, 10, 11), (12, 13, 14), (15, 16, 17), (18, 19, 20), (21, 22, 23), (24, 25, 26);
也可以包含表达式
INSERT INTO geo_stream (id, coords) VALUES (42, a_function(-72.09, 41.40));
INSERT INTO udf_stream (result) VALUES (my_user_defined_function('foo'));
INSERT INTO str_stream (encoded, location) VALUES
(encode('encode me', 'base64'), position('needle' in 'haystack'));
INSERT INTO rad_stream (circle, sphere) VALUES
(pi() * pow(11.2, 2), 4 / 3 * pi() * pow(11.2, 3));
-- 支持select子句插入到流
INSERT INTO ss_stream (x) SELECT generate_series(1, 10) AS x;
INSERT INTO tab_stream (x) SELECT x FROM some_table;
也可以使用prepare insert,以减少网络带宽
PREPARE write_to_stream AS INSERT INTO stream (x, y, z) VALUES ($1, $2, $3);
EXECUTE write_to_stream(0, 1, 2);
EXECUTE write_to_stream(3, 4, 5);
EXECUTE write_to_stream(6, 7, 8);
也可以使用copy将文件中的数据写入流
COPY stream (data) FROM '/some/file.csv'
以下是从亚马逊云对象压缩存储的s3到pipelinedb的例子
aws s3 cp s3://bucket/logfile.gz - | gunzip | pipeline -c "COPY stream (data) FROM STDIN"
可以通过output_of函数读取其他CV和CT,如:
以下是一个简单的例子,只是对流中的数据求和,然后存入到v_sum
CREATE VIEW v_sum AS SELECT sum(x) FROM stream;
如果我们要记录总和变化超过10的,我们可以再创建一个CV,从v_sum中读取数据完成要求
CREATE VIEW v_deltas AS SELECT abs((new).sum - (old).sum) AS delta
FROM output_of('v_sum')
WHERE abs((new).sum - (old).sum) > 10;
pipelinedb.stream_targets设置
hank=# create view cv1 as select x,y,z from stream1;
CREATE VIEW
hank=# create view cv3 as select x,y,z from stream1;
CREATE VIEW
设置为cv1后,插入流的数据,只会看到cv1新增的记录,cv3无法看到
hank=# SET pipelinedb.stream_targets TO cv1;
SET
hank=# select * from cv1;
x | y | z
---+---+---
1 | |
(1 row)
hank=# select * from cv3;
x | y | z
---+---+---
1 | |
(1 row)
hank=# INSERT INTO stream1 (x) VALUES (2);
INSERT 0 1
hank=# INSERT INTO stream1 (x) VALUES (3);
INSERT 0 1
hank=# INSERT INTO stream1 (x) VALUES (4);
INSERT 0 1
hank=# select * from cv1;
x | y | z
---+---+---
1 | |
2 | |
3 | |
4 | |
(4 rows)
hank=# select * from cv3;
x | y | z
---+---+---
1 | |
(1 row)
同理,设置为cv3后也是类似的结果
hank=# SET pipelinedb.stream_targets TO cv3;
SET
hank=# INSERT INTO stream1 (x) VALUES (5);
INSERT 0 1
hank=# INSERT INTO stream1 (x) VALUES (6);
INSERT 0 1
hank=# select * from cv1;
x | y | z
---+---+---
1 | |
2 | |
3 | |
4 | |
(4 rows)
hank=# select * from cv3;
x | y | z
---+---+---
1 | |
5 | |
6 | |
(3 rows)
设置为default后,两个cv都可以写入
hank=# SET pipelinedb.stream_targets TO default;
SET
hank=# INSERT INTO stream1 (x) VALUES (7);
INSERT 0 1
hank=# select * from cv1;
x | y | z
---+---+---
1 | |
2 | |
3 | |
4 | |
7 | |
(5 rows)
hank=# select * from cv3;
x | y | z
---+---+---
1 | |
5 | |
6 | |
7 | |
(4 rows)
生成测试数据和测试的CV
$ psql
创建一个流
=# CREATE FOREIGN TABLE test_stream (key integer, value integer) SERVER pipelinedb;
CREATE FOREIGN TABLE
创建CV
=# CREATE VIEW test_view WITH (action=materialize) AS SELECT key, COUNT(*) FROM test_stream GROUP BY key;
CREATE VIEW
使用insert语句发出一个事件,也就是给流中插入一条记录
$ psql
=# INSERT INTO test_stream (key, value) VALUES (0, 42);
INSERT 0 1
现在给流中插入更多的数据
INSERT INTO test_stream (key, value) SELECT random() * 10, random() * 10 FROM generate_series(1, 100000);
INSERT 0 100000
可以看到插入了100001个事件
hank=# SELECT sum(count) FROM test_view;
sum
--------
100001
(1 row)
查询可以看到合并完成,这里对11个值分别进行了count统计
hank=# select * from test_view;
key | count
-----+-------
7 | 10118
3 | 10083
10 | 4972
0 | 4999
6 | 9881
8 | 10040
5 | 9913
4 | 10088
9 | 9941
1 | 9962
2 | 10004
(11 rows)
删除CV
DROP VIEW name
清空CV的数据
SELECT truncate_continuous_view('name');
查看当前系统中CV的定义
hank=# SELECT * FROM pipelinedb.views;
id | schema | name | active | query
----+--------+-----------+--------+----------------------------
14 | public | test_view | t | SELECT test_stream.key, +
| | | | count(*) AS count +
| | | | FROM test_stream +
| | | | GROUP BY test_stream.key
(1 row)
Time-to-Live (TTL)Expiration
通俗来讲就是保留多久的数据在CV中,用过mongodb的同学应该知道TTL collection的概念,这里也是一个道理。
一个TTL的CV包含一个基于时间的列,删除过期的数据都是基于这个时间列进行的,TTL可以指定两个存储参数ttl和ttl_column,过期的数据(指定ttl_column列的数据大于ttl设定的值)由进程reaper处理删除。
比如以下例子告诉你,基于minute列的数据只保留最近一个月的
CREATE VIEW v_ttl WITH (ttl = '1 month', ttl_column = 'minute') AS
SELECT minute(arrival_timestamp), COUNT(*) FROM some_stream GROUP BY minute;
修改TTL值,可以通过以下语句修改,删除,注意无法在滑窗CV中修改TTL参数
pipelinedb.set_ttl ( cv_name, ttl, ttl_column )
CV和CT激活和停用
SELECT pipelinedb.activate('continuous_view_or_transform');
SELECT pipelinedb.deactivate('continuous_view_or_transform');
注意,当CV或者CT停用后,写入到流的数据不会被CV或者CT计算,就算重新激活了,之前的没有被计算的数据也不会在CV或者CT中读到,也就是说,停用后,写入流的数据是丢失的,激活后,只会对新的数据进行处理。
如下图,示例
创建两个流
hank=# create foreign table stream1 (x int,y int,z timestamp) server pipelinedb;
CREATE FOREIGN TABLE
hank=# create foreign table stream2 (x int,y int,z timestamp) server pipelinedb;
CREATE FOREIGN TABLE
创建2个CV
hank=# create view cv1 as select x,y,z from stream1;
CREATE VIEW
hank=# create view cv2 as select x,y,z from stream2;
CREATE VIEW
创建CT,这里的CT是把写入CV1的数据作为输出进入流stream2
hank=# CREATE VIEW ct1 WITH (action=transform, outputfunc=pipelinedb.insert_into_stream('stream2')) AS select x,y,z from stream1;
CREATE VIEW
hank=# insert into stream1 values(1,2,now());
INSERT 0 1
可见CV1和CV2数据是一样的
hank=# select * from cv1;
x | y | z
---+---+----------------------------
1 | 2 | 2019-07-05 09:19:01.796078
(1 row)
hank=# select * from cv2;
x | y | z
---+---+----------------------------
1 | 2 | 2019-07-05 09:19:01.796078
(1 row)
这里用到的的是pipeline_stream_insert
删除continuous_transform
DROP VIEW pipelinedb.continuous_transform;
查看CT定义
SELECT * FROM pipelinedb.transforms;
上面的例子中我们使用了pipelinedb.insert_into_stream(‘stream2’)这个函数,这个是系统内置的,我们也可以创建自己定义的函数,如下:
创建一个表t1
CREATE TABLE t1 (a int,b int,c timestamp);
创建自定义的触发器函数insert_into_t1(),函数意思为插入steam1的数据插入到t1表中
CREATE OR REPLACE FUNCTION insert_into_t1()
RETURNS trigger AS
$$
BEGIN
INSERT INTO t1 (a,b,c) VALUES (NEW.x, NEW.y,NEW.z);
RETURN NEW;
END;
$$
LANGUAGE plpgsql;
创建ct,把x大于100插入流的数据,插入到表t1中
CREATE VIEW ct2 WITH (action=transform, outputfunc=insert_into_t1) AS
SELECT x,y,z FROM stream1 WHERE x > 100;
插入数据查看结果,可见t1表中已经插入数据
hank=# insert into stream1 values(101,2,now());
INSERT 0 1
hank=# select * from t1;
a | b | c
-----+---+----------------------------
101 | 2 | 2019-07-05 09:38:57.405055
(1 row)
这里直接看例子:
查看最新一分钟的数据
CREATE VIEW recent_data WITH (sw = '1 minute') AS
SELECT x,y,z FROM stream1;
内部等同于以下:
CREATE VIEW recent_data AS
SELECT x,y,z FROM stream1
WHERE (arrival_timestamp > clock_timestamp() - interval '1 minute');
hank=# insert into stream1 values(1,2,now());
INSERT 0 1
hank=# select * from recent_data;
x | y | z
---+---+----------------------------
1 | 2 | 2019-07-05 10:55:35.986077
(1 row)
过一分钟再查看已经没有记录了
hank=# select * from recent_data;
x | y | z
---+---+---
(0 rows)
一个简单的滑窗聚合,统计最后一分钟的记录
hank=# CREATE VIEW count_recent_data WITH (sw = '1 minute') AS
hank-# SELECT COUNT(*) FROM stream1;
CREATE VIEW
hank=# select * from count_recent_data;
count
-------
(1 row)
hank=# insert into stream1 values(1,2,now());
INSERT 0 1
hank=# insert into stream1 values(2,2,now());
INSERT 0 1
hank=# insert into stream1 values(3,2,now());
INSERT 0 1
hank=# select * from count_recent_data;
count
-------
3
(1 row)
数据时效
很显然,CV中的滑动窗口在一定时间后变得无效,因为它们已经变得太旧而不能存在于CV的结果中。因此,这些行必须进行垃圾收集,这可以通过两种方式实现:
Background invalidation
通过autovacuumer周期性的进行垃圾整理,删除滑窗过期的行。
Read-time invalidation
当使用SELECT读取CV时,过期的数据都会在生成结果时动态丢弃。这样可以确保即使过期行仍然存在,但它们实际上不会出现在任何查询结果中。
step_factor参数
可以指定1~50,指定滑动窗口步长的大小,以sw指定的窗口大小的百分比表示,越小的step_factor将提供更细的粒度,物化视图表会占用更大的磁盘空间。越大的step_factor可以减少磁盘的占用
进一步解释一下,就是根据这个粒度切分后进行统计的,然后大的滑窗统计小窗口的数据进行汇总,滑窗粒度越小,小窗口越多,保留小窗口数据越多,所以占的空间更大。大窗口,表示统计的间隔时间更长,保留的统计数据更少,所以占的空间更小。
看个例子,这个例子表示sw的50%,也就是30分钟为一个步长,所以清理时候的粒度也是以30分钟为单位的,
CREATE VIEW hourly (WITH sw = '1 hour', step_factor = 50)
AS SELECT COUNT(*) FROM stream;