通过一些黑盒实验,验证一下Kapacitor对于乱序数据的处理能力。
参考 博客 的设置,但是根据最新版本的Kapacitor进行一些改造。首先启动Kapacitor和influxdb。influxdb和Kapacitor的安装参见 博客 。
软件 | 版本 |
---|---|
influxdb | 1.4.2 |
Kapacitor | 1.5.2 |
首先在influxdb中创建数据库test:
# 启动客户端
influx
# 客户端命令
create database test
use test
# 显示年月日时间。
precision rfc3339
采用http请求产生数据:
import os
import time
influx = "http://localhost:8086/write?db=test"
# 检测code_match,为0则异常,报警
code_matches = [1, 1, 1, 1, 0, 1, 1, 1, 1, 0]
# 顺序时间
t = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# 乱序时间
# t = [0, 1, 2, 6, 5, 4, 3, 7, 8, 9]
base_time = 1554575034104007000
for i, code_match in enumerate(code_matches):
os.system('curl -i -XPOST %s --data-binary "ka,app=cmdb code_match=%d %d"'
% (influx, code_match, base_time + 1 * t[i] * 1e9))
# 下面两行不指定时间戳,采用机器时间
# os.system('curl -i -XPOST %s --data-binary "ka,app=cmdb code_match=%d"'
# % (influx, id))
print("time=%d" % (base_time + 5 * t[i] * 1e9))
# 采用机器时间的话则取消注释,指定了数据时间则sleep没有意义
# time.sleep(1)
无window的tick脚本:
dbrp "test"."autogen"
stream
|from()
.measurement('ka')
|alert()
.crit(lambda: int("code_match") == 0)
.log('/Users/XXX/test_alerts.log')
有window的tick脚本,window跨度为3s,每1s检测和处理一次:
dbrp "test"."autogen"
stream
|from()
.measurement('ka')
|window()
.period(3s)
.every(1s)
|alert()
.crit(lambda: int("code_match") == 0)
.log('/Users/XXX/test_alert_window.log')
启动任务:
# 清除环境,没有指定过任务则不必须
kapacitor disable test_alert
kapacitor delete tasks test_alert
kapacitor disable test_alert_window
kapacitor delete tasks test_alert_window
kapacitor define test_alert -tick test_alert.tick
kapacitor enable test_alert
kapacitor define test_alert_window -tick test_alert_window.tick
kapacitor enable test_alert_window
最简单的场景,先熟悉一下alert的工作方式。在本场景下,只要window中有异常点,alert就会报警,如果上一个window是异常的,alert也会报警。(如果要全部点异常才报警,可以加.all()
语句)。
脚本如下(仅列出关键部分):
import os
import time
influx = "http://localhost:8086/write?db=test"
i1 = [1, 1, 1, 1, 0, 1, 1, 1, 1, 0]
t = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# t = [0, 1, 2, 6, 5, 4, 3, 7, 8, 9]
# base_time = 1554575034104007000
for i, code_match in enumerate(i1):
# os.system('curl -i -XPOST %s --data-binary "ka,app=cmdb code_match=%d %d"'
# % (influx, id, base_time + 1 * t[i] * 1e9))
os.system('curl -i -XPOST %s --data-binary "ka,app=cmdb code_match=%d"'
% (influx, code_match))
# print("time=%d" % (base_time + 5 * t[i] * 1e9))
time.sleep(1)
输出日志关键部分如下。为了清晰明了,所有时间戳均写为 Scene+0*{interval}s
, 如S1+5*1s
。
{"previousLevel":"OK","value":[
[S1+3*1s,code_match=1],
[S1+4*1s,code_match=0],
]},
{"previousLevel":"CRITICAL","value":[
[S1+4*1s,code_match=0],
[S1+5*1s,code_match=1],
]},
{"previousLevel":"CRITICAL","value":[
[S1+5*1s,code_match=1],
[S1+6*1s,code_match=1],
]},
# 本来应该输出下面的记录,但是并没有!
#{"previousLevel":"OK","value":[
# [S1+8,code_match=1],
# [S1+9,code_match=0],
#]},
S1+4
,S1+9
是异常点。窗口长度为3s内,因此是2个点。S2比S1,数据中手动指定了时间。数据时间比真实时间要迟很多(例如1天),以保证这些点是上一个场景之后到来的。
脚本如下:
every
参数。import os
import time
influx = "http://localhost:8086/write?db=test"
i1 = [1, 1, 1, 1, 0, 1, 1, 1, 1, 0]
t = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# t = [0, 1, 2, 6, 5, 4, 3, 7, 8, 9]
one_day_nano = 86400 * 1e9
data_interval = 5
# base_time比真实时间要晚
base_time = 1554575034104007000
for i, code_match in enumerate(i1):
os.system('curl -i -XPOST %s --data-binary "ka,app=cmdb code_match=%d %d"'
% (influx, code_match, base_time + data_interval * t[i] * 1e9))
# os.system('curl -i -XPOST %s --data-binary "ka,app=cmdb code_match=%d"'
# % (influx, id))
# print("time=%d" % (base_time + 5 * t[i] * 1e9))
# time.sleep(1)
相比于S1,增加了4条日志:
# S1的最后测试例现在才触发
{"previousLevel":"OK","value":[
[S1+8*1s,code_match=1],
[S1+9*1s,code_match=0],
]},
# 因为上一个window是异常,因此输出。又因为数据时间之间超出了3s,因此只有一个点。
{"previousLevel":"CRITICAL","value":[
[S2+0*5s,code_match=1],
]},
# 数据时间间隔为5s>3s,因此只记录1个点,与真实时间无关
{"previousLevel":"OK","value":[
[S2+4*5s,code_match=0],
]},
{"previousLevel":"CRITICAL","value":[
[S2+5*5s,code_match=1],
]},
S3相比S2,数据时间继续推迟1天,异常点的顺序变了。
脚本如下:
import os
import time
influx = "http://localhost:8086/write?db=test"
i1 = [1, 1, 1, 1, 0, 1, 1, 1, 1, 0]
# t = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
t = [0, 1, 2, 6, 5, 4, 3, 7, 8, 9]
one_day_nano = 86400 * 1e9
data_interval = 1
# base_time比真实时间要晚
base_time = 1554575034104007000+one_day_nano
for i, code_match in enumerate(i1):
os.system('curl -i -XPOST %s --data-binary "ka,app=cmdb code_match=%d %d"'
% (influx, code_match, base_time + data_interval * t[i] * 1e9))
# os.system('curl -i -XPOST %s --data-binary "ka,app=cmdb code_match=%d"'
# % (influx, id))
# print("time=%d" % (base_time + 5 * t[i] * 1e9))
# time.sleep(1)
相比于S1,增加了5条日志,为了便于解释,我们给每一个报警编号:
# S2结尾时候的异常引发两条alert,不赘述
# alert 0
{"previousLevel":"OK","value":[
[S2+9*5s,code_match=0],
]},
# alert 1
{"previousLevel":"CRITICAL","value":[
[S3+0*1s,code_match=1],
]},
# 关键
# alert 2
{"previousLevel":"OK","value":[
[S3+6*1s,code_match=1],
[S3+5*1s,code_match=0],
[S3+4*1s,code_match=1],
[S3+3*1s,code_match=1],
]},
# alert 3
{"previousLevel":"CRITICAL","value":[
[S3+6*1s,code_match=1],
[S3+5*1s,code_match=0],
[S3+4*1s,code_match=1],
[S3+3*1s,code_match=1],
[S3+7*1s,code_match=1],
]},
# alert 4
{"previousLevel":"CRITICAL","value":[
[S3+6*1s,code_match=1],
[S3+5*1s,code_match=0],
[S3+4*1s,code_match=1],
[S3+3*1s,code_match=1],
[S3+7*1s,code_match=1],
[S3+8*1s,code_match=1],
]},
# alert 5,等到下一次,S4才会触发。注意到,此时window恢复正常,说明S3+5*1s退出窗口
{"previousLevel":"OK","value":[
[S3+8*1s,code_match=1],
[S3+9*1s,code_match=0],
]},
数据时间间隔是1s,正常情况是写入2个点。但是为什么出现了这种情况呢?参考S1和S2可知:
这样就得到了一个不太圆满的解释:
S1:Kapacitor对于窗口的维护是lazy模式,虽然第 t t t时刻出现了异常,但如果没有下一个数据出现, t t t时刻的异常不会输出到日志;
S2结论:Kapacitor对于流数据时间戳的处理与真实时间无关,完全按照数据时间戳;
Kapacitor对于乱序数据并不特别处理。
想要理解这种bug情况,可以去看源码,但是似乎并不必要。