当前位置: 首页 > 工具软件 > OpenTSDB > 使用案例 >

OpenTSDB

班泽语
2023-12-01

Writing Data

Nameing Schema

RDD风格系统

很多RRD风格系统将时间序列命名为webserver01.sys.cpu.0.user,表示webserver01上 cpu 0在用户空间中的时间量。

如果该服务器具有64个核心,而想要知道该服务器所有cpu的时间,则使用通配符webserver01.sys.cpu.*.user,该通配符将读取所有 64 个文件并汇总结果。

如果有1000个 Web 服务器,每个服务器有64核,并且想要知道所有服务器的 CPU 时间,则使用通配符*.sys.cpu.*.user,该通配符将读取所有 64000 个文件并汇总结果。或者可以设置一个过程来预聚合数据并将其写入webservers.sys.cpu.user.all

OpenTSDB

OpenTSDB 是通过引入“标签”的概念来处理有一点不同的事情,每个时间序列依然有一个较为通用的"metric",可以被很多唯一的时间序列共享。时间序列的唯一性是通过tagk/tagv(标签)的组合来确定的,该组合允许使用非常快速的聚合进行灵活的查询。

OpenTSDB 中的每个时间序列都必须至少具有一个标签。

对应RDD风格的设计,在OpenTSDB中实现如下:

  • sum:sys.cpu.user{host=webserver01, cpu=0}
  • sum:sys.cpu.user{host=webserver01}
  • sum:sys.cpu.user

Aggregations

如果我们需要查询webserver01的所有cpu时间,则可以使用sum:sys.cpu.user{host=webserver01},为了能够快速的获取汇总数据,可以开启OpenTSDB的预聚合功能,并编写一个新的时间序列sys.cpu.user host=webserver01即可。

OpenTSDB存储内容如下:

sys.cpu.user host=webserver01        1356998400  50
sys.cpu.user host=webserver01,cpu=0  1356998400  1
sys.cpu.user host=webserver01,cpu=1  1356998400  0
sys.cpu.user host=webserver01,cpu=2  1356998400  2
sys.cpu.user host=webserver01,cpu=3  1356998400  0
...
sys.cpu.user host=webserver01,cpu=63 1356998400  1

当预聚合完成后,我们再使用sum:sys.cpu.user{host=webserver01}去查询webserver01的所有cpu时间,得到的结果将不是50,而会是100,因此请谨慎使用命名架构

Time Series Cardinality

Limited Unique IDs (UIDs)

默认的UID为3个字节,可以通过修改配置来更改长度

  • tsd.storage.uid.width.metric
  • tsd.storage.uid.width.tagk
  • tsd.storage.uid.width.tagv

Query Speed

时间序列基数会极大地影响查询速度,如果我们有一台单个内核的主机,该主机发出一个时间序列sys.cpu.user host=webserver01,cpu=0,并且每秒写入一次数据,持续 1 天,则将导致 24 行数据或 86,400 个数据点。但是,如果这个主机有8个 CPU 内核,那么将有 192 行和 691,200 个数据点。通过发出类似start=1d-ago&m=avg:sys.cpu.user{host=webserver01}的查询,我们可以轻松获得所有内核的 CPU 使用率的总和或平均值。该查询将遍历所有 192 行,并将数据聚合为单个时间序列。

如果我们有 20,000 个主机,每个主机有 8 个核心,由于主机值的高基数,现在我们每天将有 384 万行和 17.28 亿个数据点,对主机webserver01的平均核心使用情况的查询将较慢,因为它必须从 384 万行中选择 192 行。

这种模式的好处是数据具有非常深的粒度,例如,获取所有主机的所有内核的平均使用率:start=1d-ago&m=avg:sys.cpu.user。但是,由于要筛选的行更多,因此针对该特定 Metrics 的查询将花费更长的时间。这在所有数据库中都很常见,而不仅仅是 OpenTSDB 的问题。

可以通过以下方法来处理基数高问题:

Pre-Aggregate:针对上面的sys.cpu.user的示例,如果只是关心每个主机的使用情况,而不是每个核心的使用情况,收集器可以发送一个额外的数据点,例如sys.cpu.user.avg host=webserver01。这样就有了一个完全独立的时间序列,每天只有 24 行,如果有 20,000个主机,只需要从48 万行中筛选。

Shift to Metric:如果只关心特定主机的Metrics ,不需要再主机之间进行汇总,可以将之前的示例变成sys.cpu.user.websvr01 cpu=0,针对该架构的查询非常快,因为该 Metrics 每天只有 192 行,但是,要跨主机聚合,您将必须执行多个查询并在 OpenTSDB 外部聚合。(OpenTSDB将来将包括此功能)。

Naming Conclusion

  • metric、tagk、tagv始终使用相同的大小写规则,避免重复。
  • 每个metric使用相同数量和类型的标签。例如:不要存储my.metric host=foomy.metric datacenter=lga
  • 考虑一下您将要执行的最常见查询,并针对这些查询优化方案。
  • 考虑一下在查询时您可能想如何下钻。
  • 不要使用太多标签,而应将其保持在一个相当小的数目,通常最多 4 或 5 个标签(默认情况下,OpenTSDB 最多支持 8 个标签)。如果绝对需要,您可以随时增加可用于集群的标签数量。

Data Specification

每个时间序列数据点必须包含以下数据:

  • metric:时间序列通用名称,例如:sys.cpu.userstock.quoteenv.probe.temp
  • timestamp:纪元时间戳,以秒或毫秒为单位,定义为自 1970 年 1 月 1 日 UTC 时间 00:00:00 起经过的秒数。目前仅支持正时间戳。
  • value:在时间序列的给定时间戳记下存储的数值。这可以是整数或浮点值
  • tag(s) :由tagk(键)和tagv(值)组成的键/值对。每个数据点必须至少具有一个标签。

Timestamps

可以以秒或毫秒的分辨率将数据写入 OpenTSDB。时间戳记必须是整数,并且不得超过 13 位数字。毫秒时间戳的格式必须为1364410924250,其中最后三位数字表示毫秒。产生超过 13 位数字(即大于毫秒分辨率)的时间戳必须在提交前四舍五入到最多 13 位数字,否则会产生错误。

秒分辨率的时间戳存储在 2 个字节中,而毫秒分辨率的存储在 4 个字节上。如果不需要毫秒级别的分辨率,建议提交10位数字的秒分辨率时间戳可以节省存储空间,避免秒和毫秒时间戳混合使用,否则回降低查询速度。

Metrics and Tags

Metrics 和 Tags命名规范:

  • 字符串区分大小写
  • 不允许有空格
  • 仅允许以下字符:azAZ09-_./或 Unicode 字母,metric和tags的长度没有限制,但尽量尝试使值保持较短。

value

解析来自put命令的值时不带小数点(.),则将其视为有符号整数。整数以无符号可变长度编码存储,数据点可能只占用 1 个字节的空间或最多 8 个字节的空间。数据点的最小值为-9,223,372,036,854,775,808,最大值为 9,223,372,036,854,775,807(含)。

解析来自put命令的值时带小数点(.),则将其视为浮点值。当前,所有浮点值均以 4 字节单精度存储,并在 2.4 及更高版本中支持 8 字节双精度,浮点数以 IEEE 754 浮点“单一格式”存储,支持正负值。

Duplicate Data Points

在 OpenTSDB 中写入数据点通常在原始写入后的一个小时内是幂等的。。这意味着您可以在时间戳1356998400处写入值42,然后在同一时间再次写入42,不会发生任何不良情况。

但是,如果启用压缩以减少存储消耗并在压缩该行数据之后写入相同的数据点,则在该行上查询时可能会返回异常。

如果您尝试使用相同的时间戳写入两个不同的值,则在查询期间可能会引发重复的数据点异常。这是由于对 1、2、4 或 8 个字节的整数和浮点数进行编码的差异。如果第一个值是整数,第二个值是浮点数,则始终会引发重复错误。但是,如果两个值都是浮点数,或者都是以相同长度编码的整数,那么如果未在行上进行压缩,则原始值可能会被覆盖。

在OpenTSDB 2.1,可以通过设置tsd.storage.fix_duplicates=true来启用最后写入胜出。启用此标志后,在查询时,将返回记录的最新值,而不是引发异常。还将向日志文件中写入警告,指出已找到重复项。如果还启用了压缩,则原始压缩值将被最新值覆盖。

Input Methods

OpenTSDB支持三种方式写入数据:

  • Telnet API(不建议使用)
  • Http API
  • 文件批量导入
  • Java库(谨慎使用)

如果tsd.mode设置为ro而不是rw,则 TSD 将不会通过 RPC 调用接受数据点。 Telnet API调用将引发异常, HTTP API调用将返回 404 错误,但是仍然可以通过 JAVA API 进行写入。

Telnet API

使用telnetClient 连接到TSDB并使用以下命令发出put请求

put <metric> <timestamp> <value> <tagk1=tagv1[ tagk2=tagv2 ...tagkN=tagvN]>

例:

put sys.cpu.user 1356998400 42.5 host=webserver01 cpu=0

Http API

http://host:4242/q?start=1h-ago&m=sum:sys.cpu.system{host=webserver01,cpu=0}

Batch Import

Write Performance

OpenTSDB可以扩展到在带有常规旋转硬盘驱动器的商品服务器上每秒写入数百万个数据点。然而,那些在虚拟机上以独立模式下运行的HBase,试图在一个全新的TSD上猛击数百万个数据点的用户会失望,因为他们每秒只能写入数百个点的数据。下面是您需要做什么来扩展全新的安装或测试以及扩展现有系统。

UID Assignment

在写入数据点之前,必须为metric、tags的每个字符串分配一个UID。分配需要花费大量的时间,因为它必须先获取一个可用的UID,写入一个UID——名称的映射和一个名称——UID的映射,然后使用UID写入数据点的rowKey。UID将被存储到TSD的缓存中,以便下一次使用相同的metric时能够快速获取。

因此,建议可能多的为metric、tagk、tagv预分配UID,可以使用Http API等方式执行预分配。

如果重新启动 TSD,它将会为每个metric和tags查找 UID,因此在填充高速缓存之前,性能会有些慢。

随机 MetricsUID 分配

Salting

Appends

预分割 HBase 区域

Distributed HBase

Multiple TSDs

一个 TSD 每秒可以处理数千次写入。但是,如果您有许多来源,则最好通过运行多个 TSD 并使用负载均衡器(例如 Varnish 或 DNS 循环)来扩展写入,以进行扩展。当群集专用于 OpenTSDB 时,许多用户在其 HBase 区域服务器上放置 TSD。

Persistent Connections

在 TSD 中启用保持活动状态,并确保用于发送时间序列数据的任何应用程序保持其连接打开,而不是每次写入都打开和关闭。

禁用元数据和实时发布

OpenTSDB 2.0 引入了元数据,用于跟踪系统中的数据种类。启用跟踪后,每个写入的数据点的计数器都会递增,新的 UID 或时间序列将生成元数据。数据可以推送到搜索引擎或通过树生成代码传递。这些过程在 TSD 中需要更大的内存,并可能影响吞吐量。跟踪默认情况下处于禁用状态,因此请在启用此功能之前对其进行测试。

2 .0 还引入了一个实时发布插件,在该插件中,传入的数据点在排队存储后可以立即发送到另一个目标。默认情况下禁用此功能,因此在生产环境中部署之前,请测试您感兴趣的所有插件。

Querying or Reading Data

Dates and Times

查询数据时,OpenTSDB 支持多种日期和时间格式,每个查询都需要一个“开始时间”和一个可选的“结束时间”,如果未指定结束时间,则将使用运行 TSD 的系统上的当前时间。

relative

相对时间的格式为<amount><time unit>-ago,其中<amount>是时间单位数,<time unit>是时间单位。可能的时间单位包括:

  • ms-毫秒
  • s-秒
  • m-分钟
  • h-小时
  • d-天(24 小时)
  • w-周(7 天)
  • n-月(30 天)
  • y-年(365 天)

相对时间表示的是从当前时间计算过去的秒数。

Absolute Unix Time

OpenTSDB内部,所有数据都与 Unix(或 POSIX)样式的时间戳关联。 Unix 时间定义为自 1970 年 1 月 1 日 UTC 时间 00:00:00 以来经过的秒数,时间戳表示为正整数,例如1364410924。使用 Unix 时间戳的查询还可以通过简单地附加三个数字来支持毫秒精度,如使用13644109242501364410924.250。10个字符或更少字符表示秒,13(或14)个字符代表毫秒,多余的必须截断或舍入。

Absolute Formatted Time

OpenTSDB 支持人类可读的绝对日期和时间,支持的格式如下:

  • yyyy/MM/dd-HH:mm:ss
  • yyyy/MM/dd HH:mm:ss
  • yyyy/MM/dd-HH:mm
  • yyyy/MM/dd HH:mm
  • yyyy/MM/dd

Time Zones

将Absolute Formatted Time转换为Absolute Unix Time时,OpenTSDB将使用运行 TSD 的系统配置的时区转换为 UTC。

使用Http API查询时可以通过指定如tz=Asia/Shanghai参数将上海本地时间转换为UTC。

Query Filters

任何数据库系统的一项关键功能是使用某种形式的过滤来获取完整数据集的子集。目前,OpenTSDB的过滤器是在标签值上运行。在最初的 OpenTSDB 版本和 2.1 以下版本中,只有两种类型的过滤器可用,并且已隐式配置它们以进行分组。允许的两个运算符是:

  • *:通配符,将为查询到的每个唯一标签值(tagv)返回单独的结果。
  • |:管道(literal_or),将为指定的每个确切标签值(tagv)返回单独的结果。

例如:有一个host标签,它的值有ABC,则支持以下几种方式过滤:

  • host=A:仅检索主机名为A的一条时间序列。
  • host=A|B:检索主机为A和B的二条时间序列,按照AB分组。
  • host=*:检索每个主机的时间序列,按照ABC分组。

Grouping

与其它关系型数据库的group by概率一致,使用所需的聚合函数和过滤器将多个时间序列组合成一个时间序列的过程。默认情况下,OpenTSDB按metric对所有内容进行分组。一般与Aggregators一起使用

如果希望获取每个基础时间序列没有任何聚合,可以使用OpenTSDB 2.2及以上版本中的none聚合器。

内置过滤器

OpenTSDB2.2支持禁用分组并添加了更灵活的过滤器,例如正则表达式regexp。可以使用HTTP API /api/config/filters查询已加载的过滤器插件及使用说明。

允许在同一标签键上使用多个过滤器,并在处理时使用&运算符,例如如果有二个过滤器host=literal_or(web01)host=literal_or(web02),则查询将始终返回空。

如果一个标签键包含两个或多个过滤器,并且一个过滤器启用了group by,而另一个未启用,则对于该标签键上的所有过滤器,group by将有效。

某些类型的过滤器可能会导致查询的执行速度比其他过滤器慢,特别是regexp、通配符和不区分大小写的过滤器。在从存储中获取数据之前,将处理筛选器以创建基于uid的数据库筛选器,因此使用区分大小写的literal_or筛选器总是比regexp快,因为我们可以将字符串解析为uid并将其发送到存储系统进行筛选。相反,如果您要求使用带有pre、post或infix过滤的regex或通配符,则TSD必须使用标记键UID从存储器中检索所有行,然后对于每个唯一的行,将UID解析回字符串,然后对结果运行过滤器。此外,具有大量文本列表的过滤器集将在存储后进行处理,以避免为备份存储创建大量过滤器进行处理。此限制默认为4096,可以通过tsd.query.filter.expansion_limit参数进行配置。

literal_or

接收单个literal值或|分割的值列表,以区分大小写的方式匹配指定的值并返回时间序列。这是一个非常有效的过滤器,因为它可以将字符串解析为 UID,并将其发送到存储层进行预过滤。在 SQL 中,这类似于IN=谓词。

例:

  • host=literal_or(web01|web02|web03),在 SQL 中:WHERE host IN ('web01', 'web02', 'web03')
  • host=literal_or(web01),在 SQL 中:WHERE host = 'web01'

not_literal_or

区分大小写的literal_or,它将返回与给定值列表不匹配的序列。高效,因为它可以通过存储进行预处理。

ilteral_or

literal_or相同,但不区分大小写。请注意,这与literal的效率不同,因为它必须对存储中的所有行进行后期处理。

not_iliteral_or

不区分大小写的not_literal_or,低效。

wildcard

提供区分大小写的后缀,前缀,中缀和多中缀过滤。通配符是星号*。可以使用多个通配符。如果仅给出星号,则过滤器有效地返回包含标签键的任何时间序列(并且是可以进行预处理的高效过滤器)。在 SQL 领域中,这类似于LIKE谓词,具有更大的灵活性。

例:

  • host=wildcard(*mysite.com),在 SQL 中:WHERE host='%mysite.com'
  • host=wildcard(web*)
  • host=wildcard(web*mysite.com)
  • host=wildcard(web*mysite*)
  • host=wildcard(*)

iwildcard

wildcard相同,但不区分大小写。

regexp

使用POSIX兼容的正则表达式从存储中提取后进行筛选。该过滤器使用 Java 的内置正则表达式操作。根据所使用的查询方法,请小心转义特殊字符。

例:

  • regexp(web.*),在 SQL 中:WHERE host REGEXP 'web.*'
  • regexp(web[0-9].mysite.com)

Explicit Tags

从 2.3 及更高版本开始,如果知道给定的metric的所有tag key,则可以使用explicitTags功能极大地改善查询延迟,原因是:

  • 对于高基数的metric,后端可以切换到更有效的查询,获取较小的数据子集。
  • 对于tags变化的metric,将会被过滤。

explicitTags将构建底层存储查询,该查询仅会获取给定tags的那些行(精确匹配),这样可以使数据库跳过不相关的行并在更少的时间内回答。

explicitTags主要解决了metric的标签键不一致的问题。

例:

TS#MetricTagsValue @ T1
1sys.cpu.systemdc=dal host=web013
2sys.cpu.systemdc=dal host=web022
3sys.cpu.systemdc=dal host=web0310
4sys.cpu.systemhost=web011
5sys.cpu.systemhost=web01 owner=jdoe4
6sys.cpu.systemdc=lax host=web018
7sys.cpu.systemdc=lax host=web024

案例1:http://host:4399/api/query?start=1h-ago&m=sum:explicit_tags:sys.cpu.system{host=web01}

Time Series IncludedTagsAggregated TagsValue @ T1
4host=web011

案例2:http://host:4399/api/query?start=1h-ago&m=sum:explicit_tags:sys.cpu.system{host=*}{dc=*}

Time Series IncludedTagsAggregated TagsValue @ T1
1, 6host=web01dc11
2, 7host=web02dc6
3host=web03,dc=dal10

Plugins

当开发人员添加插件时,我们将在此处列出它们。

要开发插件,只需扩展net.opentsdb.query.filter.TagVFilter类,根据Plugins文档创建 JAR 并将其放在您的 plugins 目录中。在启动时,TSD 将搜索并加载该插件。如果实现存在错误,则 TSD 将不会启动,并将记录异常。

Aggregation

过滤器用于按标签对结果进行分组,然后将汇总应用于每个组。OpenTSDB的聚合类似于SQL中的 GROUP BY子句,用户可以选择一个预定义的聚合函数将多个记录合并为单个结果。但是,在 TSD 中,每个时间戳和组会汇总一组记录。

每个聚合器都有两个组件:

  • 聚合功能:应用的 math 计算,例如对所有值求和,计算平均值或选择最高值。
  • 插值:一种处理丢失值的方法,例如当时间序列 A 在 T1 处有值,而时间序列 B 没有值时。

多个时间序列进行聚合时会出现序列未对齐及数据丢失等情况。

seriest0t0+10st0+20st0+30st0+40st0+50st0+60s
Ana5na15na5na
B10na20na10na20
Interpolated A1010
Interpolated B151515
Summed Result10203025202020

可采用Interpolation(插值),默认为线性插值。

Available Aggregators

可以使用HTTP API查询/api/aggregators以获得在 TSD 上实现的聚合器列表。

AggregatorTSD VersionDescriptionInterpolation
avg1.0平均数据点Linear Interpolation
count2.2集合中原始数据点的数量如果丢失,则为零
dev1.0计算标准偏差Linear Interpolation
sum1.0将数据点加在一起Linear Interpolation
zimsum2.0将数据点加在一起如果丢失,则为零
mimmin2.0选择最小的数据点如果丢失则为Long.MaxValue(整数点)或 Double.MaxValue(浮点值)
mimmax2.0选择最大的数据点如果丢失则为 Long.MinValue(整数点)或 Double.MinValue(浮点值)
min1.0选择最小的数据点Linear Interpolation
max1.0选择最大的数据点Linear Interpolation
none2.3跳过所有时间序列的分组。如果丢失,则为零
first2.3返回集合中的第一个数据点。仅适用于降采样,不适用于汇总。Indeterminate
last2.3返回集合中的最后一个数据点。仅适用于降采样,不适用于汇总。Indeterminate
ep50r32.2使用 R-3 方法计算估计的 50%*Linear Interpolation
ep50r72.2用 R-7 方法计算估计的 50%*Linear Interpolation
ep75r32.2用 R-3 方法计算估计的第 75 个百分位数*Linear Interpolation
ep75r72.2用 R-7 方法计算估计的第 75 个百分位数*Linear Interpolation
ep90r32.2使用 R-3 方法计算估计的 90%*Linear Interpolation
ep90r72.2使用 R-7 方法计算估计的 90%*Linear Interpolation
ep95r32.2使用 R-3 方法计算估计的 95%*Linear Interpolation
ep95r72.2使用 R-7 方法计算估计的 95%*Linear Interpolation
ep99r32.2用 R-3 方法计算估计的 99%*Linear Interpolation
ep99r72.2用 R-7 方法计算估计的 99%*Linear Interpolation
ep999r32.2使用 R-3 方法计算估计的第 999 个百分点*Linear Interpolation
ep999r72.2使用 R-7 方法计算估计的第 999 个百分点*Linear Interpolation
p502.2计算第 50 个百分位数Linear Interpolation
p752.2计算第 75 个百分位数Linear Interpolation
p902.2计算第 90 个百分位数Linear Interpolation
p952.2计算第 95 个百分位数Linear Interpolation
p992.2计算第 99 个百分位数Linear Interpolation
p9992.2计算第 999 个百分位数Linear Interpolation

Downsampling(降采样)

采样器至少需要两个组件:

  • 间隔:聚合值的时间范围(或* bucket )。例如,我们可以在 1 分钟或 1 小时甚至一天内汇总多个值。间隔以<Size><Units>格式指定,例如1h表示 1 小时或30m表示 30 分钟。从 2.3 *开始,现在可以使用all间隔将时间范围内的所有结果下采样为一个值。例如: 0all-sum将对从查询开始到结束的所有值求和。请注意,仍然需要一个数字值,但它可以是零或任何其他值。
  • 聚合函数:一种 math 函数,用于确定如何合并间隔中的值。 Aggregation中的聚合功能用于该函数。

指定30s-sum的降采样器,它将创建 30 秒的存储桶,并对每个存储桶中的所有数据点求和,示例如下:

Time Seriest0t0+10st0+20st0+30st0+40st0+50st0+60
A5510152051
A sum降采样5 + 5 + 10 = 2015 + 20 + 5 = 401 = 1
B10520151005
B sum降采样10 + 5 + 20 = 3515 + 10 + 0 = 255 = 5
sum汇总结果55656

Fill Policies(填充策略)

降采样通常用于对齐时间戳,以避免在进行分组时插值。使用降采样执行分组聚合时,如果所有序列在预期的间隔内都缺少值,则不会发出任何消息,例如:如果某个序列每分钟从t0t0+6m写入数据,但是由于某种原因,源无法在t0+3m写入数据,则当用户期望序列为 6 个值时,但仅只有5个值,对于 2.2 及更高版本,支持填充策略,t0+3m可以选择以什么样的预定义值来填充。

每当bucket(采样桶)为空时,可以选择以下填充策略:

  • None(none):默认行为,在序列化过程中不会发出缺失值,并且在聚合序列时会执行线性插值(或以其他方式指定的插值)。
  • NaN(nan):当序列中的所有值均缺失时,在序列化输出中发出NaN。当值丢失时跳过聚合中的序列,而不是将整个分组依据转换为 NaN。
  • Null(null):与 NaN 相同的行为,除了在序列化期间它发出null而不是NaN之外。
  • Zero(zero):当缺少时间戳时,将其替换为零。零值将合并到汇总结果中。

以下示例,每10s一条数据,使用NaN填充策略填充缺失值:

Time Seriest0t0+10st0+20st0+30st0+40st0+50st0+60s
A155
B102020
A sum降采样NaNNaNNaN15NaN5NaN
B sum下采样10NaN20NaNNaNNaN20
sum汇总结果10NaN2015NaN520

如果我们请求的输出没有填充策略,则不会返回t0+10st0+40s的值或时间戳。另外,将对序列B的t0+30st0+50s处的值进行线性插值,以填充要与系列A相加的值。

Query Performance

  • 保证相同metric的标签数量和标签键相同。
  • 尽量在查询中启用explicitTags查询。
  • 使用downsampling减少由TSD序列化发送给用户的数据量。
  • 使用多个TSD负载均衡
  • 优化HBase

Order of Operations

  1. Filtering
  2. Grouping
  3. Downsampling
  4. Interpolation
  5. Aggregation
  6. Rate Conversion
  7. Functions
  8. Expressions

Rollup And Pre-Aggregates(汇总与预聚合)

汇总与预聚合简介

官方文档:http://opentsdb.net/docs/build/html/user_guide/rollups.html

汇总:在OpenTSDB中的定义为将单个时间序列某段时间内的数据做聚合(如SUM、COUNT、AVG等),有助于解决查看较长时间 Span 的问题,汇总本质上是调用降采样的计算结果并存储到系统中。
预聚合:将具有相同度量标准(metric)的不同时间序列做聚合,有助于解决度量标准的高基数(即给定度量标准的唯一时间序列数)问题

汇总和预聚合的目的是解决较长时间的Span问题和metric的高基数问题,提高查询效率,降低服务器的负载。OpenTSDB 本身并不计算和存储汇总或预聚合的数据,不过官方提供了以下几种解决方案:

  1. 批处理。对于数据延迟适当推迟执行时间,随着数据量的增大,批处理的运行时间会加长影响HBase的性能,建议从复制的系统或备用存储中读取聚合。
  2. 内存中使用队列,配置一个时间窗口,每隔一段时间计算一次写入OpenTSDB。OpenTSDB上游如Nginx负载均衡路由问题、RAM内存大、TSD进程终止数据丢失、原始数据整体写入吞吐量问题、JVM GC问题、若采用磁盘缓存IO问题等,不建议使用。
  3. 流处理(Storm、Spark、Flink等)。类似队列,但流处理框架解决了路由和内存存储问题。

OpenTSDB汇总配置

1. 在OpenTSDB的配置文件opentsdb.confg中加入如下配置

#开启汇总
tsd.rollups.enable=true
#汇总配置文件位置
tsd.rollups.config=./rollup_config.json

2. 创建上述配置文件指定的汇总配置文件rollup_config.json(创建目录由上面配置指定)

{
    "aggregationIds": {
        "sum": 0,
        "count": 1,
        "min": 2,
        "max": 3,
        "avg": 4
    },
    "intervals": [
        {
            "table": "tsdb",	//原始数据表
            "preAggregationTable": "tsdb",	//预聚合汇总数据表
            "interval": "1s",	//忽略
            "rowSpan": "1h",	//忽略
            "defaultInterval": true
        },
        {
            "table": "tsdb-rollup-5m",	//5分钟汇总表
            "preAggregationTable": "tsdb-rollup-5m",	//5分钟预聚合汇总表
            "interval": "5m",	//5分钟汇总一次
            "rowSpan": "1h",	//行宽1h
            "defaultInterval": false
        },
        {
            "table": "tsdb-rollup-1h",	//1小时汇总表
            "preAggregationTable": "tsdb-rollup-1h",	//1小时预聚合汇总表
            "interval": "1h",	//1小时汇总一次
            "rowSpan": "1d",	//行宽1d
            "defaultInterval": false
        }
    ]
}

说明:

  • aggregationIds:配置的是聚合函数的名称与id的映射关系,通过在每种类型的汇总数据之前添加数字id来减少存储量,类似与uid的作用。id必须是 0 到 127 之间的整数。
  • intervals:时间间隔对象的集合,每个时间间隔对象都定义了表路由,用于汇总和预聚合数据应写入和查询的位置。间隔有两种类型:defaultIntervalRollup Interval"defaultInterval": true定义的是默认的原始数据表,即tsdb(或tsd.storage.hbase.data_table配置),interval(默认1s)和rowSpan(默认1小时)将被忽略;Rollup IntervaldefaultInterval":false或未设置的汇总表。

字段说明

NameData TypeRequiredDescriptionExample
tableStringRequired非预聚合数据的基础表或汇总表。对于默认表,此表应为tsdb或已写入原始数据的表。对于汇总数据,它必须是与原始数据是不同的表。tsdb-rollup-1h
preAggregationTableStringRequired预聚合和汇总(可选)数据的表应写入其中。该表可能与table值相同。tsdb-rollup-preagg-1h
intervalStringRequired数据点之间的预期间隔为<interval><units>格式。例如。如果汇总每小时计算一次,则间隔应为1h。如果每 10 分钟计算一次,请将其设置为10m。对于默认表,此值将被忽略。1h
rowSpanStringRequired存储中每一行的宽度。该值必须大于interval,并定义每行中可容纳的interval的数量,rowSpan 定义的 interval数量应该在14bits内(2^13)。例如。如果间隔为1hrowSpan1d,则每行有 24 个值。1d
defaultIntervalBooleanOptional原始的非汇总数据的配置间隔是否为默认间隔。true

当使用query查询时,OpenTSDB会根据聚合函数和时间间隔进行路由,如果时间间隔是interval的倍数,且具有相关的汇总函数数据,会路由到汇总表查询数据。

3. 创建HBase汇总表

[root@opentsdb63 hbase-2.3.1]# bin/hbase shell
hbase(main):002:0> create 'tsdb-rollup-5m','t'
hbase(main):002:0> create 'tsdb-rollup-1h','t'

4. 汇总测试:
1)使用api/rollup接口添加汇总数据

 curl -XPOST http://localhost:4399/api/rollup?details -d '
[
  {
    "metric": "client.current_conns",
    "timestamp": 1546344000,
    "value": 133,
    "tags": {
      "host": "host2",
      "datacenter": "dc1",
      "port": "80"
    },
    "interval": "1h",
    "aggregator": "SUM"
  },
  {
    "metric": "client.current_conns",
    "timestamp": 1546344000,
    "value": 162,
    "tags": {
      "host": "host1",
      "datacenter": "dc1",
      "port": "80"
    },
    "interval": "1h",
    "aggregator": "SUM"
  },
  {
    "metric": "client.current_conns",
    "timestamp": 1546344000,
    "value": 4,
    "tags": {
      "host": "host2",
      "datacenter": "dc1",
      "port": "80"
    },
    "interval": "1h",
    "aggregator": "COUNT"
  },
  {
    "metric": "client.current_conns",
    "timestamp": 1546344000,
    "value": 4,
    "tags": {
      "host": "host1",
      "datacenter": "dc1",
      "port": "80"
    },
    "interval": "1h",
    "aggregator": "COUNT"
  }
]'

2) 使用/api/query接口查询汇总数据

 curl localhost:4399/api/query -d '
{
  "start":1546344000,
   "end":1546347600,
   "msResolution":false,
   "queries":[{
     "aggregator":"avg",
     "metric":"client.current_conns",
     "rate":false,
     "downsample":"1h-avg",
     "rollupUsage": "ROLLUP_NOFALLBACK"
  }]
 }'

downsample聚合函数使用avgaggregator必须使用avg,否则会引发以下异常:

{
  "error": {
    "code": 400,
    "message": "Attempt to add a different aggrregate cell =KeyValue(key=[0, 0, 0, 0, 0, 31, 55, 92, 42, -83, -128, 0, 0, 0, 0, 0, 0, 13, 0, 0, 0, 0, 0, 21, -109, 0, 0, 0, 0, 0, 0, 14, 0, 0, 0, 0, 0, 21, -110, 0, 0, 0, 0, 0, 0, 15, 0, 0, 0, 0, 0, 21, -108], family=\"t\", qualifier=[0, 0, -63], value=[0, -123], timestamp=1617893045321), expected aggregator \"raw:\"",
    "details": "Attempt to add a different aggrregate cell =KeyValue(key=[0, 0, 0, 0, 0, 31, 55, 92, 42, -83, -128, 0, 0, 0, 0, 0, 0, 13, 0, 0, 0, 0, 0, 21, -109, 0, 0, 0, 0, 0, 0, 14, 0, 0, 0, 0, 0, 21, -110, 0, 0, 0, 0, 0, 0, 15, 0, 0, 0, 0, 0, 21, -108], family=\"t\", qualifier=[0, 0, -63], value=[0, -123], timestamp=1617893045321), expected aggregator \"raw:\"",
    "trace": "net.opentsdb.tsd.BadRequestException: Attempt to add a different aggrregate cell =KeyValue(key=[0, 0, 0, 0, 0, 31, 55, 92, 42, -83, -128, 0, 0, 0, 0, 0, 0, 13, 0, 0, 0, 0, 0, 21, -109, 0, 0, 0, 0, 0, 0, 14, 0, 0, 0, 0, 0, 21, -110, 0, 0, 0, 0, 0, 0, 15, 0, 0, 0, 0, 0, 21, -108], family=\"t\", qualifier=[0, 0, -63], value=[0, -123], timestamp=1617893045321), expected aggregator \"raw:\"\n\tat net.opentsdb.tsd.QueryRpc$1ErrorCB.call(QueryRpc.java:220) [tsdb-2.4.0.jar:]\n\tat net.opentsdb.tsd.QueryRpc$1ErrorCB.call(QueryRpc.java:180) [tsdb-2.4.0.jar:]\n\tat com.stumbleupon.async.Deferred.doCall(Deferred.java:1278) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.runCallbacks(Deferred.java:1257) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.access$300(Deferred.java:430) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred$Continue.call(Deferred.java:1366) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.doCall(Deferred.java:1278) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.runCallbacks(Deferred.java:1257) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.callback(Deferred.java:1005) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.DeferredGroup.done(DeferredGroup.java:169) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.DeferredGroup.recordCompletion(DeferredGroup.java:158) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.DeferredGroup.access$200(DeferredGroup.java:36) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.DeferredGroup$1NotifyOrdered.call(DeferredGroup.java:97) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.doCall(Deferred.java:1278) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.runCallbacks(Deferred.java:1257) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.callback(Deferred.java:1005) [async-1.4.0.jar:na]\n\tat net.opentsdb.core.SaltScanner.validateAndTriggerCallback(SaltScanner.java:949) [tsdb-2.4.0.jar:]\n\tat net.opentsdb.core.SaltScanner.access$2300(SaltScanner.java:68) [tsdb-2.4.0.jar:]\n\tat net.opentsdb.core.SaltScanner$ScannerCB.close(SaltScanner.java:910) [tsdb-2.4.0.jar:]\n\tat net.opentsdb.core.SaltScanner$ScannerCB.call(SaltScanner.java:539) [tsdb-2.4.0.jar:]\n\tat net.opentsdb.core.SaltScanner$ScannerCB.call(SaltScanner.java:461) [tsdb-2.4.0.jar:]\n\tat com.stumbleupon.async.Deferred.doCall(Deferred.java:1278) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.addCallbacks(Deferred.java:688) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.addCallback(Deferred.java:724) [async-1.4.0.jar:na]\n\tat net.opentsdb.core.SaltScanner$ScannerCB.scan(SaltScanner.java:525) [tsdb-2.4.0.jar:]\n\tat net.opentsdb.core.SaltScanner$ScannerCB.call(SaltScanner.java:716) [tsdb-2.4.0.jar:]\n\tat net.opentsdb.core.SaltScanner$ScannerCB.call(SaltScanner.java:461) [tsdb-2.4.0.jar:]\n\tat com.stumbleupon.async.Deferred.doCall(Deferred.java:1278) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.runCallbacks(Deferred.java:1257) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.handleContinuation(Deferred.java:1313) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.doCall(Deferred.java:1284) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.runCallbacks(Deferred.java:1257) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.access$300(Deferred.java:430) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred$Continue.call(Deferred.java:1366) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.doCall(Deferred.java:1278) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.runCallbacks(Deferred.java:1257) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.access$300(Deferred.java:430) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred$Continue.call(Deferred.java:1366) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.doCall(Deferred.java:1278) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.runCallbacks(Deferred.java:1257) [async-1.4.0.jar:na]\n\tat com.stumbleupon.async.Deferred.callback(Deferred.java:1005) [async-1.4.0.jar:na]\n\tat org.hbase.async.HBaseRpc.callback(HBaseRpc.java:720) [asynchbase-1.8.2.jar:na]\n\tat org.hbase.async.RegionClient.decode(RegionClient.java:1575) [asynchbase-1.8.2.jar:na]\n\tat org.hbase.async.RegionClient.decode(RegionClient.java:88) [asynchbase-1.8.2.jar:na]\n\tat org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:500) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:435) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) [netty-3.10.6.Final.jar:na]\n\tat org.hbase.async.RegionClient.handleUpstream(RegionClient.java:1230) [asynchbase-1.8.2.jar:na]\n\tat org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.channel.SimpleChannelHandler.messageReceived(SimpleChannelHandler.java:142) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:88) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.handler.timeout.IdleStateAwareChannelHandler.handleUpstream(IdleStateAwareChannelHandler.java:36) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.handler.timeout.IdleStateHandler.messageReceived(IdleStateHandler.java:294) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) [netty-3.10.6.Final.jar:na]\n\tat org.hbase.async.HBaseClient$RegionClientPipeline.sendUpstream(HBaseClient.java:3857) [asynchbase-1.8.2.jar:na]\n\tat org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) [netty-3.10.6.Final.jar:na]\n\tat org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) [netty-3.10.6.Final.jar:na]\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_211]\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_211]\n\tat java.lang.Thread.run(Thread.java:748) [na:1.8.0_211]\nCaused by: net.opentsdb.core.IllegalDataException: Attempt to add a different aggrregate cell =KeyValue(key=[0, 0, 0, 0, 0, 31, 55, 92, 42, -83, -128, 0, 0, 0, 0, 0, 0, 13, 0, 0, 0, 0, 0, 21, -109, 0, 0, 0, 0, 0, 0, 14, 0, 0, 0, 0, 0, 21, -110, 0, 0, 0, 0, 0, 0, 15, 0, 0, 0, 0, 0, 21, -108], family=\"t\", qualifier=[0, 0, -63], value=[0, -123], timestamp=1617893045321), expected aggregator \"raw:\"\n\tat net.opentsdb.rollup.RollupSeq.setRow(RollupSeq.java:159) ~[tsdb-2.4.0.jar:]\n\tat net.opentsdb.rollup.RollupSpan.addRow(RollupSpan.java:70) ~[tsdb-2.4.0.jar:]\n\tat net.opentsdb.core.SaltScanner.mergeDataPoints(SaltScanner.java:429) [tsdb-2.4.0.jar:]\n\tat net.opentsdb.core.SaltScanner.mergeAndReturnResults(SaltScanner.java:307) [tsdb-2.4.0.jar:]\n\tat net.opentsdb.core.SaltScanner.validateAndTriggerCallback(SaltScanner.java:947) [tsdb-2.4.0.jar:]\n\t... 55 common frames omitted\n"
  }
}

官方issue:https://github.com/OpenTSDB/opentsdb/issues/1501

OpenTSDB预聚合配置

#开启预聚合
tsd.rollups.tag_raw=true
#预聚合数据的标记tag key
tsd.rollups.agg_tag_key=_aggregate
#用于标记是原始数据的tag value
tsd.rollups.raw_agg_tag_value=RAW

相关阅读

相关文章

相关问答