需要及时获取短信发送消息队列的堆积情况;pass将相关信息及时的存入了opentsdb时序数据库中,现需要读取时序数据库将堆积情况以短信形式通知
时间序列数据库:主要用来存储时间序列数据,常常用来做监控预警数据的存储
OpenTSDB是基于HBase存储时间序列数据的一个开源数据库
特点:
存储:
读能力
中文文档:
https://www.docs4dev.com/amp/docs/zh/opentsdb/2.3/reference/installation.html#runtime-requirements
maven依赖:
<dependency>
<groupId>com.github.eulery</groupId>
<artifactId>opentsdb-java-sdk</artifactId>
<version>1.1.6</version>
</dependency>
工具类:
package com.lmy.util;
import org.apache.http.nio.reactor.IOReactorException;
import org.opentsdb.client.OpenTSDBClient;
import org.opentsdb.client.OpenTSDBClientFactory;
import org.opentsdb.client.OpenTSDBConfig;
import org.opentsdb.client.bean.request.Point;
import org.opentsdb.client.bean.response.DetailResult;
import org.opentsdb.client.http.callback.BatchPutHttpResponseCallback;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
* OpenTsDbUtil类
* @author 86152
*/
public class OpenTsDbUtil {
private static final String OPENTSDB_HOST = "http://127.0.0.1";
private static final int OPENTSDB_PORT = 8080;
private static final OpenTSDBConfig config;
static {
config = OpenTSDBConfig
// OpenTsDb数据库地址和端口号
.address(OPENTSDB_HOST, OPENTSDB_PORT)
// http连接池大小,默认100
.httpConnectionPool(100)
// http请求超时时间,默认100s
.httpConnectTimeout(100)
// 异步写入数据时,每次http提交的数据条数,默认50
.batchPutSize(50)
// 异步写入数据中,内部有一个队列,默认队列大小20000
.batchPutBufferSize(20000)
// 异步写入等待时间,如果距离上一次请求超多300ms,且有数据,则直接提交
.batchPutTimeLimit(300)
// 当确认这个client只用于查询时设置,可不创建内部队列从而提高效率
.readonly()
// 每批数据提交完成后回调
.batchPutCallBack(new BatchPutHttpResponseCallback.BatchPutCallBack() {
@Override
public void response(List<Point> points, DetailResult result) {
// 在请求完成并且response code成功时回调
}
@Override
public void responseError(List<Point> points, DetailResult result) {
// 在response code失败时回调
}
@Override
public void failed(List<Point> points, Exception e) {
// 在发生错误是回调
}
}).config();
}
/**
* 获取客户端
*/
public static OpenTSDBClient getClient() {
OpenTSDBClient client = null;
try {
client = OpenTSDBClientFactory.connect(config);
} catch (IOReactorException e) {
e.printStackTrace();
}
return client;
}
/**
* 插入单个tag数据
*/
public static void insertOne(OpenTSDBClient client, String metric, String tagName, String tagValue, Number value) {
//获取当前秒
Long timestamp = System.currentTimeMillis() / 1000;
//创建数据对象
Point point = Point.metric(metric).tag(tagName, tagValue).value(timestamp, value).build();
//将对象插入数据库
client.put(point);
}
/**
* 插入多个tag数据
*/
public static void insertMap(OpenTSDBClient client, String metric, Map<String, String> tags, Number value) {
//获取当前秒
Long timestamp = System.currentTimeMillis() / 1000;
//创建数据对象
Point point = Point.metric(metric).tag(tags).value(timestamp, value).build();
//将对象插入数据库
client.put(point);
}
/**
* 优雅关闭连接,会等待所有异步操作完成
*
* @param client 需要关闭的客户端
*/
public static void close(OpenTSDBClient client) {
if (client != null) {
try {
client.gracefulClose();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
查询
//获取客户端
OpenTSDBClient client = OpenTsDbUtil.getClient();
//查询条件集合
List<SubQuery.Filter> filterList = new ArrayList<>();
//查询条件
SubQuery.Filter filter = new SubQuery.Filter();
//设置成true, 不设置或设置成false会导致读超时
filter.setGroupBy(Boolean.TRUE);
//设置过滤类型
//LiteralOr:等于查询,或查询,类似 SQL 里的 IN 查询;
//NotLiteralOr:等于查询,或查询,类似 SQL 里的 NOT IN 查询;
//Wildcard:模糊匹配,类似 SQL 里的 like 查询;
//Regexp:正则匹配;
filter.setType(SubQuery.Filter.FilterType.LITERAL_OR);
//设置tag,即查询的条件对象
filter.setTagk("metric_code");
//要查询的tag
filter.setFilter("MQ_001|" + "MQ_002");
filterList.add(filter);
//查询的时间范围,3m-ago:3秒前到当前数据
Query query = Query.begin("3m-ago")
//要查询的库
.sub(SubQuery.metric("3066688880001")
//查询的聚合类型
.aggregator(SubQuery.Aggregator.NONE)
.filter(filterList)
.build())
.build();
// 同步查询
List<QueryResult> resultList = client.query(query);
// 异步查询
client.query(query, new QueryHttpResponseCallback.QueryCallback() {
@Override
public void response(Query query, List<QueryResult> queryResults) {
// 在请求完成并且response code成功时回调
}
@Override
public void responseError(Query query, HttpException e) {
// 在response code失败时回调
}
@Override
public void failed(Query query, Exception e) {
// 在发生错误是回调
}
})
插入数据
//获取客户端
OpenTSDBClient client = OpenTsDbUtil.getClient();
//获取当前秒
Long timestamp = System.currentTimeMillis() / 1000;
//创建数据对象
Point point = Point.metric("point").tag("testTag", "test").value(timestamp, 1.0).build();
//将对象插入数据库
client.put(point);
//关闭资源
OpenTsDbUtil.close(client);
删除
Query query = Query.begin("7d-ago")
.sub(SubQuery.metric("metric.test")
.aggregator(SubQuery.Aggregator.NONE)
.build())
.build();
client.delete(query);