Installing PipelineDB:

References:

http://docs.pipelinedb.com/installation.html#installation

http://docs.pipelinedb.com/quickstart.html#quickstart


Note: PipelineDB is now a PostgreSQL extension ("Since PipelineDB runs as an extension to PostgreSQL, begin by installing PostgreSQL"). We need to install PostgreSQL first, then install the PipelineDB extension.


First install PostgreSQL server version 11:

postgresql11-server-11.2-1PGDG.rhel7.x86_64

postgresql11-11.2-1PGDG.rhel7.x86_64



Then install the PipelineDB package that matches that version:

rpm -ivh pipelinedb-postgresql-11-1.0.0-13.centos7.x86_64.rpm


# Initialize PostgreSQL

su - postgres

/usr/pgsql-11/bin/pg_ctl initdb -D /var/lib/pgsql/11/data
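Running initdb against a directory that already holds a cluster fails, so a guard can be useful when re-running these steps. The following is a minimal sketch, not part of the original procedure. The function name `data_dir_initialized` is hypothetical, and it relies on the fact that an initialized data directory contains a `PG_VERSION` file.

```shell
# Hypothetical helper: succeeds if the given directory looks like an
# initialized PostgreSQL data directory (it contains a PG_VERSION file).
data_dir_initialized() {
  [ -f "$1/PG_VERSION" ]
}

# Example use (run as the postgres user, paths taken from this post):
# if ! data_dir_initialized /var/lib/pgsql/11/data; then
#   /usr/pgsql-11/bin/pg_ctl initdb -D /var/lib/pgsql/11/data
# fi
```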


Edit the PostgreSQL configuration file (postgresql.conf):

listen_addresses = '*'

max_worker_processes = 128

shared_preload_libraries = 'pipelinedb'
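If you prefer to script this step, the three settings above can be appended to the configuration file. This is only a sketch: the function name `append_pipelinedb_settings` is hypothetical, and it appends instead of editing existing lines, relying on PostgreSQL honoring the last occurrence of a setting in the file. Note that it would override any `shared_preload_libraries` value already set there.

```shell
# Hypothetical helper: append the settings from this post to the
# postgresql.conf file passed as the first argument.
append_pipelinedb_settings() {
  conf_file=$1
  cat >> "$conf_file" <<'EOF'
listen_addresses = '*'
max_worker_processes = 128
shared_preload_libraries = 'pipelinedb'
EOF
}

# Example use (assumes the config file lives in the data directory used above):
# append_pipelinedb_settings /var/lib/pgsql/11/data/postgresql.conf
```

Because `shared_preload_libraries` is only read at server start, the change takes effect when PostgreSQL is started or restarted.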


# Start PostgreSQL

/usr/pgsql-11/bin/pg_ctl start -D /var/lib/pgsql/11/data


# Log in to PostgreSQL

/usr/pgsql-11/bin/psql

create database pipeline ;

\c pipeline;

create extension pipelinedb;


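# At this point you could also create accounts, add client address authorization in pg_hba.conf, and so on; those steps are outside the scope of this post.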

/usr/pgsql-11/bin/psql pipeline  # after logging in, run the following two SQL statements:

-- Create a foreign table served by pipelinedb (this is how PipelineDB represents a stream)
CREATE FOREIGN TABLE wiki_stream (
hour timestamp,
project text,
title text,
view_count bigint,
size bigint)
SERVER pipelinedb;


-- Create the continuous view (CV)
CREATE VIEW wiki_stats WITH (action=materialize) AS
SELECT hour, project,
count(*) AS total_pages,
sum(view_count) AS total_views,
min(view_count) AS min_views,
max(view_count) AS max_views,
avg(view_count) AS avg_views,
percentile_cont(0.99) WITHIN GROUP (ORDER BY view_count) AS p99_views,
sum(size) AS total_bytes_served
FROM wiki_stream
GROUP BY hour, project;
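As a rough illustration of what the grouping in wiki_stats computes, the sketch below reproduces a few of its aggregates (count, sum, min, max of view_count, and sum of size) per (hour, project) over tab-separated rows read from stdin. It runs in a shell outside psql, it is not how PipelineDB implements continuous views, and it leaves out avg and the p99 percentile. The function name `aggregate_by_hour_project` is hypothetical.

```shell
# Illustrative only: mimic some of wiki_stats' aggregates with awk.
# Input columns (tab-separated): hour, project, title, view_count, size.
# Output columns (tab-separated): hour, project, total_pages, total_views,
# min_views, max_views, total_bytes_served.
aggregate_by_hour_project() {
  awk -F'\t' '
    {
      key = $1 FS $2
      pages[key]++
      views[key] += $4
      if (!(key in minv) || $4 + 0 < minv[key]) minv[key] = $4 + 0
      if (!(key in maxv) || $4 + 0 > maxv[key]) maxv[key] = $4 + 0
      bytes[key] += $5
    }
    END {
      for (key in pages)
        printf "%s\t%d\t%d\t%d\t%d\t%d\n", key, pages[key], views[key], minv[key], maxv[key], bytes[key]
    }
  ' | sort
}
```

The difference in PipelineDB is that the continuous view keeps these aggregates up to date incrementally as rows arrive on the stream, instead of rescanning stored rows.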




# List the relations:

pipeline=# \d
List of relations
Schema |       Name       |     Type      |  Owner
--------+------------------+---------------+----------
public | wiki_stats       | view          | postgres
public | wiki_stats_def   | view          | postgres
public | wiki_stats_mrel  | table         | postgres
public | wiki_stats_osrel | foreign table | postgres
public | wiki_stats_seq   | sequence      | postgres
public | wiki_stream      | foreign table | postgres
(6 rows)



# Now decompress the dataset as a stream and write it to stdin, where it serves as the input to COPY

curl -sL http://pipelinedb.com/data/wiki-pagecounts | gunzip | /usr/pgsql-11/bin/psql pipeline -c "COPY wiki_stream (hour, project, title, view_count, size) FROM STDIN"
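To try the stream without downloading the dataset, a few hand-written rows can be fed to the same COPY command. The sketch below is an assumption-laden example: the rows are made up (not taken from the real wiki-pagecounts data), and the function name `sample_wiki_rows` is hypothetical. The rows are tab-separated because COPY without options reads its default text format, which uses tabs as the column delimiter.

```shell
# Made-up rows for illustration; NOT real wiki-pagecounts data.
# Columns: hour, project, title, view_count, size (tab-separated).
sample_wiki_rows() {
  printf '%s\t%s\t%s\t%s\t%s\n' \
    '2015-01-01 00:00:00' 'en' 'Example_Page_A' '10' '2048' \
    '2015-01-01 00:00:00' 'en' 'Example_Page_B' '30' '4096' \
    '2015-01-01 01:00:00' 'de' 'Beispiel_Seite' '5' '1024'
}

# Example use (same COPY command as above):
# sample_wiki_rows | /usr/pgsql-11/bin/psql pipeline -c "COPY wiki_stream (hour, project, title, view_count, size) FROM STDIN"
```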


# View the aggregated test data:

/usr/pgsql-11/bin/psql pipeline  -c "SELECT * FROM wiki_stats ORDER BY total_views DESC";



# To see the continuous views currently in the system and their definitions, run the following query:

pipeline=# SELECT * FROM pipelinedb.views;
id | schema |    name    | active |                                                              query
----+--------+------------+--------+----------------------------------------------------------------------------------------------------------------------------------
3 | public | wiki_stats | t      |  SELECT wiki_stream.hour,                                                                                                       +
|        |            |        |     wiki_stream.project,                                                                                                        +
|        |            |        |     count(*) AS total_pages,                                                                                                    +
|        |            |        |     sum(wiki_stream.view_count) AS total_views,                                                                                 +
|        |            |        |     min(wiki_stream.view_count) AS min_views,                                                                                   +
|        |            |        |     max(wiki_stream.view_count) AS max_views,                                                                                   +
|        |            |        |     avg(wiki_stream.view_count) AS avg_views,                                                                                   +
|        |            |        |     percentile_cont((0.99)::double precision) WITHIN GROUP (ORDER BY ((wiki_stream.view_count)::double precision)) AS p99_views,+
|        |            |        |     sum(wiki_stream.size) AS total_bytes_served                                                                                 +
|        |            |        |    FROM wiki_stream                                                                                                             +
|        |            |        |   GROUP BY wiki_stream.hour, wiki_stream.project
(1 row)



PipelineDB has many uses in real-time stream computing. For details, see 德哥's GitHub.