# PipelineDB Quick Start
#### Author: Zhang Wensheng (张文升)
#### Published: 2018-01-11
----
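## Background
PipelineDB is an open-source streaming database built on PostgreSQL. It lets you run SQL queries continuously over data streams and stores the results incrementally in tables. Key features: real-time data processing using only SQL, with no application code; compatibility with PostgreSQL; no ETL; and efficient, sustainable operation.
For details, see the official website, www.pipelinedb.com.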
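The central idea is that each event is folded into a running aggregate and then discarded, so only the result is stored. A toy model in plain Python can illustrate this. It is a conceptual sketch only, not PipelineDB code; the class name `ToyContinuousView` and its methods are invented for illustration.

```python
from collections import defaultdict


class ToyContinuousView:
    """Toy stand-in for a continuous view doing COUNT(*) and SUM(value) per key.

    Hypothetical illustration only: it mimics the idea of incremental
    aggregation, not PipelineDB's actual implementation.
    """

    def __init__(self):
        # One small running state per group; raw events are never stored.
        self._state = defaultdict(lambda: {"count": 0, "total": 0})

    def insert(self, key, value):
        """Fold one stream event into the running aggregate, then drop it."""
        group = self._state[key]
        group["count"] += 1
        group["total"] += value

    def select(self):
        """Return the current result, like SELECT * on the view."""
        return {key: dict(group) for key, group in self._state.items()}


view = ToyContinuousView()
for key, value in [("en", 3), ("de", 5), ("en", 4)]:
    view.insert(key, value)

print(view.select())
```

The storage cost depends on the number of groups rather than the number of events, which is why a design like this can run without a separate ETL step.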
## Installation
### Installing from an RPM
```
axel -n 5 https://s3-us-west-2.amazonaws.com/download.pipelinedb.com/pipelinedb-0.9.6-centos6-x86_64.rpm
rpm -ivh pipelinedb-0.9.6-centos6-x86_64.rpm
```
### Installing from source
Download the source code:
```
git clone https://github.com/pipelinedb/pipelinedb.git
```
### Install the required dependencies
On CentOS, the build dependencies can be installed with yum:
```
yum install gcc-c++ flex bison check readline-devel zlib-devel \
    ncurses-devel libcurl-devel postgresql-devel
```
### Install PostGIS and its dependencies
```
yum install libxml2 libxml2-devel geos geos-devel proj-devel gdal gdal-devel
```
Next, build the PipelineDB-compatible fork of PostGIS. When running configure, point it at the pg_config binary, which for PipelineDB is pipeline-config:
```
git clone https://github.com/pipelinedb/postgis.git
cd postgis
./autogen.sh
./configure --with-pgconfig=/usr/lib/pipelinedb/bin/pipeline-config
make
make install
```
### Compile
```
# Run from the root of the pipelinedb checkout
./configure CFLAGS="-g -O2" --prefix=/opt/pipelinedb
make
make install
```
## Getting started
### Create the data directory
Create the data directory and change its owner:
```
mkdir -p /export/pipeline_data
chown postgres:postgres /export/pipeline_data
```
### Initialize the data directory
```
/usr/lib/pipelinedb/bin/pipeline-init -D /export/pipeline_data/
```
### Start PipelineDB
Edit the configuration file to enable logging, change the listening port, and so on, exactly as you would for PostgreSQL. Then start the server:
```
/usr/lib/pipelinedb/bin/pipeline-ctl -D /export/pipeline_data/ start
```
### Connect to PipelineDB
```
[postgres@work.com pipeline_data]$ /usr/lib/pipelinedb/bin/pipeline pipeline
pipeline (9.5.3)
Type "help" for help.
pipeline=# select version();
version
---------------------------------------------
PostgreSQL 9.5.3 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 4.4.7 20120313
(Red Hat 4.4.7-17), 64-bit
(1 row)
```
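Aha, the familiar psql-style prompt. The output shows that this release is based on PostgreSQL 9.5.3, which is reasonably recent. :-)
### Check the version
Use the pipeline_version() function to see the PipelineDB version: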
```
pipeline=# select pipeline_version();
pipeline_version
------------------------------------------------------------------------------------
PipelineDB 0.9.6 at revision afe6cf1c681f680ecd6ab9a55070abe9b61b494d on
x86_64-pc-linux-gnu, compiled by gcc (GCC) 4.4.7 20120313 (Red Hat 4.4.7-17), 64-bit
(1 row)
```
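A script that needs to check the installed version could parse this string. The sketch below assumes the output always follows the layout of the sample above (`PipelineDB <version> at revision <hash> ...`), which other releases are not guaranteed to do; the helper name `parse_pipeline_version` is hypothetical.

```python
import re

# Sample taken from the pipeline_version() output shown above, joined into one line.
SAMPLE = (
    "PipelineDB 0.9.6 at revision afe6cf1c681f680ecd6ab9a55070abe9b61b494d on "
    "x86_64-pc-linux-gnu, compiled by gcc (GCC) 4.4.7 20120313 (Red Hat 4.4.7-17), 64-bit"
)

# Assumed layout: product name, dotted version, then the revision hash.
VERSION_RE = re.compile(r"^PipelineDB (\d+(?:\.\d+)*) at revision ([0-9a-f]+)")


def parse_pipeline_version(text):
    """Return (version_tuple, revision) or raise ValueError if the format differs."""
    match = VERSION_RE.match(text.strip())
    if match is None:
        raise ValueError("unrecognized pipeline_version() output: %r" % text)
    version = tuple(int(part) for part in match.group(1).split("."))
    return version, match.group(2)


version, revision = parse_pipeline_version(SAMPLE)
print(version, revision[:7])
```

Returning the version as a tuple of integers lets callers compare releases directly, for example `version >= (0, 9, 6)`.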
## An example
### Create a stream
```
CREATE STREAM wiki_stream (hour timestamp, project text, title text, view_count bigint, size bigint);
```
### Create a continuous view
```
CREATE CONTINUOUS VIEW wiki_stats AS
SELECT hour, project,
count(*) AS total_pages,
sum(view_count) AS total_views,
min(view_count) AS min_views,
max(view_count) AS max_views,
avg(view_count) AS avg_views,
percentile_cont(0.99) WITHIN GROUP (ORDER BY view_count) AS p99_views,
sum(size) AS total_bytes_served
FROM wiki_stream
GROUP BY hour, project;
```
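To show what each column of this view means, the sketch below recomputes the same aggregates for a single (hour, project) group in plain Python. The percentile uses the exact linear-interpolation definition of `percentile_cont` from PostgreSQL; PipelineDB may compute percentiles in continuous views approximately, so this is a reference for the semantics, not for the engine's internals. The helper names and sample rows are made up.

```python
import math


def percentile_cont(fraction, values):
    """Exact continuous percentile with linear interpolation between neighbors."""
    ordered = sorted(values)
    if not ordered:
        return None
    position = fraction * (len(ordered) - 1)  # zero-based fractional row index
    lower = math.floor(position)
    upper = math.ceil(position)
    weight = position - lower
    return ordered[lower] + (ordered[upper] - ordered[lower]) * weight


def wiki_stats_for_group(rows):
    """rows: non-empty list of (view_count, size) tuples for one (hour, project)."""
    views = [view_count for view_count, _ in rows]
    return {
        "total_pages": len(rows),
        "total_views": sum(views),
        "min_views": min(views),
        "max_views": max(views),
        "avg_views": sum(views) / len(views),
        "p99_views": percentile_cont(0.99, views),
        "total_bytes_served": sum(size for _, size in rows),
    }


# Made-up sample rows for one group.
sample = [(10, 100), (20, 200), (30, 300), (40, 400), (50, 500)]
stats = wiki_stats_for_group(sample)
print(stats)
```

Unlike this batch version, the continuous view never holds all rows of a group at once; it updates the stored aggregates as each event arrives.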
### Load test data
Fetch data from an external source and write it to PipelineDB in real time:
```
curl -sL http://pipelinedb.com/data/wiki-pagecounts | gunzip |
psql -h localhost -p 5432 -d pipeline -c "
COPY wiki_stream (hour, project, title, view_count, size) FROM STDIN"
```
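The same COPY statement can also be driven from Python. The sketch below assumes psycopg2 is installed, that `wiki_stream` already exists, and that the caller supplies an open connection. `copy_expert` is psycopg2's cursor method for running a COPY statement against a file-like object; the helper names `escape_copy_field`, `to_copy_text`, and `load_rows`, as well as the sample rows, are invented for illustration.

```python
import io


def escape_copy_field(value):
    """Encode one value for PostgreSQL's COPY text format (\\N means NULL)."""
    if value is None:
        return r"\N"
    text = str(value)
    # Backslash must be escaped first so later escapes are not doubled.
    return (
        text.replace("\\", "\\\\")
        .replace("\t", "\\t")
        .replace("\n", "\\n")
        .replace("\r", "\\r")
    )


def to_copy_text(rows):
    """Render rows as tab-separated lines, one line per row."""
    return "".join(
        "\t".join(escape_copy_field(field) for field in row) + "\n" for row in rows
    )


def load_rows(conn, rows):
    """Stream rows into wiki_stream over an open psycopg2 connection."""
    sql = "COPY wiki_stream (hour, project, title, view_count, size) FROM STDIN"
    with conn.cursor() as cursor:
        cursor.copy_expert(sql, io.StringIO(to_copy_text(rows)))
    conn.commit()


# Made-up sample rows; the second title contains a tab to show escaping.
rows = [
    ("2015-03-01 00:00:00", "en", "Main_Page", 42, 1024),
    ("2015-03-01 00:00:00", "de", "Tab\there", 7, None),
]
print(to_copy_text(rows), end="")
```

Only `load_rows` touches the database, so the formatting step can be checked on its own before pointing it at a live server.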
Query the results:
```
SELECT * FROM wiki_stats ORDER BY total_views DESC; \watch 1
(output omitted)
```
### Connecting from a client (Python)
As with PostgreSQL, use psycopg2.
```
import psycopg2

conn = psycopg2.connect('dbname=test user=user host=localhost port=5432')
pipeline = conn.cursor()

# Define a continuous view that counts events per value of x
create_cv = """
CREATE CONTINUOUS VIEW continuous_view AS SELECT x::integer, COUNT(*) FROM stream GROUP BY x
"""
pipeline.execute(create_cv)
conn.commit()

rows = []
for n in range(100000):
    # 10 unique groupings
    x = n % 10
    rows.append({'x': x})

# Now write the rows to the stream
pipeline.executemany('INSERT INTO stream (x) VALUES (%(x)s)', rows)

# Now read the results
pipeline.execute('SELECT * FROM continuous_view')
rows = pipeline.fetchall()
for row in rows:
    x, count = row
    print(x, count)

pipeline.execute('DROP CONTINUOUS VIEW continuous_view')
conn.commit()  # make the DROP take effect
pipeline.close()
conn.close()
```
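As a sanity check on what the SELECT above should return once every inserted row has been processed, the same grouping can be reproduced without a database. This is only a local cross-check under that assumption, not output captured from PipelineDB.

```python
from collections import Counter

# Same synthetic input as the client example: 100000 rows, x cycles through 0..9.
expected = Counter(n % 10 for n in range(100000))

for x, count in sorted(expected.items()):
    print(x, count)
```

Each of the 10 groups should end up with a count of 10000; smaller counts from the database would suggest that some rows had not yet been processed when the query ran.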