当前位置: 首页 > 工具软件 > OpenTSDB > 使用案例 >

TSDB时序数据库-OpenTSDB

长孙兴德
2023-12-01

背景

需要及时获取短信发送消息队列的堆积情况;pass将相关信息及时的存入了opentsdb时序数据库中,现需要读取时序数据库将堆积情况以短信形式通知

一:概念

时间序列数据库:主要用来存储时间序列数据,常常用来做监控预警数据的存储
OpenTSDB是基于HBase存储时间序列数据的一个开源数据库

特点:

  1. 顺序写:实时数据写入,多以追加的方式
  2. 毫无遗漏的接收并存储大量的时间序列数据
  3. 主要用做监控系统;譬如收集大规模集群(包括网络设备、操作系统、应用程序)的监控数据并进行存储,查询

存储:

  1. 无需转换,写的是什么数据存的就是什么数据
  2. 时序数据以毫秒的精度保存
  3. 永久保留原始数据

读能力

  1. 直接通过内置的GUI来生成图表
  2. 还可以通过HTTP API查询数据
  3. 另外还可以使用开源的前端与其交互

中文文档:
https://www.docs4dev.com/amp/docs/zh/opentsdb/2.3/reference/installation.html#runtime-requirements

二:使用引入

  1. maven依赖:

    <dependency>
        <groupId>com.github.eulery</groupId>
        <artifactId>opentsdb-java-sdk</artifactId>
        <version>1.1.6</version>
    </dependency>
    
  2. 工具类:

    package com.lmy.util;
    
    import org.apache.http.nio.reactor.IOReactorException;
    import org.opentsdb.client.OpenTSDBClient;
    import org.opentsdb.client.OpenTSDBClientFactory;
    import org.opentsdb.client.OpenTSDBConfig;
    import org.opentsdb.client.bean.request.Point;
    import org.opentsdb.client.bean.response.DetailResult;
    import org.opentsdb.client.http.callback.BatchPutHttpResponseCallback;
    
    import java.io.IOException;
    import java.util.List;
    import java.util.Map;
    
    /**
     * OpenTsDbUtil类
     * @author 86152
     */
    public class OpenTsDbUtil {
        private static final String OPENTSDB_HOST = "http://127.0.0.1";
        private static final int OPENTSDB_PORT = 8080;
        private static final OpenTSDBConfig config;
    
        static {
            config = OpenTSDBConfig
                    // OpenTsDb数据库地址和端口号
                    .address(OPENTSDB_HOST, OPENTSDB_PORT)
                    // http连接池大小,默认100
                    .httpConnectionPool(100)
                    // http请求超时时间,默认100s
                    .httpConnectTimeout(100)
                    // 异步写入数据时,每次http提交的数据条数,默认50
                    .batchPutSize(50)
                    // 异步写入数据中,内部有一个队列,默认队列大小20000
                    .batchPutBufferSize(20000)
                    // 异步写入等待时间,如果距离上一次请求超多300ms,且有数据,则直接提交
                    .batchPutTimeLimit(300)
                    // 当确认这个client只用于查询时设置,可不创建内部队列从而提高效率
                    .readonly()
                    // 每批数据提交完成后回调
                    .batchPutCallBack(new BatchPutHttpResponseCallback.BatchPutCallBack() {
                        @Override
                        public void response(List<Point> points, DetailResult result) {
                            // 在请求完成并且response code成功时回调
                        }
    
                        @Override
                        public void responseError(List<Point> points, DetailResult result) {
                            // 在response code失败时回调
                        }
    
                        @Override
                        public void failed(List<Point> points, Exception e) {
                            // 在发生错误是回调
                        }
                    }).config();
        }
    
        /**
         * 获取客户端
         */
        public static OpenTSDBClient getClient() {
            OpenTSDBClient client = null;
            try {
                client = OpenTSDBClientFactory.connect(config);
            } catch (IOReactorException e) {
                e.printStackTrace();
            }
            return client;
        }
    
        /**
         * 插入单个tag数据
         */
        public static void insertOne(OpenTSDBClient client, String metric, String tagName, String tagValue, Number value) {
            //获取当前秒
            Long timestamp = System.currentTimeMillis() / 1000;
            //创建数据对象
            Point point = Point.metric(metric).tag(tagName, tagValue).value(timestamp, value).build();
            //将对象插入数据库
            client.put(point);
        }
    
        /**
         * 插入多个tag数据
         */
        public static void insertMap(OpenTSDBClient client, String metric, Map<String, String> tags, Number value) {
            //获取当前秒
            Long timestamp = System.currentTimeMillis() / 1000;
            //创建数据对象
            Point point = Point.metric(metric).tag(tags).value(timestamp, value).build();
            //将对象插入数据库
            client.put(point);
        }
    
        /**
         * 优雅关闭连接,会等待所有异步操作完成
         *
         * @param client 需要关闭的客户端
         */
        public static void close(OpenTSDBClient client) {
            if (client != null) {
                try {
                    client.gracefulClose();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    
  3. 查询

    //获取客户端
    OpenTSDBClient client = OpenTsDbUtil.getClient();
    //查询条件集合
    List<SubQuery.Filter> filterList = new ArrayList<>();
    //查询条件
    SubQuery.Filter filter = new SubQuery.Filter();
    //设置成true, 不设置或设置成false会导致读超时
    filter.setGroupBy(Boolean.TRUE);
    //设置过滤类型
    //LiteralOr:等于查询,或查询,类似 SQL 里的 IN 查询;
    //NotLiteralOr:等于查询,或查询,类似 SQL 里的 NOT IN 查询;
    //Wildcard:模糊匹配,类似 SQL 里的 like 查询;
    //Regexp:正则匹配;
    filter.setType(SubQuery.Filter.FilterType.LITERAL_OR);
    //设置tag,即查询的条件对象
    filter.setTagk("metric_code");
    //要查询的tag
    filter.setFilter("MQ_001|" + "MQ_002");
    filterList.add(filter);
    //查询的时间范围,3m-ago:3秒前到当前数据
    Query query = Query.begin("3m-ago")
             //要查询的库
    		.sub(SubQuery.metric("3066688880001")
    		        //查询的聚合类型
    				.aggregator(SubQuery.Aggregator.NONE)
    				.filter(filterList)
    				.build())
    		.build();
    // 同步查询
    List<QueryResult> resultList = client.query(query);
    
    // 异步查询
    client.query(query, new QueryHttpResponseCallback.QueryCallback() {
        @Override
        public void response(Query query, List<QueryResult> queryResults) {
            // 在请求完成并且response code成功时回调
        }
    
        @Override
        public void responseError(Query query, HttpException e) {
            // 在response code失败时回调
        }
    
        @Override
        public void failed(Query query, Exception e) {
            // 在发生错误是回调
        }
    })
    
  4. 插入数据

    //获取客户端
    OpenTSDBClient client = OpenTsDbUtil.getClient();
    //获取当前秒
    Long timestamp = System.currentTimeMillis() / 1000;
    //创建数据对象
    Point point = Point.metric("point").tag("testTag", "test").value(timestamp, 1.0).build();
    //将对象插入数据库
    client.put(point);
    //关闭资源
    OpenTsDbUtil.close(client);
    
  5. 删除

    Query query = Query.begin("7d-ago")
                       .sub(SubQuery.metric("metric.test")
                       .aggregator(SubQuery.Aggregator.NONE)
                       .build())
                       .build();
    client.delete(query);
    
    
 类似资料: