下载页面:https://www.pipelinedb.com/download
ubuntu下安装
dpkg -i pipelinedb-0.9.8u2-ubuntu16-x86_64.deb
创建一个非root用户
useradd -g root frank
切换到用户:
su frank
初始化数据目录:
pipeline-init -D <data directory>
修改配置:进入刚才指定的目录下修改pipelinedb.conf
listen_address="*"
log_timezone = 'PRC'
timezone = 'PRC'
shared_preload_libraries = plsh
pg_hba.conf配置所有ip都是白名单(方便测试)
host all all 0.0.0.0/0 trust
安装plsh扩展:
git clone https://github.com/petere/plsh.git
make
make install
后台启动:
pipeline-ctl -D /home/frank/pipedb/ -l p.log start
连接:
pipeline pipeline (如果是root下进入则要加-u frank)
添加plsh扩展:
CREATE EXTENSION plsh;
按照项目需求创建流和视图,demo:(注意实际环境下的视图一般是有OrderBy devid这种类似的写法的,每个设备有一行统计,有多少设备视图有多少行)
CREATE STREAM stream_test (x integer, y integer);
CREATE CONTINUOUS VIEW view_test AS SELECT max(x) AS maxx,sum(y) AS sumy,avg(x) AS avgx FROM stream_test;
手动插入观察效果:
pipeline=# SELECT * from view_test;
maxx | sumy | avgx
------+------+------
(0 rows)
pipeline=# INSERT into stream_test VALUES (15,9);
INSERT 0 1
pipeline=# SELECT * from view_test;
maxx | sumy | avgx
------+------+---------------------
15 | 9 | 15.0000000000000000
(1 row)
pipeline=# INSERT into stream_test VALUES (12,49);
INSERT 0 1
pipeline=# SELECT * from view_test;
maxx | sumy | avgx
------+------+---------------------
15 | 58 | 13.5000000000000000
(1 row)
pipeline=# INSERT into stream_test VALUES (19,61);
INSERT 0 1
pipeline=# SELECT * from view_test;
maxx | sumy | avgx
------+------+---------------------
19 | 119 | 15.3333333333333333
(1 row)
pipeline=#
安装并测试完成。
访问当前的sumy只要一个sql语句(如上面的Select from view),这里可以用任何连接postgresql的驱动来完成。
例如上面测试条件下,x大于100的数据过来的时候触发事件,调用发送邮件的接口。
创建函数:
CREATE or REPLACE FUNCTION func_test() RETURNS trigger AS
$$
#!/bin/bash
curl -x http://localhost/sendemail
$$
LANGUAGE plsh;
创建transform:
CREATE CONTINUOUS TRANSFORM ct_test AS
SELECT x,y FROM stream_test WHERE x > 100
THEN EXECUTE PROCEDURE trtest();