当前位置: 首页 > 工具软件 > ElasticQuery > 使用案例 >

GoLang封装Elasticsearch

饶德元
2023-12-01

码字不易,转载请附原链,搬砖繁忙回复不及时见谅,技术交流请加QQ群:909211071

封装库:

package es

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"github.com/olivere/elastic"
	"io/ioutil"
	"net/http"
	"why/config"
	"why/log"
	"why/util"
)

type Elastic struct {
	Client *elastic.Client
	host   string
}
/*
es 缓存池
 */
var (
	ElasticPool = make(map[string]*Elastic)
)

func InitES(name string) *Elastic {
	if es, ok := ElasticPool[name]; ok {
		return es
	}
	confES := config.GetConfigEntrance("es", name)
	host := confES["host"] + ":" + confES["port"]
	var err error
	client, err := elastic.NewClient(elastic.SetURL(host))
	if err != nil {
		panic(err)
	}
	info, code, err := client.Ping(host).Do(context.Background())
	if err != nil {
		panic(err)
	}
	fmt.Printf("Elasticsearch returned with code %d and version %s\n", code, info.Version.Number)

	esVersion, err := client.ElasticsearchVersion(host)
	if err != nil {
		panic(err)
	}
	fmt.Printf("Elasticsearch version %s\n", esVersion)
	es := &Elastic{
		Client: client,
		host: host,
	}
	ElasticPool[name] = es
	return es
}
/*
新建索引
 */
func (self *Elastic)CreateIndex(index, mapping string, logHeader *log.LogFormat) bool {
	// 判断索引是否存在
	exists, err := self.Client.IndexExists(index).Do(context.Background())
	if err != nil {
		log.Errorf(logHeader, "<CreateIndex> some error occurred when check exists, index: %s, err:%s", index, err.Error())
		return false
	}
	if exists {
		log.Infof(logHeader, "<CreateIndex> index:{%s} is already exists", index)
		return true
	}
	createIndex, err := self.Client.CreateIndex(index).Body(mapping).Do(context.Background())
	if err != nil {
		log.Errorf(logHeader, "<CreateIndex> some error occurred when create. index: %s, err:%s", index, err.Error())
		return false
	}
	if !createIndex.Acknowledged {
		// Not acknowledged
		log.Errorf(logHeader, "<CreateIndex> Not acknowledged, index: %s", index)
		return false
	}
	return true
}
/*
删除索引
 */
func (self *Elastic)DelIndex(index string, logHeader *log.LogFormat) bool {
	// Delete an index.
	deleteIndex, err := self.Client.DeleteIndex(index).Do(context.Background())
	if err != nil {
		// Handle error
		log.Errorf(logHeader, "<DelIndex> some error occurred when delete. index: %s, err:%s", index, err.Error())
		return false
	}
	if !deleteIndex.Acknowledged {
		// Not acknowledged
		log.Errorf(logHeader, "<DelIndex> acknowledged. index: %s", index)
		return false
	}
	return true
}
/*
数据存储
 */
func (self *Elastic)Put(index, typ, id, bodyJson string, logHeader *log.LogFormat) bool {
	put, err := self.Client.Index().
		Index(index).
		Type(typ).
		Id(id).
		BodyJson(bodyJson).
		Do(context.Background())
	if err != nil {
		// Handle error
		log.Errorf(logHeader, "<Put> some error occurred when put.  err:%s", err.Error())
		return false
	}
	log.Infof(logHeader, "<Put> success, id: %s to index: %s, type %s\n", put.Id, put.Index, put.Type)
	return true
}
/*
数据删除
*/
func (self *Elastic)Del(index, typ, id string, logHeader *log.LogFormat) bool {
	del, err := self.Client.Delete().
		Index(index).
		Type(typ).
		Id(id).
		Do(context.Background())
	if err != nil {
		// Handle error
		log.Errorf(logHeader, "<Del> some error occurred when put.  err:%s", err.Error())
		return false
	}
	log.Infof(logHeader, "<Del> success, id: %s to index: %s, type %s\n", del.Id, del.Index, del.Type)
	return true
}
/*
更新数据
 */
func (self *Elastic)Update(index, typ, id string, updateMap map[string]interface{}, logHeader *log.LogFormat) bool {
	res, err := self.Client.Update().
		Index(index).Type(typ).Id(id).
		Doc(updateMap).
		FetchSource(true).
		Do(context.Background())
	if err != nil {
		log.Errorf(logHeader, "<Update> some error occurred when update. index:%s, typ:%s, id:%s err:%s", index, typ, id, err.Error())
		return false
	}
	if res == nil {
		log.Errorf(logHeader, "<Update> expected response != nil. index:%s, typ:%s, id:%s", index, typ, id)
		return false
	}
	if res.GetResult == nil {
		log.Errorf(logHeader, "<Update> expected GetResult != nil. index:%s, typ:%s, id:%s", index, typ, id)
		return false
	}
	data, _ := json.Marshal(res.GetResult.Source)
	log.Errorf(logHeader, "<Update> update success. data:%s", data)
	return true
}

func (self *Elastic)TermQueryMap(index, typ string, term *elastic.TermQuery, start, end int, logHeader *log.LogFormat) map[string]interface{} {
	//elastic.NewTermQuery()
	searchResult, err := self.Client.Search().
		Index(index).
		Type(typ).
		Query(term).        // specify the query
		From(start).Size(end).        // take documents start-end
		Pretty(true).            // pretty print request and response JSON
		DoString(context.Background()) // execute
	if err != nil {
		fmt.Println(err.Error())
		// Handle error
		log.Errorf(logHeader, "<TermQuery> some error occurred when search. index:%s, term:%v,  err:%s", index, term, err.Error())
		return make(map[string]interface{})
	}
	return util.JsonToMap(searchResult)
}

func (self *Elastic)TermQuery(index, typ string, term *elastic.TermQuery, start, end int, logHeader *log.LogFormat) *elastic.SearchResult {
	//elastic.NewTermQuery()
	searchResult, err := self.Client.Search().
		Index(index).
		Type(typ).
		Query(term).        // specify the query
		From(start).Size(end).        // take documents start-end
		Pretty(true).            // pretty print request and response JSON
		Do(context.Background()) // execute
	if err != nil {
		fmt.Println(err.Error())
		// Handle error
		log.Errorf(logHeader, "<TermQuery> some error occurred when search. index:%s, term:%v,  err:%s", index, term, err.Error())
		return nil
	}
	return searchResult
}

func (self *Elastic)QueryStringMap(index, typ, query string, start,end int, header *log.LogFormat) map[string]interface{} {
	q := elastic.NewQueryStringQuery(query)
	// Match all should return all documents
	searchResult, err := self.Client.Search().
		Index(index).
		Type(typ).    // type of Index
		Query(q).
		Pretty(true).
		From(start).Size(end).
		DoString(context.Background())
	if err != nil {
		fmt.Println(err.Error())
		// Handle error
		log.Errorf(header, "<QueryString> some error occurred when search. index:%s, query:%v,  err:%s", index, query, err.Error())
		return nil
	}
	return util.JsonToMap(searchResult)
}

// https://www.elastic.co/guide/en/elasticsearch/reference/6.8/query-dsl-query-string-query.html
func (self *Elastic)QueryString(index, typ, query string, size int, header *log.LogFormat) *elastic.SearchResult {

	q := elastic.NewQueryStringQuery(query)
	// Match all should return all documents
	searchResult, err := self.Client.Search().
		Index(index).
		Type(typ).    // type of Index
		Query(q).
		Pretty(true).
		From(0).Size(size).
		Do(context.Background())
	if err != nil {
		fmt.Println(err.Error())
		// Handle error
		log.Errorf(header, "<QueryString> some error occurred when search. index:%s, query:%v,  err:%s", index, query, err.Error())
		return nil
	}
	return searchResult
}

/*
多条件参考:https://stackoverflow.com/questions/49942373/golang-elasticsearch-multiple-query-parameters
 */
func (self *Elastic)MultiMatchQueryBestFields(index, typ, text string, start,end int,logHeader *log.LogFormat,fields ...string) *elastic.SearchResult {
	q := elastic.NewMultiMatchQuery(text, fields...)
	searchResult, err := self.Client.Search().
		Index(index). // name of Index
		Type(typ).    // type of Index
		Query(q).
		Sort("_score", false).
		From(start).Size(end).
		Do(context.Background())
	if err != nil {
		// Handle error
		log.Errorf(logHeader, "<MultiMatchQueryBestFields> some error occurred when search. index:%s, text:%v,  err:%s", index, text, err.Error())
		return nil
	}
	return searchResult
}

func (self *Elastic)MultiMatchQueryBestFieldsMap(index, typ, text string, start,end int,logHeader *log.LogFormat,fields ...string) map[string]interface{} {
	q := elastic.NewMultiMatchQuery(text, fields...)
	searchResult, err := self.Client.Search().
		Index(index). // name of Index
		Type(typ).    // type of Index
		Query(q).
		Sort("_score", false).
		From(start).Size(end).
		DoString(context.Background())
	if err != nil {
		// Handle error
		log.Errorf(logHeader, "<MultiMatchQueryBestFields> some error occurred when search. index:%s, text:%v,  err:%s", index, text, err.Error())
		return nil
	}
	return util.JsonToMap(searchResult)
}

func (self *Elastic)QueryStringRandomSearch(client *elastic.Client, index, typ, query string, size int, header *log.LogFormat) *elastic.SearchResult {
	q := elastic.NewFunctionScoreQuery()
	queryString := elastic.NewQueryStringQuery(query)
	q = q.Query(queryString)
	q = q.AddScoreFunc(elastic.NewRandomFunction())
	searchResult, err := client.Search().
		Index(index).
		Type(typ).
		Query(q).
		Size(size).
		Do(context.Background())
	if err != nil {
		// Handle error
		log.Errorf(header, "<QueryStringRandomSearch> some error occurred when search. index:%s, query:%v,  err:%s", index, query, err.Error())
		return nil
	}
	return searchResult
}

func (self *Elastic)RangeQueryLoginDate(index string, typ string, start,end int, logHeader *log.LogFormat) *elastic.SearchResult {
	q := elastic.NewRangeQuery("latest_time").
		Gte("now-30d/d")
	searchResult, err := self.Client.Search().
		Index(index).
		Type(typ).
		Query(q).
		Sort("latest_time", false).
		From(start).Size(end).
		Do(context.Background())
	if err != nil {
		log.Errorf(logHeader, "<RangeQueryLoginDate> some error occurred when search. index:%s,err:%s", index, err.Error())
		return nil
	}
	return searchResult
}

func (self *Elastic)JsonMap(index,typ,query string, fields []string, from, size int,
	terms map[string]interface{}, mustNot, filter,sort []map[string]interface{}, logHeader *log.LogFormat) map[string]interface{}{
	data := make(map[string]interface{})

	var must []map[string]interface{}
	if len(terms) != 0 {
		must = append(must, map[string]interface{}{
			"terms":terms,
		})
	}
	if query != "" {
		multiMatch := make(map[string]interface{})
		multiMatch["query"] = "小龙虾批发"
		if len(fields) != 0 {
			multiMatch["fields"] = fields
		}
		must = append(must, map[string]interface{}{
			"multi_match":multiMatch,
		})
	}
	data["from"] = from
	data["size"] = size
	data["sort"] = sort
	data["query"] = map[string]interface{}{
		"bool": map[string]interface{}{
			"must": must,
			"must_not":mustNot,
			"filter":filter,
		},
	}

	byteDates, err := json.Marshal(data)
	util.Must(err)
	reader := bytes.NewReader(byteDates)

	client := &http.Client{}
	url := self.host + "/" + index + "/" + typ + "/_search"
	req, err := http.NewRequest("POST", url, reader)
	util.Must(err)

	req.Header.Add("content-type", "application/json")

	resp, err := client.Do(req)
	util.Must(err)
	defer resp.Body.Close()

	b, err := ioutil.ReadAll(resp.Body)
	util.Must(err)

	ret := util.JsonToMap(string(b))
	if ret["error"] != nil {
		fmt.Println(fmt.Sprintf("%s", string(b)))
		log.Errorf(logHeader, fmt.Sprintf("%s", string(b)) )
		panic("es error")
		return nil
	}

	return ret
}

测试代码:

package main

import (
	"fmt"
	"github.com/olivere/elastic"
	"why/es"
	"why/log"
)

type Res struct {
	name 	string
	age 	int
	gender 	string
}

func TermMap(esConn *es.Elastic, index, typ string, logFormat *log.LogFormat){
	searchResult := esConn.TermQueryMap(index, typ, elastic.NewTermQuery("name", "why"), 0,10, logFormat)
	fmt.Println(searchResult)
}

func StringMap(esConn *es.Elastic, index, typ string, logFormat *log.LogFormat){
	query := "name:why OR age:19"
	searchResult := esConn.QueryStringMap(index, typ, query, 0,10, logFormat)
	fmt.Println(searchResult)
}

func MultiMatchQueryBestFieldsMap(esConn *es.Elastic, index, typ string, logFormat *log.LogFormat){
	query := "why"
	searchResult := esConn.MultiMatchQueryBestFieldsMap(index, typ, query, 0,10, logFormat, "name", "desc")
	fmt.Println(searchResult)
}

func JsonMap(esConn *es.Elastic, index,typ string, logFormat *log.LogFormat){
	terms := make(map[string]interface{})
	terms["type"] = []interface{}{
		"hangqing_analyze",
		"hangqing",
		"hangqing_desc",
		"scdm",
	}

	var mustNot []map[string]interface{}
	mustNot = append(mustNot, map[string]interface{}{
		"term": map[string]interface{}{
			"img_count": 0,
		},
	})

	var filter []map[string]interface{}
	filter = append(filter, map[string]interface{}{
		"range": map[string]interface{}{
			"video_count": map[string]interface{}{
				"lte":1,
			},
		},
	})

	var sort []map[string]interface{}
	sort = append(sort, map[string]interface{}{
		"_score": map[string]interface{}{
			"order": "desc",
		},
	})

	res := esConn.JsonMap(index, typ, "小龙虾批发", []string{"content", "tags"},0, 3, terms, mustNot, filter, sort, logFormat )
	fmt.Println(res)
}

func main(){
	index := "why_index"
	typ   := "why_type"
	logFormat := new(log.LogFormat)
	esConn := es.InitES("default")

	TermMap(esConn, index, typ, logFormat)
	StringMap(esConn, index, typ, logFormat)
	MultiMatchQueryBestFieldsMap(esConn, index, typ, logFormat)
	JsonMap(esConn, index, typ, logFormat)
}

util工具库参考:

https://blog.csdn.net/why444216978/article/details/103992579

config库参考:

https://blog.csdn.net/why444216978/article/details/103978355

es搭建和操作参考:

https://blog.csdn.net/why444216978/article/details/102158738

https://blog.csdn.net/why444216978/article/details/103681385

 类似资料: