>> 资源下载: https://72k.us/file/14896800-396374653
mgo:MongoDB的Go语言驱动,它用符合Go语法习惯的简单API实现了丰富的特性,并且经过了良好的测试。使用起来很顺手,文档也足够,前期一直在使用,可惜已经不再维护了;
mongo-go-driver:官方的驱动,设计得很底层,从mgo迁移过来时不是很顺手;迁移主要是为了使用事务;
连接数据库
mgo
import (
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
)
// Dial the MongoDB server at 127.0.0.1:27017 with mgo; the call yields a session and an error.
session, err := mgo.Dial("127.0.0.1:27017")
mongo-go-driver
import (
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// Build a client from options configured with the mongodb://localhost:27017 URI.
// NOTE(review): no Connect call is shown in this snippet — confirm whether the
// client must be connected explicitly before it is used.
client, err := mongo.NewClient(options.Client().ApplyURI("mongodb://localhost:27017"))
两者在数据库的连接上都很简单,后者使用了options,可以设置连接数,连接时间,socket时间,超时时间等;
索引
mgo
// createUniqueIndex ensures a unique index over keys on the given collection of
// the configured database (mgo version). A failure is logged, not returned.
func createUniqueIndex(collection string, keys ...string) {
	session, coll := Connect(setting.DatabaseSetting.DBName, collection)
	defer session.Close()

	// Unique index definition for the collection.
	uniqueIdx := mgo.Index{
		Key:        keys, // index keys
		Unique:     true, // unique index
		DropDups:   true, // when created on existing data, duplicates are dropped automatically (per the original note)
		Background: true, // avoid holding the write lock for a long time (per the original note)
	}

	// Create the index.
	if err := coll.EnsureIndex(uniqueIdx); err != nil {
		Logger.Error("EnsureIndex error", zap.String("error", err.Error()))
	}
}
mongo-go-driver
// createUniqueIndex creates a unique index over keys on the given collection of
// the configured database (mongo-go-driver version).
//
// keys are applied in argument order, so passing several keys builds a compound
// index. A key with a leading "-" is indexed descending (-1); any other key is
// indexed ascending (1). Failures are logged, not returned.
func createUniqueIndex(collection string, keys ...string) {
	coll := DB.Mongo.Database(setting.DatabaseSetting.DBName).Collection(collection)
	opts := options.CreateIndexes().SetMaxTime(10 * time.Second)

	// Build the (possibly compound) key document.
	keysDoc := bsonx.Doc{}
	for _, key := range keys {
		if strings.HasPrefix(key, "-") {
			// TrimPrefix removes exactly the one direction marker that HasPrefix
			// detected; TrimLeft would strip every leading '-'.
			keysDoc = keysDoc.Append(strings.TrimPrefix(key, "-"), bsonx.Int32(-1))
		} else {
			keysDoc = keysDoc.Append(key, bsonx.Int32(1))
		}
	}

	// Create the index.
	result, err := coll.Indexes().CreateOne(
		context.Background(),
		mongo.IndexModel{
			Keys:    keysDoc,
			Options: options.Index().SetUnique(true),
		},
		opts,
	)
	switch {
	case err != nil:
		Logger.Error("EnsureIndex error", zap.String("error", err.Error()))
	case result == "":
		// No error but no index name either. Calling err.Error() here, as the
		// previous code did, would dereference a nil error and panic.
		Logger.Error("EnsureIndex error", zap.String("error", "empty index name returned"))
	}
}
mgo可以直接构建复合索引,按顺序传入多个参数即可,例如createIndex(addrTrxC, "address_id", "asset_id");而mongo-go-driver需要自己处理一下(如上面的代码,逐个拼接索引键)。
查询
mgo
// FindProNode queries the superNodeC collection with an empty filter, sorted by
// "-vote_count" and limited to 10 results, and decodes them into a slice
// (mgo version). On failure it returns a nil slice and the error.
func FindProNode() ([]DBNode, error) {
	session, coll := Connect(setting.DatabaseSetting.DBName, superNodeC)
	defer session.Close()

	var result []DBNode
	query := coll.Find(M{}).Sort("-vote_count").Limit(10)
	if err := query.All(&result); err != nil {
		return nil, err
	}
	return result, nil
}
mongo-go-driver
// FindNodes queries the superNodeC collection with an empty filter, sorted by
// vote_count descending, and decodes every document into a DBNode
// (mongo-go-driver version).
//
// The whole operation — the initial query and all cursor iteration — shares one
// 10-second timeout. On any failure it returns a nil slice and the error.
func FindNodes() ([]DBNode, error) {
	c := Connect(setting.DatabaseSetting.DBName, superNodeC)
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	opts := options.Find().SetSort(bsonx.Doc{{"vote_count", bsonx.Int32(-1)}})
	cursor, err := c.Find(ctx, M{}, opts)
	if err != nil {
		return nil, err
	}
	// Release the cursor on every return path. This defer runs before cancel,
	// so ctx has not yet been cancelled by this function when Close is called.
	defer cursor.Close(ctx)

	var nodes []DBNode
	// Iterate with the timeout context rather than context.Background(), so a
	// stalled server cannot block the loop indefinitely.
	for cursor.Next(ctx) {
		var node DBNode
		if err := cursor.Decode(&node); err != nil {
			return nil, err
		}
		nodes = append(nodes, node)
	}
	// Next returns false both at the end of the results and on failure; Err
	// tells the two apart, so a failure is not mistaken for a complete result.
	if err := cursor.Err(); err != nil {
		return nil, err
	}
	return nodes, nil
}
在查询单个元素的时候,两个驱动都很方便;但是,当查询多个元素的时候,按上面的写法,mongo-go-driver需要借助cursor这个类型,遍历并逐条解析所有的数据(比较麻烦,需要自己封装;较新版本的驱动提供了cursor.All,可以一次性解码到切片)。
插入
mgo
// Insert is the generic insert helper (mgo version): it obtains a session and
// collection handle from Connect for db/collection, passes docs to the
// collection's Insert, and closes the session before returning.
func Insert(db, collection string, docs ...interface{}) error {
	session, coll := Connect(db, collection)
	defer session.Close()

	err := coll.Insert(docs...)
	return err
}
// InsertNode inserts node into the superNodeC collection of the configured
// database by delegating to Insert (mgo version).
func InsertNode(node DBNode) error {
	return Insert(setting.DatabaseSetting.DBName, superNodeC, node)
}
mongo-go-driver
// Insert is the generic insert helper (mongo-go-driver version): it passes docs
// to InsertMany on the collection returned by Connect, under a 10-second
// timeout, and returns the driver's result and error unchanged.
func Insert(db, collection string, docs ...interface{}) (*mongo.InsertManyResult, error) {
	coll := Connect(db, collection)

	timeoutCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	result, err := coll.InsertMany(timeoutCtx, docs)
	return result, err
}
// InsertNode inserts node into the superNodeC collection of the configured
// database via Insert, discarding the insert result (mongo-go-driver version).
func InsertNode(node DBNode) error {
	if _, err := Insert(setting.DatabaseSetting.DBName, superNodeC, node); err != nil {
		return err
	}
	return nil
}
插入的区别也不是很大
修改
mgo
// UpsertNode upserts node into the superNodeC collection, matching on pub_key
// (mgo version). The update increments vote_count by node.VoteCount and sets
// name, address and first_timestamp from node.
func UpsertNode(node DBNode) (*mgo.ChangeInfo, error) {
	session, coll := Connect(setting.DatabaseSetting.DBName, superNodeC)
	defer session.Close()

	selector := M{"pub_key": node.PubKey}
	setFields := M{
		"name":            node.Name,
		"address":         node.Address,
		"first_timestamp": node.FirstTimestamp,
	}
	change := bson.M{
		"$inc": M{"vote_count": node.VoteCount},
		"$set": setFields,
	}
	return coll.Upsert(selector, change)
}
mongo-go-driver
// Update calls UpdateOne with query and update on the collection returned by
// Connect, with the upsert option enabled and a 10-second timeout
// (mongo-go-driver version). The driver's result and error are returned as-is.
func Update(db, collection string, query, update interface{}) (*mongo.UpdateResult, error) {
	coll := Connect(db, collection)

	timeoutCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	upsertOpts := options.Update().SetUpsert(true)
	result, err := coll.UpdateOne(timeoutCtx, query, update, upsertOpts)
	return result, err
}
两者都支持更新单个文档(mgo用Update,mongo-go-driver用UpdateOne)和更新多个文档(mgo用UpdateAll,mongo-go-driver用UpdateMany);但是在upsert的时候,mgo直接调用Upsert就可以,而mongo-go-driver需要设置opts := options.Update().SetUpsert(true)
删除
mgo
// Remove passes query to the Remove method of the collection returned by
// Connect for db/collection (mgo version) and closes the session before
// returning.
func Remove(db, collection string, query interface{}) error {
	session, coll := Connect(db, collection)
	defer session.Close()

	err := coll.Remove(query)
	return err
}
func RemoveNode(pubKey string) error {
findM := M{"pub_key": pubKey}
err := Remove(setting.DatabaseSetting.DBName, superNodeC, findM)
return err
}
mongo-go-driver
// Remove calls DeleteOne with query on the collection returned by Connect,
// under a 10-second timeout (mongo-go-driver version). The driver's result and
// error are returned as-is.
func Remove(db, collection string, query interface{}) (*mongo.DeleteResult, error) {
	coll := Connect(db, collection)

	timeoutCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	result, err := coll.DeleteOne(timeoutCtx, query)
	return result, err
}
// RemoveNode deletes from the superNodeC collection using a pub_key filter via
// Remove, discarding the delete result (mongo-go-driver version).
func RemoveNode(pubKey string) error {
	selector := M{"pub_key": pubKey}
	if _, err := Remove(setting.DatabaseSetting.DBName, superNodeC, selector); err != nil {
		return err
	}
	return nil
}
删除也比较简单,大致相同
事务
mongo-go-driver
凡是修改之前需要先查询的操作,请都使用SessionContext(即都放在事务中执行)。
因为多处都要用到事务,所以封装了一个通用的方法;
使用这个封装时,调用方需要自己实现的是传给Exec的operator函数。
// DBTransaction bundles the callbacks and the logger used to run work inside a
// MongoDB session. Its function fields are installed by SetCommit and SetRun.
type DBTransaction struct {
// Commit commits the transaction on the given session context and logs the outcome.
Commit func(mongo.SessionContext) error
// Run invokes the supplied transaction function with the session context and
// a copy of this DBTransaction, logging any error it returns.
Run func(mongo.SessionContext, func(mongo.SessionContext, DBTransaction) error) error
// Logger is the logger used by Commit and Run; set through SetLogger.
Logger *logging.Logger
}
// NewDBTransaction returns a DBTransaction with its logger set to logger and
// its Run and Commit callbacks installed.
func NewDBTransaction(logger *logging.Logger) *DBTransaction {
	txn := new(DBTransaction)
	txn.SetLogger(logger)
	txn.SetRun()
	txn.SetCommit()
	return txn
}
// SetCommit installs the Commit callback. The callback commits the transaction
// on the session context, logs success at Info level or failure at Error
// level, and returns the commit error (nil on success).
func (d *DBTransaction) SetCommit() {
	d.Commit = func(sctx mongo.SessionContext) error {
		if err := sctx.CommitTransaction(sctx); err != nil {
			d.Logger.Error("Error during commit...")
			return err
		}
		d.Logger.Info("Transaction committed.")
		return nil
	}
}
// SetRun installs the Run callback. The callback executes txnFn with the
// session context and a copy of the receiver; if txnFn fails, the error is
// logged and returned, otherwise nil is returned.
func (d *DBTransaction) SetRun() {
	d.Run = func(sctx mongo.SessionContext, txnFn func(mongo.SessionContext, DBTransaction) error) error {
		// Perform the transaction body; txnFn receives *d by value.
		if err := txnFn(sctx, *d); err != nil {
			d.Logger.Error("Transaction aborted. Caught exception during transaction.",
				zap.String("error", err.Error()))
			return err
		}
		return nil
	}
}
// SetLogger stores logger in the Logger field of d.
func (d *DBTransaction) SetLogger(logger *logging.Logger) {
d.Logger = logger
}
// Exec runs operator inside a session opened on mongoClient. The session is
// created with a default read preference of primary, and the whole call is
// bounded by a 20-minute timeout. operator is invoked through d.Run, and the
// error from UseSessionWithOptions is returned.
func (d *DBTransaction) Exec(mongoClient *mongo.Client, operator func(mongo.SessionContext, DBTransaction) error) error {
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Minute)
	defer cancel()

	sessionOpts := options.Session().SetDefaultReadPreference(readpref.Primary())
	runInSession := func(sctx mongo.SessionContext) error {
		return d.Run(sctx, operator)
	}
	return mongoClient.UseSessionWithOptions(ctx, sessionOpts, runInSession)
}
// SyncBlockData is the concrete call-site example: it inserts node inside a
// transaction through the DBTransaction helper and returns the resulting error.
func SyncBlockData(node models.DBNode) error {
	txn := db_session_service.NewDBTransaction(Logger)

	// insertNode is the transaction body: start a transaction, insert the
	// node, then abort on failure or commit on success.
	insertNode := func(sctx mongo.SessionContext, d db_session_service.DBTransaction) error {
		txnOpts := options.Transaction().
			SetReadConcern(readconcern.Snapshot()).
			SetWriteConcern(writeconcern.New(writeconcern.WMajority()))
		if err := sctx.StartTransaction(txnOpts); err != nil {
			return err
		}

		if err := models.InsertNodeWithSession(sctx, node); err != nil {
			// The abort error is discarded; the insert error is what gets reported.
			_ = sctx.AbortTransaction(sctx)
			d.Logger.Info("caught exception during transaction, aborting.")
			return err
		}

		return d.Commit(sctx)
	}

	return txn.Exec(models.DB.Mongo, insertNode)
}