RDD风格系统
很多RRD风格系统将时间序列命名为webserver01.sys.cpu.0.user
,表示webserver01
上 cpu 0
在用户空间中的时间量。
如果该服务器具有64个核心,而想要知道该服务器所有cpu的时间,则使用通配符webserver01.sys.cpu.*.user
,该通配符将读取所有 64 个文件并汇总结果。
如果有1000个 Web 服务器,每个服务器有64核,并且想要知道所有服务器的 CPU 时间,则使用通配符*.sys.cpu.*.user
,该通配符将读取所有 64000 个文件并汇总结果。或者可以设置一个过程来预聚合数据并将其写入webservers.sys.cpu.user.all
。
OpenTSDB
OpenTSDB 是通过引入“标签”的概念来处理有一点不同的事情,每个时间序列依然有一个较为通用的"metric",可以被很多唯一的时间序列共享。时间序列的唯一性是通过tagk/tagv(标签)的组合来确定的,该组合允许使用非常快速的聚合进行灵活的查询。
OpenTSDB 中的每个时间序列都必须至少具有一个标签。
对应RDD风格的设计,在OpenTSDB中实现如下:
sum:sys.cpu.user{host=webserver01, cpu=0}
sum:sys.cpu.user{host=webserver01}
sum:sys.cpu.user
如果我们需要查询webserver01
的所有cpu时间,则可以使用sum:sys.cpu.user{host=webserver01}
,为了能够快速的获取汇总数据,可以开启OpenTSDB的预聚合功能,并编写一个新的时间序列sys.cpu.user host=webserver01
即可。
OpenTSDB存储内容如下:
sys.cpu.user host=webserver01 1356998400 50
sys.cpu.user host=webserver01,cpu=0 1356998400 1
sys.cpu.user host=webserver01,cpu=1 1356998400 0
sys.cpu.user host=webserver01,cpu=2 1356998400 2
sys.cpu.user host=webserver01,cpu=3 1356998400 0
...
sys.cpu.user host=webserver01,cpu=63 1356998400 1
当预聚合完成后,我们再使用sum:sys.cpu.user{host=webserver01}
去查询webserver01
的所有cpu时间,得到的结果将不是50,而会是100,因此请谨慎使用命名架构
默认的UID为3个字节,可以通过修改配置来更改长度
tsd.storage.uid.width.metric
tsd.storage.uid.width.tagk
tsd.storage.uid.width.tagv
时间序列基数会极大地影响查询速度,如果我们有一台单个内核的主机,该主机发出一个时间序列sys.cpu.user host=webserver01,cpu=0
,并且每秒写入一次数据,持续 1 天,则将导致 24 行数据或 86,400 个数据点。但是,如果这个主机有8个 CPU 内核,那么将有 192 行和 691,200 个数据点。通过发出类似start=1d-ago&m=avg:sys.cpu.user{host=webserver01}
的查询,我们可以轻松获得所有内核的 CPU 使用率的总和或平均值。该查询将遍历所有 192 行,并将数据聚合为单个时间序列。
如果我们有 20,000 个主机,每个主机有 8 个核心,由于主机值的高基数,现在我们每天将有 384 万行和 17.28 亿个数据点,对主机webserver01
的平均核心使用情况的查询将较慢,因为它必须从 384 万行中选择 192 行。
这种模式的好处是数据具有非常深的粒度,例如,获取所有主机的所有内核的平均使用率:start=1d-ago&m=avg:sys.cpu.user
。但是,由于要筛选的行更多,因此针对该特定 Metrics 的查询将花费更长的时间。这在所有数据库中都很常见,而不仅仅是 OpenTSDB 的问题。
可以通过以下方法来处理基数高问题:
Pre-Aggregate:针对上面的sys.cpu.user
的示例,如果只是关心每个主机的使用情况,而不是每个核心的使用情况,收集器可以发送一个额外的数据点,例如sys.cpu.user.avg host=webserver01
。这样就有了一个完全独立的时间序列,每天只有 24 行,如果有 20,000个主机,只需要从48 万行中筛选。
Shift to Metric:如果只关心特定主机的Metrics ,不需要再主机之间进行汇总,可以将之前的示例变成sys.cpu.user.websvr01 cpu=0
,针对该架构的查询非常快,因为该 Metrics 每天只有 192 行,但是,要跨主机聚合,您将必须执行多个查询并在 OpenTSDB 外部聚合。(OpenTSDB将来将包括此功能)。
my.metric host=foo
和my.metric datacenter=lga
。每个时间序列数据点必须包含以下数据:
sys.cpu.user
、stock.quote
或env.probe.temp
。tagk
(键)和tagv
(值)组成的键/值对。每个数据点必须至少具有一个标签。可以以秒或毫秒的分辨率将数据写入 OpenTSDB。时间戳记必须是整数,并且不得超过 13 位数字。毫秒时间戳的格式必须为1364410924250
,其中最后三位数字表示毫秒。产生超过 13 位数字(即大于毫秒分辨率)的时间戳必须在提交前四舍五入到最多 13 位数字,否则会产生错误。
秒分辨率的时间戳存储在 2 个字节中,而毫秒分辨率的存储在 4 个字节上。如果不需要毫秒级别的分辨率,建议提交10位数字的秒分辨率时间戳可以节省存储空间,避免秒和毫秒时间戳混合使用,否则回降低查询速度。
Metrics 和 Tags命名规范:
a
到z
,A
到Z
,0
到9
,-
,_
,.
,/
或 Unicode 字母,metric和tags的长度没有限制,但尽量尝试使值保持较短。解析来自put
命令的值时不带小数点(.
),则将其视为有符号整数。整数以无符号可变长度编码存储,数据点可能只占用 1 个字节的空间或最多 8 个字节的空间。数据点的最小值为-9,223,372,036,854,775,808,最大值为 9,223,372,036,854,775,807(含)。
解析来自put
命令的值时带小数点(.
),则将其视为浮点值。当前,所有浮点值均以 4 字节单精度存储,并在 2.4 及更高版本中支持 8 字节双精度,浮点数以 IEEE 754 浮点“单一格式”存储,支持正负值。
在 OpenTSDB 中写入数据点通常在原始写入后的一个小时内是幂等的。。这意味着您可以在时间戳1356998400
处写入值42
,然后在同一时间再次写入42
,不会发生任何不良情况。
但是,如果启用压缩以减少存储消耗并在压缩该行数据之后写入相同的数据点,则在该行上查询时可能会返回异常。
如果您尝试使用相同的时间戳写入两个不同的值,则在查询期间可能会引发重复的数据点异常。这是由于对 1、2、4 或 8 个字节的整数和浮点数进行编码的差异。如果第一个值是整数,第二个值是浮点数,则始终会引发重复错误。但是,如果两个值都是浮点数,或者都是以相同长度编码的整数,那么如果未在行上进行压缩,则原始值可能会被覆盖。
在OpenTSDB 2.1,可以通过设置tsd.storage.fix_duplicates=true
来启用最后写入胜出。启用此标志后,在查询时,将返回记录的最新值,而不是引发异常。还将向日志文件中写入警告,指出已找到重复项。如果还启用了压缩,则原始压缩值将被最新值覆盖。
OpenTSDB支持三种方式写入数据:
如果
tsd.mode
设置为ro
而不是rw
,则 TSD 将不会通过 RPC 调用接受数据点。 Telnet API调用将引发异常, HTTP API调用将返回 404 错误,但是仍然可以通过 JAVA API 进行写入。
使用telnetClient 连接到TSDB并使用以下命令发出put
请求
put <metric> <timestamp> <value> <tagk1=tagv1[ tagk2=tagv2 ...tagkN=tagvN]>
例:
put sys.cpu.user 1356998400 42.5 host=webserver01 cpu=0
http://host:4242/q?start=1h-ago&m=sum:sys.cpu.system{host=webserver01,cpu=0}
OpenTSDB可以扩展到在带有常规旋转硬盘驱动器的商品服务器上每秒写入数百万个数据点。然而,那些在虚拟机上以独立模式下运行的HBase,试图在一个全新的TSD上猛击数百万个数据点的用户会失望,因为他们每秒只能写入数百个点的数据。下面是您需要做什么来扩展全新的安装或测试以及扩展现有系统。
在写入数据点之前,必须为metric、tags的每个字符串分配一个UID。分配需要花费大量的时间,因为它必须先获取一个可用的UID,写入一个UID——名称的映射和一个名称——UID的映射,然后使用UID写入数据点的rowKey。UID将被存储到TSD的缓存中,以便下一次使用相同的metric时能够快速获取。
因此,建议可能多的为metric、tagk、tagv预分配UID,可以使用Http API等方式执行预分配。
如果重新启动 TSD,它将会为每个metric和tags查找 UID,因此在填充高速缓存之前,性能会有些慢。
一个 TSD 每秒可以处理数千次写入。但是,如果您有许多来源,则最好通过运行多个 TSD 并使用负载均衡器(例如 Varnish 或 DNS 循环)来扩展写入,以进行扩展。当群集专用于 OpenTSDB 时,许多用户在其 HBase 区域服务器上放置 TSD。
在 TSD 中启用保持活动状态,并确保用于发送时间序列数据的任何应用程序保持其连接打开,而不是每次写入都打开和关闭。
OpenTSDB 2.0 引入了元数据,用于跟踪系统中的数据种类。启用跟踪后,每个写入的数据点的计数器都会递增,新的 UID 或时间序列将生成元数据。数据可以推送到搜索引擎或通过树生成代码传递。这些过程在 TSD 中需要更大的内存,并可能影响吞吐量。跟踪默认情况下处于禁用状态,因此请在启用此功能之前对其进行测试。
2 .0 还引入了一个实时发布插件,在该插件中,传入的数据点在排队存储后可以立即发送到另一个目标。默认情况下禁用此功能,因此在生产环境中部署之前,请测试您感兴趣的所有插件。
查询数据时,OpenTSDB 支持多种日期和时间格式,每个查询都需要一个“开始时间”和一个可选的“结束时间”,如果未指定结束时间,则将使用运行 TSD 的系统上的当前时间。
相对时间的格式为<amount><time unit>-ago
,其中<amount>
是时间单位数,<time unit>
是时间单位。可能的时间单位包括:
相对时间表示的是从当前时间计算过去的秒数。
OpenTSDB内部,所有数据都与 Unix(或 POSIX)样式的时间戳关联。 Unix 时间定义为自 1970 年 1 月 1 日 UTC 时间 00:00:00 以来经过的秒数,时间戳表示为正整数,例如1364410924
。使用 Unix 时间戳的查询还可以通过简单地附加三个数字来支持毫秒精度,如使用1364410924250
或1364410924.250
。10个字符或更少字符表示秒,13(或14)个字符代表毫秒,多余的必须截断或舍入。
OpenTSDB 支持人类可读的绝对日期和时间,支持的格式如下:
将Absolute Formatted Time转换为Absolute Unix Time时,OpenTSDB将使用运行 TSD 的系统配置的时区转换为 UTC。
使用Http API查询时可以通过指定如tz=Asia/Shanghai
参数将上海本地时间转换为UTC。
任何数据库系统的一项关键功能是使用某种形式的过滤来获取完整数据集的子集。目前,OpenTSDB的过滤器是在标签值上运行。在最初的 OpenTSDB 版本和 2.1 以下版本中,只有两种类型的过滤器可用,并且已隐式配置它们以进行分组。允许的两个运算符是:
例如:有一个host
标签,它的值有A
、B
、C
,则支持以下几种方式过滤:
host=A
:仅检索主机名为A的一条时间序列。host=A|B
:检索主机为A和B的二条时间序列,按照A
和B
分组。host=*
:检索每个主机的时间序列,按照A
、B
、C
分组。与其它关系型数据库的group by
概率一致,使用所需的聚合函数和过滤器将多个时间序列组合成一个时间序列的过程。默认情况下,OpenTSDB按metric
对所有内容进行分组。一般与Aggregators一起使用。
如果希望获取每个基础时间序列没有任何聚合,可以使用OpenTSDB 2.2及以上版本中的none
聚合器。
OpenTSDB2.2支持禁用分组并添加了更灵活的过滤器,例如正则表达式regexp。可以使用HTTP API /api/config/filters
查询已加载的过滤器插件及使用说明。
允许在同一标签键上使用多个过滤器,并在处理时使用&
运算符,例如如果有二个过滤器host=literal_or(web01)
和host=literal_or(web02)
,则查询将始终返回空。
如果一个标签键包含两个或多个过滤器,并且一个过滤器启用了group by
,而另一个未启用,则对于该标签键上的所有过滤器,group by
将有效。
某些类型的过滤器可能会导致查询的执行速度比其他过滤器慢,特别是regexp、通配符和不区分大小写的过滤器。在从存储中获取数据之前,将处理筛选器以创建基于uid的数据库筛选器,因此使用区分大小写的literal_or筛选器总是比regexp快,因为我们可以将字符串解析为uid并将其发送到存储系统进行筛选。相反,如果您要求使用带有pre、post或infix过滤的regex或通配符,则TSD必须使用标记键UID从存储器中检索所有行,然后对于每个唯一的行,将UID解析回字符串,然后对结果运行过滤器。此外,具有大量文本列表的过滤器集将在存储后进行处理,以避免为备份存储创建大量过滤器进行处理。此限制默认为
4096
,可以通过tsd.query.filter.expansion_limit
参数进行配置。
接收单个literal
值或|
分割的值列表,以区分大小写的方式匹配指定的值并返回时间序列。这是一个非常有效的过滤器,因为它可以将字符串解析为 UID,并将其发送到存储层进行预过滤。在 SQL 中,这类似于IN
或=
谓词。
例:
host=literal_or(web01|web02|web03)
,在 SQL 中:WHERE host IN ('web01', 'web02', 'web03')
host=literal_or(web01)
,在 SQL 中:WHERE host = 'web01'
区分大小写的literal_or
,它将返回与给定值列表不匹配的序列。高效,因为它可以通过存储进行预处理。
与literal_or
相同,但不区分大小写。请注意,这与literal
的效率不同,因为它必须对存储中的所有行进行后期处理。
不区分大小写的not_literal_or
,低效。
提供区分大小写的后缀,前缀,中缀和多中缀过滤。通配符是星号*
。可以使用多个通配符。如果仅给出星号,则过滤器有效地返回包含标签键的任何时间序列(并且是可以进行预处理的高效过滤器)。在 SQL 领域中,这类似于LIKE
谓词,具有更大的灵活性。
例:
host=wildcard(*mysite.com)
,在 SQL 中:WHERE host='%mysite.com'
host=wildcard(web*)
host=wildcard(web*mysite.com)
host=wildcard(web*mysite*)
host=wildcard(*)
与wildcard
相同,但不区分大小写。
使用POSIX兼容的正则表达式从存储中提取后进行筛选。该过滤器使用 Java 的内置正则表达式操作。根据所使用的查询方法,请小心转义特殊字符。
例:
regexp(web.*)
,在 SQL 中:WHERE host REGEXP 'web.*'
regexp(web[0-9].mysite.com)
从 2.3 及更高版本开始,如果知道给定的metric
的所有tag key
,则可以使用explicitTags
功能极大地改善查询延迟,原因是:
metric
,后端可以切换到更有效的查询,获取较小的数据子集。tags
变化的metric
,将会被过滤。explicitTags
将构建底层存储查询,该查询仅会获取给定tags
的那些行(精确匹配),这样可以使数据库跳过不相关的行并在更少的时间内回答。
explicitTags
主要解决了metric
的标签键不一致的问题。
例:
TS# | Metric | Tags | Value @ T1 |
---|---|---|---|
1 | sys.cpu.system | dc=dal host=web01 | 3 |
2 | sys.cpu.system | dc=dal host=web02 | 2 |
3 | sys.cpu.system | dc=dal host=web03 | 10 |
4 | sys.cpu.system | host=web01 | 1 |
5 | sys.cpu.system | host=web01 owner=jdoe | 4 |
6 | sys.cpu.system | dc=lax host=web01 | 8 |
7 | sys.cpu.system | dc=lax host=web02 | 4 |
案例1:http://host:4399/api/query?start=1h-ago&m=sum:explicit_tags:sys.cpu.system{host=web01}
Time Series Included | Tags | Aggregated Tags | Value @ T1 |
---|---|---|---|
4 | host=web01 | 1 |
案例2:http://host:4399/api/query?start=1h-ago&m=sum:explicit_tags:sys.cpu.system{host=*}{dc=*}
Time Series Included | Tags | Aggregated Tags | Value @ T1 |
---|---|---|---|
1, 6 | host=web01 | dc | 11 |
2, 7 | host=web02 | dc | 6 |
3 | host=web03,dc=dal | 10 |
当开发人员添加插件时,我们将在此处列出它们。
要开发插件,只需扩展net.opentsdb.query.filter.TagVFilter
类,根据Plugins文档创建 JAR 并将其放在您的 plugins 目录中。在启动时,TSD 将搜索并加载该插件。如果实现存在错误,则 TSD 将不会启动,并将记录异常。
过滤器用于按标签对结果进行分组,然后将汇总应用于每个组。OpenTSDB的聚合类似于SQL中的 GROUP BY
子句,用户可以选择一个预定义的聚合函数将多个记录合并为单个结果。但是,在 TSD 中,每个时间戳和组会汇总一组记录。
每个聚合器都有两个组件:
多个时间序列进行聚合时会出现序列未对齐及数据丢失等情况。
series | t0 | t0+10s | t0+20s | t0+30s | t0+40s | t0+50s | t0+60s |
---|---|---|---|---|---|---|---|
A | na | 5 | na | 15 | na | 5 | na |
B | 10 | na | 20 | na | 10 | na | 20 |
Interpolated A | 10 | 10 | |||||
Interpolated B | 15 | 15 | 15 | ||||
Summed Result | 10 | 20 | 30 | 25 | 20 | 20 | 20 |
可采用Interpolation(插值),默认为线性插值。
可以使用HTTP API查询/api/aggregators
以获得在 TSD 上实现的聚合器列表。
Aggregator | TSD Version | Description | Interpolation |
---|---|---|---|
avg | 1.0 | 平均数据点 | Linear Interpolation |
count | 2.2 | 集合中原始数据点的数量 | 如果丢失,则为零 |
dev | 1.0 | 计算标准偏差 | Linear Interpolation |
sum | 1.0 | 将数据点加在一起 | Linear Interpolation |
zimsum | 2.0 | 将数据点加在一起 | 如果丢失,则为零 |
mimmin | 2.0 | 选择最小的数据点 | 如果丢失则为Long.MaxValue(整数点)或 Double.MaxValue(浮点值) |
mimmax | 2.0 | 选择最大的数据点 | 如果丢失则为 Long.MinValue(整数点)或 Double.MinValue(浮点值) |
min | 1.0 | 选择最小的数据点 | Linear Interpolation |
max | 1.0 | 选择最大的数据点 | Linear Interpolation |
none | 2.3 | 跳过所有时间序列的分组。 | 如果丢失,则为零 |
first | 2.3 | 返回集合中的第一个数据点。仅适用于降采样,不适用于汇总。 | Indeterminate |
last | 2.3 | 返回集合中的最后一个数据点。仅适用于降采样,不适用于汇总。 | Indeterminate |
ep50r3 | 2.2 | 使用 R-3 方法计算估计的 50%* | Linear Interpolation |
ep50r7 | 2.2 | 用 R-7 方法计算估计的 50%* | Linear Interpolation |
ep75r3 | 2.2 | 用 R-3 方法计算估计的第 75 个百分位数* | Linear Interpolation |
ep75r7 | 2.2 | 用 R-7 方法计算估计的第 75 个百分位数* | Linear Interpolation |
ep90r3 | 2.2 | 使用 R-3 方法计算估计的 90%* | Linear Interpolation |
ep90r7 | 2.2 | 使用 R-7 方法计算估计的 90%* | Linear Interpolation |
ep95r3 | 2.2 | 使用 R-3 方法计算估计的 95%* | Linear Interpolation |
ep95r7 | 2.2 | 使用 R-7 方法计算估计的 95%* | Linear Interpolation |
ep99r3 | 2.2 | 用 R-3 方法计算估计的 99%* | Linear Interpolation |
ep99r7 | 2.2 | 用 R-7 方法计算估计的 99%* | Linear Interpolation |
ep999r3 | 2.2 | 使用 R-3 方法计算估计的第 999 个百分点* | Linear Interpolation |
ep999r7 | 2.2 | 使用 R-7 方法计算估计的第 999 个百分点* | Linear Interpolation |
p50 | 2.2 | 计算第 50 个百分位数 | Linear Interpolation |
p75 | 2.2 | 计算第 75 个百分位数 | Linear Interpolation |
p90 | 2.2 | 计算第 90 个百分位数 | Linear Interpolation |
p95 | 2.2 | 计算第 95 个百分位数 | Linear Interpolation |
p99 | 2.2 | 计算第 99 个百分位数 | Linear Interpolation |
p999 | 2.2 | 计算第 999 个百分位数 | Linear Interpolation |
采样器至少需要两个组件:
<Size><Units>
格式指定,例如1h
表示 1 小时或30m
表示 30 分钟。从 2.3 *开始,现在可以使用all间隔将时间范围内的所有结果下采样为一个值。例如: 0all-sum
将对从查询开始到结束的所有值求和。请注意,仍然需要一个数字值,但它可以是零或任何其他值。指定30s-sum
的降采样器,它将创建 30 秒的存储桶,并对每个存储桶中的所有数据点求和,示例如下:
Time Series | t0 | t0+10s | t0+20s | t0+30s | t0+40s | t0+50s | t0+60 |
---|---|---|---|---|---|---|---|
A | 5 | 5 | 10 | 15 | 20 | 5 | 1 |
A sum 降采样 | 5 + 5 + 10 = 20 | 15 + 20 + 5 = 40 | 1 = 1 | ||||
B | 10 | 5 | 20 | 15 | 10 | 0 | 5 |
B sum 降采样 | 10 + 5 + 20 = 35 | 15 + 10 + 0 = 25 | 5 = 5 | ||||
sum 汇总结果 | 55 | 65 | 6 |
降采样通常用于对齐时间戳,以避免在进行分组时插值。使用降采样执行分组聚合时,如果所有序列在预期的间隔内都缺少值,则不会发出任何消息,例如:如果某个序列每分钟从t0
到t0+6m
写入数据,但是由于某种原因,源无法在t0+3m
写入数据,则当用户期望序列为 6 个值时,但仅只有5个值,对于 2.2 及更高版本,支持填充策略,t0+3m
可以选择以什么样的预定义值来填充。
每当bucket(采样桶)为空时,可以选择以下填充策略:
以下示例,每10s一条数据,使用NaN填充策略填充缺失值:
Time Series | t0 | t0+10s | t0+20s | t0+30s | t0+40s | t0+50s | t0+60s |
---|---|---|---|---|---|---|---|
A | 15 | 5 | |||||
B | 10 | 20 | 20 | ||||
A sum 降采样 | NaN | NaN | NaN | 15 | NaN | 5 | NaN |
B sum 下采样 | 10 | NaN | 20 | NaN | NaN | NaN | 20 |
sum 汇总结果 | 10 | NaN | 20 | 15 | NaN | 5 | 20 |
如果我们请求的输出没有填充策略,则不会返回t0+10s
或t0+40s
的值或时间戳。另外,将对序列B的t0+30s
和t0+50s
处的值进行线性插值,以填充要与系列A相加的值。
metric
的标签数量和标签键相同。explicitTags
查询。官方文档:http://opentsdb.net/docs/build/html/user_guide/rollups.html
汇总:在OpenTSDB中的定义为将单个时间序列某段时间内的数据做聚合(如SUM、COUNT、AVG等),有助于解决查看较长时间 Span 的问题,汇总本质上是调用降采样的计算结果并存储到系统中。
预聚合:将具有相同度量标准(metric)的不同时间序列做聚合,有助于解决度量标准的高基数(即给定度量标准的唯一时间序列数)问题,
汇总和预聚合的目的是解决较长时间的Span问题和metric的高基数问题,提高查询效率,降低服务器的负载。OpenTSDB 本身并不计算和存储汇总或预聚合的数据,不过官方提供了以下几种解决方案:
1. 在OpenTSDB的配置文件opentsdb.confg
中加入如下配置
#开启汇总
tsd.rollups.enable=true
#汇总配置文件位置
tsd.rollups.config=./rollup_config.json
2. 创建上述配置文件指定的汇总配置文件rollup_config.json
(创建目录由上面配置指定)
{
"aggregationIds": {
"sum": 0,
"count": 1,
"min": 2,
"max": 3,
"avg": 4
},
"intervals": [
{
"table": "tsdb", //原始数据表
"preAggregationTable": "tsdb", //预聚合汇总数据表
"interval": "1s", //忽略
"rowSpan": "1h", //忽略
"defaultInterval": true
},
{
"table": "tsdb-rollup-5m", //5分钟汇总表
"preAggregationTable": "tsdb-rollup-5m", //5分钟预聚合汇总表
"interval": "5m", //5分钟汇总一次
"rowSpan": "1h", //行宽1h
"defaultInterval": false
},
{
"table": "tsdb-rollup-1h", //1小时汇总表
"preAggregationTable": "tsdb-rollup-1h", //1小时预聚合汇总表
"interval": "1h", //1小时汇总一次
"rowSpan": "1d", //行宽1d
"defaultInterval": false
}
]
}
说明:
defaultInterval
和Rollup Interval
,"defaultInterval": true
定义的是默认的原始数据表,即tsdb
(或tsd.storage.hbase.data_table
配置),interval
(默认1s)和rowSpan
(默认1小时)将被忽略;Rollup Interval
指defaultInterval":false
或未设置的汇总表。字段说明
Name | Data Type | Required | Description | Example |
---|---|---|---|---|
table | String | Required | 非预聚合数据的基础表或汇总表。对于默认表,此表应为tsdb 或已写入原始数据的表。对于汇总数据,它必须是与原始数据是不同的表。 | tsdb-rollup-1h |
preAggregationTable | String | Required | 预聚合 和汇总(可选)数据的表应写入其中。该表可能与table 值相同。 | tsdb-rollup-preagg-1h |
interval | String | Required | 数据点之间的预期间隔为<interval><units> 格式。例如。如果汇总每小时计算一次,则间隔应为1h 。如果每 10 分钟计算一次,请将其设置为10m 。对于默认表,此值将被忽略。 | 1h |
rowSpan | String | Required | 存储中每一行的宽度。该值必须大于interval ,并定义每行中可容纳的interval 的数量,rowSpan 定义的 interval 数量应该在14bits内(2^13)。例如。如果间隔为1h 且rowSpan 为1d ,则每行有 24 个值。 | 1d |
defaultInterval | Boolean | Optional | 原始的非汇总数据的配置间隔是否为默认间隔。 | true |
当使用query查询时,OpenTSDB会根据聚合函数和时间间隔进行路由,如果时间间隔是interval
的倍数,且具有相关的汇总函数数据,会路由到汇总表查询数据。
3. 创建HBase汇总表
[root@opentsdb63 hbase-2.3.1]# bin/hbase shell
hbase(main):002:0> create 'tsdb-rollup-5m','t'
hbase(main):002:0> create 'tsdb-rollup-1h','t'
4. 汇总测试:
1)使用api/rollup
接口添加汇总数据
curl -XPOST http://localhost:4399/api/rollup?details -d '
[
{
"metric": "client.current_conns",
"timestamp": 1546344000,
"value": 133,
"tags": {
"host": "host2",
"datacenter": "dc1",
"port": "80"
},
"interval": "1h",
"aggregator": "SUM"
},
{
"metric": "client.current_conns",
"timestamp": 1546344000,
"value": 162,
"tags": {
"host": "host1",
"datacenter": "dc1",
"port": "80"
},
"interval": "1h",
"aggregator": "SUM"
},
{
"metric": "client.current_conns",
"timestamp": 1546344000,
"value": 4,
"tags": {
"host": "host2",
"datacenter": "dc1",
"port": "80"
},
"interval": "1h",
"aggregator": "COUNT"
},
{
"metric": "client.current_conns",
"timestamp": 1546344000,
"value": 4,
"tags": {
"host": "host1",
"datacenter": "dc1",
"port": "80"
},
"interval": "1h",
"aggregator": "COUNT"
}
]'
2) 使用/api/query
接口查询汇总数据
curl localhost:4399/api/query -d '
{
"start":1546344000,
"end":1546347600,
"msResolution":false,
"queries":[{
"aggregator":"avg",
"metric":"client.current_conns",
"rate":false,
"downsample":"1h-avg",
"rollupUsage": "ROLLUP_NOFALLBACK"
}]
}'
downsample
聚合函数使用avg
时aggregator
必须使用avg
,否则会引发以下异常:
{
"error": {
"code": 400,
"message": "Attempt to add a different aggrregate cell =KeyValue(key=[0, 0, 0, 0, 0, 31, 55, 92, 42, -83, -128, 0, 0, 0, 0, 0, 0, 13, 0, 0, 0, 0, 0, 21, -109, 0, 0, 0, 0, 0, 0, 14, 0, 0, 0, 0, 0, 21, -110, 0, 0, 0, 0, 0, 0, 15, 0, 0, 0, 0, 0, 21, -108], family=\"t\", qualifier=[0, 0, -63], value=[0, -123], timestamp=1617893045321), expected aggregator \"raw:\"",
"details": "Attempt to add a different aggrregate cell =KeyValue(key=[0, 0, 0, 0, 0, 31, 55, 92, 42, -83, -128, 0, 0, 0, 0, 0, 0, 13, 0, 0, 0, 0, 0, 21, -109, 0, 0, 0, 0, 0, 0, 14, 0, 0, 0, 0, 0, 21, -110, 0, 0, 0, 0, 0, 0, 15, 0, 0, 0, 0, 0, 21, -108], family=\"t\", qualifier=[0, 0, -63], value=[0, -123], timestamp=1617893045321), expected aggregator \"raw:\"",
"trace": "net.opentsdb.tsd.BadRequestException: Attempt to add a different aggrregate cell =KeyValue(key=[0, 0, 0, 0, 0, 31, 55, 92, 42, -83, -128, 0, 0, 0, 0, 0, 0, 13, 0, 0, 0, 0, 0, 21, -109, 0, 0, 0, 0, 0, 0, 14, 0, 0, 0, 0, 0, 21, -110, 0, 0, 0, 0, 0, 0, 15, 0, 0, 0, 0, 0, 21, -108], family=\"t\", qualifier=[0, 0, -63], value=[0, -123], timestamp=1617893045321), expected aggregator \"raw:\"\n\tat net.opentsdb.tsd.QueryRpc$1ErrorCB.call(QueryRpc.java:220) [tsdb-2.4.0.jar:]\n\tat net.opentsdb.tsd.QueryRpc$1ErrorCB.call(QueryRpc.java:180) [tsdb-2.4.0.jar:]\n\tat com.stumbleupon.async.Deferred.doCall(Deferred.java:1278) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.runCallbacks(Deferred.java:1257) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.access$300(Deferred.java:430) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred$Continue.call(Deferred.java:1366) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.doCall(Deferred.java:1278) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.runCallbacks(Deferred.java:1257) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.callback(Deferred.java:1005) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.DeferredGroup.done(DeferredGroup.java:169) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.DeferredGroup.recordCompletion(DeferredGroup.java:158) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.DeferredGroup.access$200(DeferredGroup.java:36) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.DeferredGroup$1NotifyOrdered.call(DeferredGroup.java:97) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.doCall(Deferred.java:1278) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.runCallbacks(Deferred.java:1257) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.callback(Deferred.java:1005) [async-1.4.0.jar:na]\n\tat net.opentsdb.core.SaltScanner.validateAndTriggerCallback(SaltScanner.java:949) [tsdb-2.4.0.jar:]\n\tat net.opentsdb.core.SaltScanner.access$2300(SaltScanner.java:68) [tsdb-2.4.0.jar:]\n\tat net.opentsdb.core.SaltScanner$ScannerCB.close(SaltScanner.java:910) [tsdb-2.4.0.jar:]\n\tat net.opentsdb.core.SaltScanner$ScannerCB.call(SaltScanner.java:539) [tsdb-2.4.0.jar:]\n\tat net.opentsdb.core.SaltScanner$ScannerCB.call(SaltScanner.java:461) [tsdb-2.4.0.jar:]\n\tat com.stumbleupon.async.Deferred.doCall(Deferred.java:1278) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.addCallbacks(Deferred.java:688) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.addCallback(Deferred.java:724) [async-1.4.0.jar:na]\n\tat net.opentsdb.core.SaltScanner$ScannerCB.scan(SaltScanner.java:525) [tsdb-2.4.0.jar:]\n\tat net.opentsdb.core.SaltScanner$ScannerCB.call(SaltScanner.java:716) [tsdb-2.4.0.jar:]\n\tat net.opentsdb.core.SaltScanner$ScannerCB.call(SaltScanner.java:461) [tsdb-2.4.0.jar:]\n\tat com.stumbleupon.async.Deferred.doCall(Deferred.java:1278) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.runCallbacks(Deferred.java:1257) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.handleContinuation(Deferred.java:1313) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.doCall(Deferred.java:1284) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.runCallbacks(Deferred.java:1257) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.access$300(Deferred.java:430) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred$Continue.call(Deferred.java:1366) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.doCall(Deferred.java:1278) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.runCallbacks(Deferred.java:1257) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.access$300(Deferred.java:430) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred$Continue.call(Deferred.java:1366) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.doCall(Deferred.java:1278) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.runCallbacks(Deferred.java:1257) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.callback(Deferred.java:1005) [async-1.4.0.jar:na]\n\tat org.hbase.async.HBaseRpc.callback(HBaseRpc.java:720) [asynchbase-1.8.2.jar:na]\n\tat org.hbase.async.RegionClient.decode(RegionClient.java:1575) [asynchbase-1.8.2.jar:na]\n\tat org.hbase.async.RegionClient.decode(RegionClient.java:88) [asynchbase-1.8.2.jar:na]\n\tat org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:500) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:435) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) [netty-3.10.6.Final.jar:na]\n\tat org.hbase.async.RegionClient.handleUpstream(RegionClient.java:1230) [asynchbase-1.8.2.jar:na]\n\tat org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.channel.SimpleChannelHandler.messageReceived(SimpleChannelHandler.java:142) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:88) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.handler.timeout.IdleStateAwareChannelHandler.handleUpstream(IdleStateAwareChannelHandler.java:36) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.handler.timeout.IdleStateHandler.messageReceived(IdleStateHandler.java:294) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) [netty-3.10.6.Final.jar:na]\n\tat org.hbase.async.HBaseClient$RegionClientPipeline.sendUpstream(HBaseClient.java:3857) [asynchbase-1.8.2.jar:na]\n\tat org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) [netty-3.10.6.Final.jar:na]\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_211]\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_211]\n\tat java.lang.Thread.run(Thread.java:748) [na:1.8.0_211]\nCaused by: net.opentsdb.core.IllegalDataException: Attempt to add a different aggrregate cell =KeyValue(key=[0, 0, 0, 0, 0, 31, 55, 92, 42, -83, -128, 0, 0, 0, 0, 0, 0, 13, 0, 0, 0, 0, 0, 21, -109, 0, 0, 0, 0, 0, 0, 14, 0, 0, 0, 0, 0, 21, -110, 0, 0, 0, 0, 0, 0, 15, 0, 0, 0, 0, 0, 21, -108], family=\"t\", qualifier=[0, 0, -63], value=[0, -123], timestamp=1617893045321), expected aggregator \"raw:\"\n\tat net.opentsdb.rollup.RollupSeq.setRow(RollupSeq.java:159) ~[tsdb-2.4.0.jar:]\n\tat net.opentsdb.rollup.RollupSpan.addRow(RollupSpan.java:70) ~[tsdb-2.4.0.jar:]\n\tat net.opentsdb.core.SaltScanner.mergeDataPoints(SaltScanner.java:429) [tsdb-2.4.0.jar:]\n\tat net.opentsdb.core.SaltScanner.mergeAndReturnResults(SaltScanner.java:307) [tsdb-2.4.0.jar:]\n\tat net.opentsdb.core.SaltScanner.validateAndTriggerCallback(SaltScanner.java:947) [tsdb-2.4.0.jar:]\n\t... 55 common frames omitted\n"
}
}
官方issue:https://github.com/OpenTSDB/opentsdb/issues/1501
#开启预聚合
tsd.rollups.tag_raw=true
#预聚合数据的标记tag key
tsd.rollups.agg_tag_key=_aggregate
#用于标记是原始数据的tag value
tsd.rollups.raw_agg_tag_value=RAW