typescript的封装
es有官方的js api, 但是官方的body构造比较复杂, github上有第三方的bodybuilder组件专门用来构造DSL的查询部分
比如:
/*
-> POST http://localhost:9200/*scanstatistics* /_count
{
"query": {
"bool": {
"must": [
{
"match": {
"systemID": "987654321098"
}
},
{
"range": {
"timeStamp": {
"gte": "now/M"
}
}
}
]
}
}
}
<- 200
{
"count": 30,
"_shards": {
"total": 15,
"successful": 15,
"skipped": 0,
"failed": 0
}
}
*/
getScanCount2(systemID: string, dateRange: any, indices: any = '*scanstatistics*'): Observable<SearchResponse<{}>> {
const body = bodybuilder()
.addQuery('match', 'systemID', systemID)
.addQuery('range', 'timeStamp', dateRange)
.build();
return fromPromise(<Promise<SearchResponse<{}>>>
this._client.count({ index: indices, body: body, method: 'POST' })
);
}
说明:
-
es的js api可以调试构造的消息和返回的消息详细信息, 具体是在构造es 的client时指定log: 'trace'
-
可以用ping来判断es的service的状态是否正常工作, 可以指定requestTimeout为Infinity
-
es的js api有search方法, 用来从es中查询过滤数据, 具体可以指定es的index、http方法、请求的body、filterPath等。filterPath用来指定返回的结果的元素节点。
-
body中可以指定返回的数量size、排序sort、分类aggregation、以及基本的查询query等
-
分类aggregation中可以指定以什么分类, 以及分类中的值如何统计, 比如是平均avg、求和sum等。普通类用'terms'; 时间分类用'date_histogram',并可以指定时间间隔interval、时间格式format
-
es的js api默认的接口都是返回promise, 可以通过rxjs转为Observable
import { Observable } from 'rxjs/rx';
import { fromPromise } from 'rxjs/observable/fromPromise';
import { Client, SearchResponse } from 'elasticsearch';
//返回值用
Observable<SearchResponse<{}>> 包装
//通过
fromPromise(<Promise<SearchResponse<{}>>> ES的方法); 把promise转为Observable
getIndices(filterPath: string[] = ['-*kibana*', '-*es*', '-*monitoring*', 'metric*']): Observable<SearchResponse<{}>> {
return fromPromise(<Promise<SearchResponse<{}>>>
this._client.indices.getAlias({ method: 'GET', filterPath: filterPath })
);
}
-
集成kibana的es没有一个获取所有index的接口, 一般通过_alias和filter_path的来得到所有index的名字
/*
-> GET http://localhost:9200/_alias?filter_path=-*kibana*%2C-*es*%2C-*monitoring*%2Cmetric*
<- 200
{
"metric_2018_05_30_ctdms`1": {
"aliases": {}
},
"metric_2018_05_30_ctaws`1": {
"aliases": {}
},
"metric_2018_05_30_ctscanstatistics`1": {
"aliases": {}
},
"metric_2018_05_30_ctdailystatistics`1": {
"aliases": {}
}
}
*/
8. es可以指定查询数据的时间范围, 用'range'来指定, 具体的时间值有'now/d', 'now/M', 'now/y'. now表示当前时间, /d和/M以及/y代表当前时间的当前起始时间、当月起始时间、当年起始时间。比如now为2018-06-18 14:50:51, 那么now/d指的是2018-06-18 00:00:00; now/M指的是2018-06-01 00:00:00; now/y指的是2018-01-01 00:00:00. 另外now可以进行加减法操作, 比如now-1d,指的是2018-06-17 14:50:51。 当然, 时间值不局限于now语法,也可以指定具体的时间, 比如20180618。
另外还有gte代表大于等于, lte小于等于, gt大于, lt小于。 这些字符和时间组合起来, 就是一个完整的时间范围查询
-
filterpath中包含哪些元素, 则查询结果就只包含哪些元素, 这个相当于白名单; 另外也可以用减号'-'来说明不要哪些节点, 对查询结果来说相当于黑名单。
filterPath: string[] = ['-*kibana*', '-*es*', '-*monitoring*', 'metric*']; // 代表不要包含kibana字符的节点,不要包含es的节点, 不要包含monitoring的节点, 前面三个都是kibana在es中存储的index 信息。 起始我们可以只指定metric的即可了, 不需要多此一举还写了减号; 此处主要是列出kibana默认的index·
2个aggregation的例子:
/*
-> POST http://localhost:9200/*scanstatistics* /_search?filter_path=aggregations.agg_terms_items%5C.name%5C.keyword.buckets
{
"query": {
"bool": {
"must": [
{
"match": {
"systemID": "987654321098"
}
},
{
"range": {
"timeStamp": {
"gte": "now/M"
}
}
}
]
}
},
"aggs": {
"agg_terms_items.name.keyword": {
"terms": {
"field": "items.name.keyword"
},
"aggs": {
"agg_sum_items.value": {
"sum": {
"field": "items.value"
}
}
}
}
}
}
<- 200
{
"aggregations": {
"agg_terms_items.name.keyword": {
"buckets": [
{
"key": "Upper Extremity",
"doc_count": 15,
"agg_sum_items.value": {
"value": 1247
}
},
{
"key": "Pelvis",
"doc_count": 13,
"agg_sum_items.value": {
"value": 888
}
},
{
"key": "Neck",
"doc_count": 11,
"agg_sum_items.value": {
"value": 939
}
},
{
"key": "Spine",
"doc_count": 11,
"agg_sum_items.value": {
"value": 797
}
},
{
"key": "Thorax",
"doc_count": 11,
"agg_sum_items.value": {
"value": 1016
}
},
{
"key": "Abdomen",
"doc_count": 9,
"agg_sum_items.value": {
"value": 732
}
},
{
"key": "Head",
"doc_count": 8,
"agg_sum_items.value": {
"value": 531
}
}
]
}
}
}
*/
getScanBodypartStatistics(systemID: string, dateRange: any, indices: any = '*scanstatistics*'): Observable<SearchResponse<{}>> {
const filterPath: string[] = ['aggregations.agg_terms_items\\.name\\.keyword.buckets'];
const body = bodybuilder()
.addQuery('match', 'systemID', systemID)
.addQuery('range', 'timeStamp', dateRange)
.aggregation('terms', 'items.name.keyword', (a) => {
return a.aggregation('sum', 'items.value');
})
.build();
return Observable.fromPromise(<Promise<SearchResponse<{}>>>
this._client.search({ index: indices, method: 'POST', filterPath: filterPath, body: body })
);
}
/*
-> POST http://localhost:9200/*dms* /_search?filter_path=aggregations.agg_date_histogram_timeStamp.buckets
{
"query": {
"bool": {
"must": [
{
"match": {
"systemID": "987654321098"
}
},
{
"match": {
"items.name.keyword": "DetectorTempL"
}
},
{
"range": {
"timeStamp": {
"gte": "now/M"
}
}
}
]
}
},
"aggs": {
"agg_date_histogram_timeStamp": {
"date_histogram": {
"field": "timeStamp",
"interval": "1d",
"format": "yyyy-MM-dd"
},
"aggs": {
"agg_avg_items.value": {
"avg": {
"field": "items.value"
}
}
}
}
}
}
<- 200
{
"aggregations": {
"agg_date_histogram_timeStamp": {
"buckets": [
{
"key_as_string": "2018-05-30",
"key": 1527638400000,
"doc_count": 23,
"agg_avg_items.value": {
"value": 38.84304356229478
}
}
]
}
}
}
*/
getDetectorTemp(systemID: string, dateRange: any, detector: DetectorEnum, indices: any = '*dms*'):
Observable<SearchResponse<{}>> {
const filterPath: string[] = ['aggregations.agg_date_histogram_timeStamp.buckets'];
let detectorType: string;
switch (detector) {
case DetectorEnum.Left:
detectorType = 'DetectorTempL';
break;
case DetectorEnum.Middle:
detectorType = 'DetectorTempM';
break;
case DetectorEnum.Right:
detectorType = 'DetectorTempR';
break;
}
const body = bodybuilder()
.addQuery('match', 'systemID', systemID)
.addQuery('match', 'items.name.keyword', detectorType)
.addQuery('range', 'timeStamp', dateRange)
.aggregation('date_histogram', 'timeStamp', { interval: '1d', format: 'yyyy-MM-dd' }
, (a) => {
return a.agg('avg', 'items.value');
})
.build();
return Observable.fromPromise(<Promise<SearchResponse<{}>>>
this._client.search({ index: indices, method: 'POST', filterPath: filterPath, body: body })
);
}