influxdb是目前比较流行的时间序列数据库。它和传统的数据库有所不同。
influxdb 与 Mysql 对比
库、表比较
influxDB | 传统数据库中的概念 |
database | 数据库 |
measurement | 数据库中的表 |
points | 表里面的一行数据 |
Influxdb 数据的构成
Point由时间戳(time)、数据(field)、标签(tags)组成。
Point属性 | 传统数据库中的概念 |
time | 每个数据记录时间,是数据库中的主索引(会自动生成) |
fields | 各种记录值(没有索引的属性)也就是记录的值:温度, 湿度 |
tags | 各种有索引的属性:地区,海拔 |
默认端口
HTTP service 默认运行端口:8086: Influxdb
可以通过命令行,库或者http API来访问influxDB 数据库。这里我们介绍http API 访问influxDB 。
测试http api 的方式可以使用 curl 命令行,也可以使用POSTMAN 软件。当然,也可以自己编程序。
使用POST 方法 建立数据库
curl 命令
curl -POST http://localhost:8086/query --data-urlencode "q=CREATE DATABASE mydb"
POSTMAN
在POSTMAP 中选择 POST http://192.168.31.98:8086/query
Content-Type 选择:application/x-www-form-urlencoded
raw 中写入:q=CREATE+DATABASE+mydb
curl 命令
curl -i -XPOST 'http://localhost:8086/write?db=mydb' --data-binary 'vibration,host=localhost, value=0.64 '
POSTMAN
在POSTMAP 中选择 POST http://192.168.31.98:8086/write?db=mydb2
Content-Type 选择 application/x-www-form-urlencoded
body 选择 raw 输入:vibration,host=localhost value=32.145
其它命令
删除数据库
curl -POST http://localhost:8086/query --data-urlencode "q=DROP DATABASE mydb"
下面是使用restbed 实现的C++ influxDB 客户端。遇到的问题是 influxdb 的响应。它是一种 chunk 编码的 json 。getBody 时会带有一个16进制的chunk size。需要读它,并跳过它。
代码
#include <memory>
#include <future>
#include <cstdio>
#include <cstdlib>
#include <restbed>
#include <string>
#include <iostream>
#include "rapidjson/document.h"
#include "rapidjson/writer.h"
#include "rapidjson/stringbuffer.h"
#include <pthread.h>
#include <unistd.h>
using namespace std;
using namespace restbed;
using namespace rapidjson;
#define DB_HOST "192.168.31.108:8086"
void read_chunk(std::shared_ptr< Response > response ,const Bytes& data );
int response_Parser(string response_body )
{
Document res;
res.Parse<rapidjson::kParseStopWhenDoneFlag, rapidjson::UTF8<> >(response_body.c_str());
if (res.HasParseError()) {
cout<<"error"<<endl;
return -1;
};
if (res.HasMember("results"))
{
Value &v=res["results"];
Value results(kObjectType);
results=v[0];
int statement_id=results["statement_id"].GetInt();
cout<<"statment_id: "<<statement_id<<endl;
} else
{
cout<<"no ans"<<endl;
}
return 0;
}
void read_chunk_size(std::shared_ptr< Response > response , const Bytes& data )
{
if ( not data.empty( ) )
{
const string length( data.begin( ), data.end( ) );
if ( length not_eq "0\r\n" )
{
const auto chunk_size = stoul( length, nullptr, 16 ) + strlen( "\r\n" );
Http::fetch( chunk_size, response);
}
}
}
int query_db(string url,string message_body)
{
//cout<<"url: "<<url<<" body: "<<message_body<<endl;
auto request = make_shared< Request >( Uri( url) );
request->set_method("POST");
request->set_header("Host",DB_HOST);
request->add_header("Content-Type","application/x-www-form-urlencoded");
request->add_header("Content-Length",to_string(message_body.length()));
request->set_body(message_body);
auto response = Http::sync( request );
if(response->get_status_code()==200)
{
if ( response->get_header( "Transfer-Encoding", String::lowercase ) == "chunked" )
{
Bytes buffer= Http::fetch( "\r\n" ,response);
read_chunk_size(response,buffer);
const auto body =response->get_body( );
//skip chunk size
string body_str(reinterpret_cast<char*>((char *)body.data()));
string response_body=body_str.replace(body_str.begin(),body_str.begin()+body_str.find("\r\n"),"");
response_Parser(response_body);
return EXIT_SUCCESS;
}
else if ( request->has_header( "Content-Length" ) )
{
int length = response->get_header( "Content-Length", 0 );
Http::fetch( length, response );
const auto body =response->get_body( );
string body_str(reinterpret_cast<char*>((char *)body.data()));
response_Parser(body_str);
return EXIT_SUCCESS;
}
}
return 0;
}
void write_value(string db_name,string measurement, float val)
{
string message_body=measurement+" value="+to_string(val);
query_db("http://"+(string)DB_HOST+"/write?db="+db_name,message_body);
}
int main( const int, const char** )
{
string db_name="mydb2";
string measurement="vibration,host=localhost";
// create influxDB database
query_db("http://"+(string)DB_HOST+"/query","q=CREATE+DATABASE+"+db_name);
float val=0.6;
while(true)
{
write_value(db_name,measurement,val);
val=val+1.0;
if(val>100) val=0;
sleep(1);
}
return EXIT_SUCCESS;
}