当前位置: 首页 > 工具软件 > clickhouse-go > 使用案例 >

golang获取postgres或clickhouse连接

孔星宇
2023-12-01
package edgedb

import (
	"database/sql"
	"errors"
	"fmt"
	"sort"
	"strings"
	"time"

	"gorm.io/driver/clickhouse"
	"gorm.io/driver/postgres"
	"gorm.io/gorm"
)

var (
	pgConn *gorm.DB
	ckConn *gorm.DB
)

type Config struct {
	// 遥感数据存储,(默认: postgresql)
	// 支持 postgresql/clickhouse,可多选通过 "," 连接多个选择
	DeviceTelemetryRecordBackend string `yaml:"" env:"DEVICE_TELEMETRY_RECORD_BACKEND" envDefault:"postgresql"`

	Postgresql struct {
		Host       string `yaml:"host" env:"POSTGRESQL_SERVER_HOST" envDefault:"postgresql"`
		Port       int    `yaml:"port" env:"POSTGRESQL_SERVER_PORT" envDefault:"5432"`
		EdgedbUser string `yaml:"user" env:"POSTGRESQL_SERVER_EDGEDB_USER" envDefault:"postgres"`
		EdgedbPass string `yaml:"pass" env:"POSTGRESQL_SERVER_EDGEDB_PASS" envDefault:"password"`
		EdgedbName string `yaml:"name" env:"POSTGRESQL_SERVER_EDGEDB_NAME" envDefault:"edgedb"`
		PoolSize   int    `yaml:"pool_size" env:"POSTGRESQL_SERVER_POOLSIZE" envDefault:"500"` // 设置连接池的最大连接数
	}

	Clickhouse struct {
		Enabled      bool   `yaml:"host" env:"CLICKHOUSE_SERVER_ENABLED" envDefault:"false"`
		Host         string `yaml:"host" env:"CLICKHOUSE_SERVER_HOST" envDefault:"clickhouse"`
		Port         int    `yaml:"port" env:"CLICKHOUSE_SERVER_PORT" envDefault:"9000"`
		User         string `yaml:"port" env:"CLICKHOUSE_SERVER_PORT" envDefault:"default"`
		Pass         string `yaml:"port" env:"CLICKHOUSE_SERVER_PORT" envDefault:""`
		EdgedbUser   string `yaml:"user" env:"CLICKHOUSE_SERVER_EDGEDB_USER" envDefault:"root"`
		EdgedbPass   string `yaml:"pass" env:"CLICKHOUSE_SERVER_EDGEDB_PASS" envDefault:"[yz^_^edge]"`
		EdgedbName   string `yaml:"name" env:"CLICKHOUSE_SERVER_EDGEDB_NAME" envDefault:"edgedb"`
		ReadTimeout  int    `yaml:"read_timeout" env:"READ_TIMEOUT" envDefault:"10"`
		WriteTimeout int    `yaml:"write_timeout" env:"WRITE_TIMEOUT" envDefault:"10"`
	}
}

// SetupConn 初始化连接,配置连接池
// 注意:不要关闭数据库连接客户端 sql.DB,否则连接池无法生效,如无特殊需求,只需判断有无异常即可。通过 GetXXConn 函数向连接池申请连接
func SetupConn(cfg *Config) (*sql.DB, error) {
	// 加载配置
	if cfg == nil {
		err := errors.New("Config needed")
		return nil, err
	}

	// 初始化 Postgres 连接
	db, err := _setupPostgresConn(cfg)
	if err != nil {
		return nil, err
	}
	defer db.Close()

	// 如系统配置 Clickhouse 作为遥测扩展存储,则额外增加 CK 连接的初始化
	telemetryDatabaseBackends := strings.Split(cfg.DeviceTelemetryRecordBackend, ",")
	sort.Strings(telemetryDatabaseBackends)
	index := sort.SearchStrings(telemetryDatabaseBackends, "clickhouse")
	if index < len(telemetryDatabaseBackends) && telemetryDatabaseBackends[index] == "clickhouse" {
		// 初始化 Clickhouse 连接
		if _, err := _setupClickhouseConn(cfg); err != nil {
			return nil, err
		}
	}

	return db, nil
}

func _setupPostgresConn(cfg *Config) (*sql.DB, error) {
	if cfg == nil {
		if config, err := configs.LoadConfig(); err != nil {
			return nil, err
		} else {
			cfg = config
		}
	}

	dsn := fmt.Sprintf(
		"host=%s port=%d user=%s password=%s dbname=%s sslmode=disable TimeZone=Asia/Shanghai",
		cfg.Postgresql.Host,
		cfg.Postgresql.Port,
		cfg.Postgresql.EdgedbUser,
		cfg.Postgresql.EdgedbPass,
		cfg.Postgresql.EdgedbName,
	)
	conn, err := gorm.Open(postgres.New(postgres.Config{
		DSN:                  dsn,
		PreferSimpleProtocol: true, // disables implicit prepared statement usage
	}), &gorm.Config{})
	if err != nil {
		return nil, err
	}

	pgDB, err := conn.DB()
	if err != nil {
		return nil, err
	}

	// SetMaxIdleConns sets the maximum number of connections in the idle connection pool.
	// 设置可空闲的最大连接数,随时等待调用
	pgDB.SetMaxIdleConns(10)

	// SetMaxOpenConns sets the maximum number of open connections to the database.
	// 设置连接池的最大连接数,不配置默认就是不限制 (当前500,依赖 postgres max_connections)
	pgDB.SetMaxOpenConns(cfg.Postgresql.PoolSize)

	// SetConnMaxLifetime sets the maximum amount of time a connection may be reused.
	// 连接的最长存活期,超过这个时间,连接将会重置,不再被复用,不配置默认就是永不过期
	pgDB.SetConnMaxLifetime(time.Second * 600)

	// Global
	pgConn = conn

	return pgDB, nil
}

// 初始化clickhouse的连接,配置连接池
func _setupClickhouseConn(cfg *Config) (*sql.DB, error) {
	if cfg == nil {
		if config, err := configs.LoadConfig(); err != nil {
			return nil, err
		} else {
			cfg = config
		}
	}

	dsn := fmt.Sprintf(
		"tcp://%s:%d?database=%s&username=%s&password=%s&read_timeout=%d&write_timeout=%d",
		cfg.Clickhouse.Host,
		cfg.Clickhouse.Port,
		cfg.Clickhouse.EdgedbName,
		cfg.Clickhouse.EdgedbUser,
		cfg.Clickhouse.EdgedbPass,
		cfg.Clickhouse.ReadTimeout,
		cfg.Clickhouse.WriteTimeout,
	)
	conn, err := gorm.Open(clickhouse.New(clickhouse.Config{
		DSN:                       dsn,
		DisableDatetimePrecision:  true,     // disable datetime64 precision, not supported before clickhouse 20.4
		DontSupportRenameColumn:   true,     // rename column not supported before clickhouse 20.4
		SkipInitializeWithVersion: false,    // smart configure based on used version
		DefaultGranularity:        3,        // 1 granule = 8192 rows
		DefaultCompression:        "LZ4",    // default compression algorithm. LZ4 is lossless
		DefaultIndexType:          "minmax", // index stores extremes of the expression
		DefaultTableEngineOpts:    "ENGINE=MergeTree() ORDER BY tuple()",
	}), &gorm.Config{})
	if err != nil {
		return nil, err
	}
	ckDB, err := conn.DB()
	if err != nil {
		return nil, err
	}

	// SetMaxIdleConns sets the maximum number of connections in the idle connection pool.
	// 设置可空闲的最大连接数,随时等待调用
	ckDB.SetMaxIdleConns(1)

	// SetMaxOpenConns sets the maximum number of open connections to the database.
	// 设置连接池的最大连接数,不配置,默认为 0,就是不限制
	ckDB.SetMaxOpenConns(10)

	// SetConnMaxLifetime sets the maximum amount of time a connection may be reused.
	// 连接的最长存活期,超过这个时间,连接将会重置,不再被复用,不配置默认就是永不过期
	ckDB.SetConnMaxLifetime(time.Second * 600)

	ckConn = conn
	return ckDB, nil
}

// GetDBConn 从连接池获取数据库连接
func GetDBConn() (*gorm.DB, error) {
	var pgDB *sql.DB

	if pgConn == nil {
		if db, err := _setupPostgresConn(nil); err != nil {
			return nil, err
		} else {
			pgDB = db
		}
	} else if db, err := pgConn.DB(); err != nil {
		// 尝试获取连接池失败,重新建立连接
		if db, err := _setupPostgresConn(nil); err != nil {
			return nil, err
		} else {
			pgDB = db
		}
	} else {
		pgDB = db
	}

	if err := pgDB.Ping(); err != nil {
		// 尝试 PING 失败,重新建立连接
		_ = pgDB.Close()

		if db, err := _setupPostgresConn(nil); err != nil {
			return nil, err
		} else {
			pgDB = db
		}
	}

	return pgConn, nil
}

// GetDBConn 从连接池获取数据库连接
func GetClickhouseConn() (*gorm.DB, error) {
	var ckDB *sql.DB

	if ckConn == nil {
		if db, err := _setupClickhouseConn(nil); err != nil {
			return nil, err
		} else {
			ckDB = db
		}
	} else if db, err := ckConn.DB(); err != nil {
		if db, err := _setupClickhouseConn(nil); err != nil {
			return nil, err
		} else {
			ckDB = db
		}
	} else {
		ckDB = db
	}

	if err := ckDB.Ping(); err != nil {
		_ = ckDB.Close()

		if db, err := _setupClickhouseConn(nil); err != nil {
			return nil, err
		} else {
			ckDB = db
		}
	}

	return ckConn, nil
}

 类似资料: