当前位置: 首页 > 工具软件 > Restbed > 使用案例 >

使用restbed 写入influxDB 数据库

岳时铭
2023-12-01

关于influxDB 数据库

influxdb是目前比较流行的时间序列数据库。它和传统的数据库有所不同。

一些基本概念

influxdb 与 Mysql 对比

库、表比较

influxDB

传统数据库中的概念

database

数据库

measurement

数据库中的表

points

表里面的一行数据

 

Influxdb 数据的构成

Point由时间戳(time)、数据(field)、标签(tags)组成。

Point属性

传统数据库中的概念

time

每个数据记录时间,是数据库中的主索引(会自动生成)

fields

各种记录值(没有索引的属性)也就是记录的值:温度, 湿度

tags

各种有索引的属性:地区,海拔

 

默认端口

HTTP service 默认运行端口:8086: Influxdb

influxDB HTTP API 协议

  可以通过命令行,库或者http API来访问influxDB 数据库。这里我们介绍http API 访问influxDB 。

测试http api 的方式可以使用 curl 命令行,也可以使用POSTMAN 软件。当然,也可以自己编程序。

建立数据库

使用POST 方法 建立数据库

curl 命令

curl -POST http://localhost:8086/query --data-urlencode "q=CREATE DATABASE mydb"

POSTMAN

在POSTMAP 中选择 POST http://192.168.31.98:8086/query

Content-Type 选择:application/x-www-form-urlencoded

raw 中写入:q=CREATE+DATABASE+mydb

写入数据

curl 命令

curl -i -XPOST 'http://localhost:8086/write?db=mydb' --data-binary 'vibration,host=localhost, value=0.64 '

POSTMAN

在POSTMAP 中选择 POST http://192.168.31.98:8086/write?db=mydb2

Content-Type 选择 application/x-www-form-urlencoded

body 选择 raw 输入:vibration,host=localhost  value=32.145 

其它命令

删除数据库

curl -POST http://localhost:8086/query --data-urlencode "q=DROP DATABASE mydb"

 

C++ influxDB 客户端

    下面是使用restbed 实现的C++ influxDB 客户端。遇到的问题是 influxdb 的响应。它是一种 chunk 编码的 json 。getBody 时会带有一个16进制的chunk size。需要读它,并跳过它。

代码

#include <memory>
#include <future>
#include <cstdio>
#include <cstdlib>
#include <restbed>
#include <string>
#include <iostream>
#include "rapidjson/document.h"
#include "rapidjson/writer.h"
#include "rapidjson/stringbuffer.h"
#include <pthread.h>
#include <unistd.h>
using namespace std;
using namespace restbed;
using namespace rapidjson;
#define DB_HOST "192.168.31.108:8086"
 void read_chunk(std::shared_ptr< Response > response ,const Bytes& data );
 int  response_Parser(string response_body )
 {
   
     Document res;
     res.Parse<rapidjson::kParseStopWhenDoneFlag, rapidjson::UTF8<> >(response_body.c_str());   
      if (res.HasParseError()) {      
         cout<<"error"<<endl;
         return -1;
       };
      
        if (res.HasMember("results"))
       {      
		Value &v=res["results"];
        Value results(kObjectType);
        results=v[0];
        int statement_id=results["statement_id"].GetInt();
        cout<<"statment_id: "<<statement_id<<endl;
         } else
         {
             cout<<"no ans"<<endl;
         }
     return 0;    
 }
void read_chunk_size(std::shared_ptr< Response > response , const Bytes& data )
{
    if ( not data.empty( ) )
    {
        const string length( data.begin( ), data.end( ) );
 
        if ( length not_eq "0\r\n" )
        {
            const auto chunk_size = stoul( length, nullptr, 16 ) + strlen( "\r\n" );
         
            Http::fetch( chunk_size, response);
           
        }
    }

}

int query_db(string url,string message_body)
{
   //cout<<"url: "<<url<<" body: "<<message_body<<endl;
    auto request = make_shared< Request >( Uri( url) );
   request->set_method("POST");
   request->set_header("Host",DB_HOST);
   request->add_header("Content-Type","application/x-www-form-urlencoded");
   request->add_header("Content-Length",to_string(message_body.length()));
   request->set_body(message_body);
    auto response = Http::sync( request );
   if(response->get_status_code()==200)
   { 
    if ( response->get_header( "Transfer-Encoding", String::lowercase ) == "chunked" )
    {  
       
       Bytes buffer= Http::fetch( "\r\n" ,response);
        read_chunk_size(response,buffer);
         const auto body =response->get_body( );
         //skip chunk size
        string  body_str(reinterpret_cast<char*>((char *)body.data()));
          string response_body=body_str.replace(body_str.begin(),body_str.begin()+body_str.find("\r\n"),"");
        response_Parser(response_body);
         return EXIT_SUCCESS;
    }
    else if ( request->has_header( "Content-Length" ) )
    {    
        int length = response->get_header( "Content-Length", 0 );

        Http::fetch( length, response );          
        const auto body =response->get_body( );
        string  body_str(reinterpret_cast<char*>((char *)body.data()));
        response_Parser(body_str);
       return EXIT_SUCCESS;
    }
   }
           return 0;

}
 
void write_value(string db_name,string measurement, float val)
{
    string message_body=measurement+" value="+to_string(val);
    query_db("http://"+(string)DB_HOST+"/write?db="+db_name,message_body);
}
int main( const int, const char** )
{  
string db_name="mydb2";
string measurement="vibration,host=localhost";

// create influxDB database
query_db("http://"+(string)DB_HOST+"/query","q=CREATE+DATABASE+"+db_name);
float val=0.6;
    while(true)
    {
      write_value(db_name,measurement,val);
      val=val+1.0;
      if(val>100) val=0;
        sleep(1);
    }
       
    return EXIT_SUCCESS;
}

 

 类似资料: