package edgedb
import (
"database/sql"
"errors"
"fmt"
"sort"
"strings"
"time"
"gorm.io/driver/clickhouse"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)
// Package-level GORM handles shared by the accessors in this file.
var (
// pgConn is the PostgreSQL handle; it is assigned by _setupPostgresConn and
// returned by GetDBConn.
pgConn *gorm.DB
// ckConn is the ClickHouse handle; it is assigned by _setupClickhouseConn and
// returned by GetClickhouseConn.
ckConn *gorm.DB
)
// Config holds the connection settings for the storage backends used by this
// package. Every field carries a `yaml` key plus an `env`/`envDefault` pair
// naming the environment variable and its fallback value.
type Config struct {
	// Telemetry record storage backend (default: postgresql).
	// Supports postgresql/clickhouse; several backends can be selected by
	// joining them with ",".
	DeviceTelemetryRecordBackend string `yaml:"" env:"DEVICE_TELEMETRY_RECORD_BACKEND" envDefault:"postgresql"`

	// Postgresql describes the PostgreSQL server and the edgedb database on it.
	Postgresql struct {
		Host       string `yaml:"host" env:"POSTGRESQL_SERVER_HOST" envDefault:"postgresql"`
		Port       int    `yaml:"port" env:"POSTGRESQL_SERVER_PORT" envDefault:"5432"`
		EdgedbUser string `yaml:"user" env:"POSTGRESQL_SERVER_EDGEDB_USER" envDefault:"postgres"`
		EdgedbPass string `yaml:"pass" env:"POSTGRESQL_SERVER_EDGEDB_PASS" envDefault:"password"`
		EdgedbName string `yaml:"name" env:"POSTGRESQL_SERVER_EDGEDB_NAME" envDefault:"edgedb"`
		PoolSize   int    `yaml:"pool_size" env:"POSTGRESQL_SERVER_POOLSIZE" envDefault:"500"` // maximum number of open connections in the pool
	}

	// Clickhouse describes the ClickHouse server and the edgedb database on it.
	Clickhouse struct {
		// Enabled previously reused the "host" yaml key, colliding with Host.
		Enabled bool   `yaml:"enabled" env:"CLICKHOUSE_SERVER_ENABLED" envDefault:"false"`
		Host    string `yaml:"host" env:"CLICKHOUSE_SERVER_HOST" envDefault:"clickhouse"`
		Port    int    `yaml:"port" env:"CLICKHOUSE_SERVER_PORT" envDefault:"9000"`
		// User and Pass previously reused the "port" yaml key and the
		// CLICKHOUSE_SERVER_PORT variable, so they were populated with the port
		// number whenever that variable was set.
		// NOTE(review): the yaml keys and env names below were chosen to be unique
		// within this struct — confirm them against the deployed configuration.
		User         string `yaml:"server_user" env:"CLICKHOUSE_SERVER_USER" envDefault:"default"`
		Pass         string `yaml:"server_pass" env:"CLICKHOUSE_SERVER_PASS" envDefault:""`
		EdgedbUser   string `yaml:"user" env:"CLICKHOUSE_SERVER_EDGEDB_USER" envDefault:"root"`
		EdgedbPass   string `yaml:"pass" env:"CLICKHOUSE_SERVER_EDGEDB_PASS" envDefault:"[yz^_^edge]"`
		EdgedbName   string `yaml:"name" env:"CLICKHOUSE_SERVER_EDGEDB_NAME" envDefault:"edgedb"`
		ReadTimeout  int    `yaml:"read_timeout" env:"READ_TIMEOUT" envDefault:"10"`
		WriteTimeout int    `yaml:"write_timeout" env:"WRITE_TIMEOUT" envDefault:"10"`
	}
}
// SetupConn initializes the database connections and configures their pools.
//
// The PostgreSQL connection is always created. The ClickHouse connection is
// created in addition when "clickhouse" appears in the comma-separated
// cfg.DeviceTelemetryRecordBackend list; surrounding whitespace around each
// entry is ignored.
//
// It returns the *sql.DB backing the PostgreSQL pool, or an error when cfg is
// nil or either connection cannot be established.
//
// Note: do not close the returned *sql.DB, otherwise the connection pool stops
// working. Unless there is a special need, callers only have to check the
// error and then obtain connections through the GetXXConn functions.
func SetupConn(cfg *Config) (*sql.DB, error) {
	if cfg == nil {
		return nil, errors.New("Config needed")
	}

	// Initialize the PostgreSQL connection.
	db, err := _setupPostgresConn(cfg)
	if err != nil {
		return nil, err
	}
	// The pool is intentionally NOT closed here: it backs the package-level
	// pgConn handle and is handed back to the caller. Closing it on return
	// would leave both pointing at a closed pool.

	// If ClickHouse is configured as an extra telemetry store, initialize its
	// connection as well.
	backends := strings.Split(cfg.DeviceTelemetryRecordBackend, ",")
	for i := range backends {
		// Tolerate values such as "postgresql, clickhouse".
		backends[i] = strings.TrimSpace(backends[i])
	}
	sort.Strings(backends)
	idx := sort.SearchStrings(backends, "clickhouse")
	if idx < len(backends) && backends[idx] == "clickhouse" {
		if _, err := _setupClickhouseConn(cfg); err != nil {
			return nil, err
		}
	}
	return db, nil
}
// _setupPostgresConn opens the PostgreSQL connection through GORM, configures
// the underlying database/sql pool and stores the GORM handle in pgConn.
//
// When cfg is nil the configuration is obtained from configs.LoadConfig.
// It returns the *sql.DB backing the new handle, or the first error hit while
// loading the configuration, opening the connection or fetching the pool.
// pgConn is only replaced when every step succeeds.
func _setupPostgresConn(cfg *Config) (*sql.DB, error) {
	if cfg == nil {
		loaded, err := configs.LoadConfig()
		if err != nil {
			return nil, err
		}
		cfg = loaded
	}

	pg := cfg.Postgresql
	dsn := fmt.Sprintf(
		"host=%s port=%d user=%s password=%s dbname=%s sslmode=disable TimeZone=Asia/Shanghai",
		pg.Host, pg.Port, pg.EdgedbUser, pg.EdgedbPass, pg.EdgedbName,
	)

	dialector := postgres.New(postgres.Config{
		DSN:                  dsn,
		PreferSimpleProtocol: true, // disables implicit prepared statement usage
	})
	gormDB, err := gorm.Open(dialector, &gorm.Config{})
	if err != nil {
		return nil, err
	}

	pool, err := gormDB.DB()
	if err != nil {
		return nil, err
	}

	// Maximum number of idle connections kept ready for reuse.
	pool.SetMaxIdleConns(10)
	// Maximum number of open connections; taken from the configuration
	// (default 500, bounded by the server's max_connections).
	pool.SetMaxOpenConns(pg.PoolSize)
	// Maximum time a connection may be reused before it is replaced.
	pool.SetConnMaxLifetime(600 * time.Second)

	// Publish the handle for GetDBConn.
	pgConn = gormDB
	return pool, nil
}
// _setupClickhouseConn opens the ClickHouse connection through GORM,
// configures the underlying database/sql pool and stores the GORM handle in
// ckConn.
//
// When cfg is nil the configuration is obtained from configs.LoadConfig.
// It returns the *sql.DB backing the new handle, or the first error hit while
// loading the configuration, opening the connection or fetching the pool.
// ckConn is only replaced when every step succeeds.
func _setupClickhouseConn(cfg *Config) (*sql.DB, error) {
	if cfg == nil {
		loaded, err := configs.LoadConfig()
		if err != nil {
			return nil, err
		}
		cfg = loaded
	}

	ck := cfg.Clickhouse
	dsn := fmt.Sprintf(
		"tcp://%s:%d?database=%s&username=%s&password=%s&read_timeout=%d&write_timeout=%d",
		ck.Host, ck.Port, ck.EdgedbName, ck.EdgedbUser, ck.EdgedbPass, ck.ReadTimeout, ck.WriteTimeout,
	)

	dialector := clickhouse.New(clickhouse.Config{
		DSN:                       dsn,
		DisableDatetimePrecision:  true,     // disable datetime64 precision, not supported before clickhouse 20.4
		DontSupportRenameColumn:   true,     // rename column not supported before clickhouse 20.4
		SkipInitializeWithVersion: false,    // smart configure based on used version
		DefaultGranularity:        3,        // 1 granule = 8192 rows
		DefaultCompression:        "LZ4",    // default compression algorithm. LZ4 is lossless
		DefaultIndexType:          "minmax", // index stores extremes of the expression
		DefaultTableEngineOpts:    "ENGINE=MergeTree() ORDER BY tuple()",
	})
	gormDB, err := gorm.Open(dialector, &gorm.Config{})
	if err != nil {
		return nil, err
	}

	pool, err := gormDB.DB()
	if err != nil {
		return nil, err
	}

	// Maximum number of idle connections kept ready for reuse.
	pool.SetMaxIdleConns(1)
	// Maximum number of open connections in the pool.
	pool.SetMaxOpenConns(10)
	// Maximum time a connection may be reused before it is replaced.
	pool.SetConnMaxLifetime(600 * time.Second)

	// Publish the handle for GetClickhouseConn.
	ckConn = gormDB
	return pool, nil
}
// GetDBConn returns the package-level PostgreSQL GORM handle.
//
// The handle is (re)created with _setupPostgresConn(nil) — which loads its
// settings via configs.LoadConfig — when it has not been set up yet or when
// its underlying *sql.DB cannot be obtained. The pool is then pinged; if the
// ping fails the pool is closed and the connection is set up once more.
// An error is returned only when one of those setup attempts fails.
func GetDBConn() (*gorm.DB, error) {
	var (
		pool *sql.DB
		err  error
	)

	if pgConn != nil {
		pool, err = pgConn.DB()
	}
	// No handle yet, or its pool is unavailable: establish the connection.
	if pgConn == nil || err != nil {
		if pool, err = _setupPostgresConn(nil); err != nil {
			return nil, err
		}
	}

	// Ping failed: drop the stale pool (best effort) and reconnect.
	if pingErr := pool.Ping(); pingErr != nil {
		_ = pool.Close()
		if _, err = _setupPostgresConn(nil); err != nil {
			return nil, err
		}
	}
	return pgConn, nil
}
// GetClickhouseConn returns the package-level ClickHouse GORM handle.
//
// The handle is (re)created with _setupClickhouseConn(nil) — which loads its
// settings via configs.LoadConfig — when it has not been set up yet or when
// its underlying *sql.DB cannot be obtained. The pool is then pinged; if the
// ping fails the pool is closed and the connection is set up once more.
// An error is returned only when one of those setup attempts fails.
func GetClickhouseConn() (*gorm.DB, error) {
	var (
		pool *sql.DB
		err  error
	)

	if ckConn != nil {
		pool, err = ckConn.DB()
	}
	// No handle yet, or its pool is unavailable: establish the connection.
	if ckConn == nil || err != nil {
		if pool, err = _setupClickhouseConn(nil); err != nil {
			return nil, err
		}
	}

	// Ping failed: drop the stale pool (best effort) and reconnect.
	if pingErr := pool.Ping(); pingErr != nil {
		_ = pool.Close()
		if _, err = _setupClickhouseConn(nil); err != nil {
			return nil, err
		}
	}
	return ckConn, nil
}