pipelinedb团队加入Confluent,已经不再更新,停留在了1.0.0版本。这里介绍一下可以替换pipelineDB的TimescaleDB,TimescaleDB为时序数据库,时序数据库有以下几个特点:
举几个场景:
具体使用方法,也是在postgresql中当插件使用的,具体安装方法可以参考:
https://docs.timescale.com/latest/getting-started/installation
TimescaleDB使用有多个方面,这里只介绍Continuous Aggregates特性,其他特性不做介绍。
实例:
--创建表一张基础表:
CREATE TABLE conditions (
time TIMESTAMPTZ NOT NULL,
device INTEGER NOT NULL,
temperature FLOAT NOT NULL,
PRIMARY KEY(time, device)
);
SELECT create_hypertable('conditions', 'time');
--用timescaledb.continuous view选项创建视图。视图中使用time_bucket函数将温度汇总到按小时为间隔的时间段中。
CREATE VIEW conditions_summary_hourly
WITH (timescaledb.continuous) AS
SELECT device,
time_bucket(INTERVAL '1 hour', time) AS bucket,
AVG(temperature),
MAX(temperature),
MIN(temperature)
FROM conditions
GROUP BY device, bucket;
--可以在一个时序基础表中建立多个持续聚合的视图,如下,按天聚合的视图
CREATE VIEW conditions_summary_daily
WITH (timescaledb.continuous) AS
SELECT device,
time_bucket(INTERVAL '1 day', time) AS bucket,
AVG(temperature),
MAX(temperature),
MIN(temperature)
FROM conditions
GROUP BY device, bucket;
--持续聚合视图支持大多数聚合函数的并行计算,如SUM,AVG等,但是order by和distinct 不能使用并行计算,另外filter子句也不支持并行计算。
--如下,查询改视图,可以得到第一季度的device为5的最大,最小,以及平均温度。
SELECT * FROM conditions_summary_daily
WHERE device = 5
AND bucket >= '2018-01-01' AND bucket < '2018-04-01';
--当然也可以做更复杂的一些查询
SELECT * FROM conditions_summary_daily
WHERE max - min > 1800
AND bucket >= '2018-01-01' AND bucket < '2018-04-01'
ORDER BY bucket DESC, device_id DESC LIMIT 20;
相关参数用法:
--timescaledb.refresh_interval参数控制视图的刷新时间,间隔越短,后台进程越频繁,当然进程是需要消耗资源的。
--默认情况下,查询该聚合视图的数据中,包含了已经聚合的数据以及基础表中未聚合的数据,如果你想要的结果只是聚合后的数据,不需要基础表中最新的数据,那么可以设置timescaledb.materialized_only改参数为true
ALTER VIEW conditions_summary_hourly SET (
timescaledb.materialized_only = true
);
-- 可以查询timescaledb_information.continuous_aggregates获取所有的聚合视图,如果要查询相关聚合处理进程的处理状态,可以查询 timescaledb_information.continuous_aggregate_stats视图。
SELECT view_name, materialization_hypertable
FROM timescaledb_information.continuous_aggregates;
view_name | materialization_hypertable
---------------------------+---------------------------------------------------
conditions_summary_hourly | _timescaledb_internal._materialized_hypertable_30
conditions_summary_daily | _timescaledb_internal._materialized_hypertable_31
(2 rows)
--timescaledb.refresh_lag参数控制延迟聚合的时间,如下,conditions_summary_hourly视图的bucket_width为1小时,如果设置timescaledb.refresh_lag为1小时,那么就是bucket_width+timescaledb.refresh_lag=2,也就是雾化时间比当前时间晚了两个小时,也就是聚合的是2小时之前的数据。
ALTER VIEW conditions_summary_hourly SET (
timescaledb.refresh_lag = '1 hour'
);
--越低的refresh_lag值,表示聚合的数据和基础数据的时间更接近,但是可能会导致写放大,导致插入性能变差。一般情况下,该参数不需要修改。
--timescaledb.max_interval_per_job参数决定一个job聚合的最大量,当一个job处理的数据后,还有剩下要处理的数据时,会自己启动一个新的job进行处理。
--timescaledb.ignore_invalidation_older_than参数控制修改(插入,更新和删除)如何触发连续聚合的更新。如果对基础表进行了修改,则它将使聚合中已计算的部分无效,并且必须更新聚合。默认情况下,所有数据的改变都会触发聚合的更新,如果设置了改参数,则改时间段之前的数据更改将不会触发聚合更新。
--一个常用的实例,删除基础表中30天以外的数据,但是保留持续聚合的数据在视图中。
ALTER VIEW device_readings SET (
timescaledb.ignore_invalidation_older_than = '30 days'
);
SELECT drop_chunks(INTERVAL '30 days', 'device_readings')
修改参数以及删除视图:
--修改视图参数
ALTER VIEW device_summary SET (timescaledb.refresh_interval = '10 min');
--删除视图
DROP VIEW device_summary CASCADE;
在连续聚合启用的情况下删除数据:
从上面得知,删除基础表的数据可以使用drop_chunks函数,cascade_to_materializations参数可以控制是否在聚合的视图中保留在基础表中删除的数据,如果为true,则聚合视图中的数据也将删除,如果为false,则只删除基础表中的数据,保留聚合视图中的历史聚合数据。另外需要注意的是drop_chunks函数中的older_than参数应该长于timescaledb.ignore_invalidation_older_than,因为基础数据备删除了,无法处理删除区域的数据。
相关数保留策略可参考add_drop_chunks_policy函数。
阿里云RDS for postgresql中的timescaledb如下报错:
postgres=> CREATE VIEW conditions_summary_hourly
postgres-> WITH (timescaledb.continuous) AS
postgres-> SELECT device,
postgres-> time_bucket(INTERVAL '1 hour', time) AS bucket,
postgres-> AVG(temperature),
postgres-> MAX(temperature),
postgres-> MIN(temperature)
postgres-> FROM conditions
postgres-> GROUP BY device, bucket;
ERROR: functionality not supported under the current license "ApacheOnly", license
HINT: Upgrade to a Timescale-licensed binary to access this free community feature
许可说明如下:
https://www.timescale.com/legal/licenses
参考:
https://docs.timescale.com/latest/using-timescaledb/continuous-aggregates