当前位置: 首页 > 工具软件 > PipelineDB > 使用案例 >

PostgreSQL pipelinedb 流计算插件 - IoT应用 - 实时轨迹聚合

闻人越
2023-12-01

 


背景

IoT场景,车联网场景,共享单车场景,人的行为位点等,终端实时上报的是孤立的位点,我们需要将其补齐成轨迹。

例如共享单车,下单,开锁,生成订单,骑行,关闭订单,关锁。这个过程有一个唯一的订单号,每次上报的位点会包含时间,订单号,位置。

根据订单号,将点聚合为轨迹。

使用pipelinedb插件,可以实时的实现聚合。

例子

以ECS (centos 7.x x64), postgresql 10 为例

1、编译zeromq

wget https://github.com/zeromq/libzmq/releases/download/v4.2.5/zeromq-4.2.5.tar.gz  
tar -zxvf zeromq-4.2.5.tar.gz  
cd zeromq-4.2.5  
./configure  
make  
make install  

2、编译pipelinedb

wget https://github.com/pipelinedb/pipelinedb/archive/1.0.0rev4.tar.gz  
tar -zxvf 1.0.0rev4.tar.gz   
cd pipelinedb-1.0.0rev4/  
  
vi Makefile  
  
SHLIB_LINK += /usr/local/lib/libzmq.so -lstdc++  
  
. /var/lib/pgsql/env.sh 1925  
  
USE_PGXS=1 make  
USE_PGXS=1 make install  

3、配置postgresql.conf

max_worker_processes = 512  
shared_preload_libraries = 'pipelinedb'  
pipelinedb.stream_insert_level=async
pipelinedb.num_combiners=8
pipelinedb.num_workers=16
pipelinedb.num_queues=16
pipelinedb.fillfactor=75
pipelinedb.continuous_queries_enabled=true  

重启

pg_ctl restart -m fast  

4、安装插件

postgres=# create extension pipelinedb;  

5、创建stream,实时写入轨迹点

CREATE FOREIGN TABLE s1 ( order_id int8, ts timestamp, pos geometry )  
SERVER pipelinedb;  

6、创建Continue view,实时聚合

CREATE VIEW cv1 WITH (action=materialize ) AS   
select order_id, min(ts) min_ts, array_agg(ts||','||st_astext(pos)) as seg  
from s1  
group by order_id;  

激活视图(默认已激活)

select pipelinedb.activate('public.cv1');  

7、压测

vi test.sql  
\set order_id random(1,100000)  
\set x random(70,90)  
\set y random(120,125)  
insert into s1 (order_id, ts, pos) values (:order_id, clock_timestamp(), st_makepoint(:x+10*random(), :y+10*random()));  
  
  
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 256 -j 256 -T 120  

8、压测结果

transaction type: ./test.sql  
scaling factor: 1  
query mode: prepared  
number of clients: 256  
number of threads: 256  
duration: 120 s  
number of transactions actually processed: 17614607  
latency average = 1.740 ms  
latency stddev = 1.730 ms  
tps = 146550.933776 (including connections establishing)  
tps = 146906.482277 (excluding connections establishing)  
script statistics:  
 - statement latencies in milliseconds:  
         0.002  \set order_id random(1,10000000)  
         0.001  \set x random(70,90)  
         0.000  \set y random(120,125)  
         1.742  insert into s1 (order_id, ts, pos) values (:order_id, clock_timestamp(), st_makepoint(:x+10*random(), :y+10*random()));  
postgres=# \x  
Expanded display is on.  
-[ RECORD 17 ]---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------  
order_id | 8672585  
min_ts   | 2018-11-01 18:44:08.140027  
seg      | {"2018-11-01 18:44:08.140027,POINT(78.3615547642112 121.881739947945)","2018-11-01 18:44:11.739248,POINT(80.9645632216707 121.450987955555)"}  
-[ RECORD 18 ]---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------  
order_id | 4011211  
min_ts   | 2018-11-01 18:44:08.166407  
seg      | {"2018-11-01 18:44:08.166407,POINT(87.126777020283 132.819293198176)","2018-11-01 18:44:11.524995,POINT(80.482944605872 126.906906872056)"}  
-[ RECORD 19 ]---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------  
order_id | 2468486  
min_ts   | 2018-11-01 18:44:08.135136  
seg      | {"2018-11-01 18:44:08.135136,POINT(84.7732630362734 132.659516767599)","2018-11-01 18:44:20.603312,POINT(87.6352122295648 132.18647258915)","2018-11-01 18:44:19.447776,POINT(94.9817024609074 131.295661441982)"}  

9、历史轨迹的保留

设置cv生命周期,自动清理老化数据

postgres=# select pipelinedb.set_ttl('cv1', interval '1 hour' , 'min_ts');  
-[ RECORD 1 ]-----  
set_ttl | (3600,2)  

创建目标持久化表

create table cv1_persist (like cv1);  

创建时间字段索引(CV1)

postgres=# create index idx_1 on cv1 (min_ts);  
CREATE INDEX  

ETL形式,将数据从cv抽取到目标持久化表

postgres=# insert into cv1_persist select * from cv1 where min_ts <= '2018-01-01';  
INSERT 0 0  

参考

https://github.com/pipelinedb/pipelinedb

http://zeromq.org/intro:get-the-software

https://www.pipelinedb.com/

 类似资料: