Spring Boot 集成 Elasticsearch 总结

苗阳
2023-12-01

说明:本次集成环境为 Java 8 + Elasticsearch 6.7 版本

一、介绍

参考官方文档:开始使用 Elasticsearch | Elastic Videos

  • 基本概念与关系型数据库的对应关系,便于理解:

| Elasticsearch | RDBMS |
| --- | --- |
| Index(索引) | Database(数据库) |
| Type(类型) | Table(表) |
| Document(文档) | Row(行) |
| Field(字段) | Column(列) |
| Mapping(映射) | Schema(约束) |
| Everything is indexed(索引) | Index(索引) |

  • API:

官方参考文档:Java High Level REST Client | Java REST Client [6.8] | Elastic

  •  数据类型:

官方参考文档比较详细:Field datatypes | Elasticsearch Guide [6.7] | Elastic

其他参考:ES的数据类型 (text、keyword、date、object、geo等)

  

二、es环境安装(linux)

       安装包下载: Download Elasticsearch | Elastic

       安装参考文档指南:安装并运行 Elasticsearch | Elasticsearch: 权威指南 | Elastic 

三、集成

 引入相关依赖包:

<!-- Excerpt of pom.xml: version properties followed by the Elasticsearch client dependencies. -->
<!-- All three dependencies below share the single ${es.version} property. -->
<properties>
        <java.version>1.8</java.version>
        <es.version>6.7.2</es.version>
    </properties>
<!-- Low-level REST client. -->
<dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>${es.version}</version>
        </dependency>

        <!-- Elasticsearch core artifact. -->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>${es.version}</version>
        </dependency>

        <!-- High-level REST client (the RestHighLevelClient used by the Java code in this article). -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>${es.version}</version>
        </dependency>

配置类:

package com.datahub.aimindgraph.config;

import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

/**
 * Builds one {@link RestHighLevelClient} from the {@code es.servers} property and exposes it
 * to the application context through the {@link FactoryBean} contract.
 *
 * <p>{@code es.servers} is a comma-separated list of nodes, each written as {@code host:port}
 * or just {@code host} (the port then defaults to {@value #DEFAULT_PORT}). Whitespace around
 * entries is ignored. Every node is contacted over plain {@code http}.
 *
 * <p>The client is created in {@link #afterPropertiesSet()} and closed in {@link #destroy()}.
 */
@Configuration
public class ElasticsearchConfig implements FactoryBean<RestHighLevelClient>, InitializingBean, DisposableBean {

    /** Scheme used for every configured node. */
    private static final String SCHEME = "http";

    /** Port used when a node entry does not specify one. */
    private static final int DEFAULT_PORT = 9200;

    // TODO configure the Elasticsearch server address(es) here
    @Value("${es.servers}")
    private String uri;

    private RestHighLevelClient restHighLevelClient;

    /**
     * Parses {@code es.servers} and creates the high-level client.
     *
     * @throws IllegalStateException if the property is blank or contains a malformed node entry
     */
    protected void initClient() {
        if (StringUtils.isBlank(uri)) {
            throw new IllegalStateException("elasticsearch uri is unset to properties file");
        }
        String[] nodes = uri.split(",");
        HttpHost[] httpHosts = new HttpHost[nodes.length];
        for (int i = 0; i < nodes.length; i++) {
            httpHosts[i] = parseNode(nodes[i]);
        }
        RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
        restHighLevelClient = new RestHighLevelClient(restClientBuilder);
    }

    /**
     * Converts one {@code host[:port]} entry into an {@link HttpHost}.
     *
     * @param node raw entry from the comma-separated list
     * @return the parsed host
     * @throws IllegalStateException if the entry is empty, has no host part, or has a
     *         non-numeric port
     */
    private static HttpHost parseNode(String node) {
        String entry = node.trim();
        if (entry.isEmpty()) {
            throw new IllegalStateException("elasticsearch uri contains an empty node entry");
        }
        int colon = entry.lastIndexOf(':');
        if (colon < 0) {
            // No explicit port: fall back to the default instead of failing on a missing array element.
            return new HttpHost(entry, DEFAULT_PORT, SCHEME);
        }
        String host = entry.substring(0, colon).trim();
        String port = entry.substring(colon + 1).trim();
        if (host.isEmpty()) {
            throw new IllegalStateException(
                    "invalid elasticsearch node '" + entry + "', expected host:port");
        }
        try {
            return new HttpHost(host, Integer.parseInt(port), SCHEME);
        } catch (NumberFormatException e) {
            throw new IllegalStateException(
                    "invalid elasticsearch node '" + entry + "', expected host:port", e);
        }
    }

    /**
     * Closes the high-level client, which also closes the low-level client it wraps.
     * Failures are propagated (the method already declares {@code throws Exception})
     * rather than being silently discarded.
     */
    @Override
    public void destroy() throws Exception {
        if (restHighLevelClient != null) {
            restHighLevelClient.close();
        }
    }

    /** Returns the client created by {@link #initClient()}. */
    @Override
    public RestHighLevelClient getObject() throws Exception {
        return restHighLevelClient;
    }

    @Override
    public Class<RestHighLevelClient> getObjectType() {
        return RestHighLevelClient.class;
    }

    /** Creates the client once the {@code es.servers} property has been injected. */
    @Override
    public void afterPropertiesSet() throws Exception {
        initClient();
    }
}

常用方法封装:

package com.datahub.aimindgraph.es.service;

import com.baomidou.mybatisplus.core.metadata.IPage;
import com.datahub.aimindgraph.dto.EsResourceDTO;
import com.datahub.aimindgraph.exception.WrapperException;
import com.datahub.aimindgraph.util.PageUtil;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.CountResponse;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @Desc 根据实际业务需求定制
 * @Author wadu
 * @Date 2021/07/17
 * @Version 1.0
 **/
@Service
@Slf4j
public class EsResourceUtil {

    @Autowired
    private RestHighLevelClient client;

    private final RequestOptions options = RequestOptions.DEFAULT;

    private static final Map<String, String> indexNameMap = new HashMap<>();

    private static final String defaultType = "_doc";

    public String getIndex(String indexName){
        if (indexNameMap.containsKey(indexName)) {
            return indexName;
        }
        if (existsIndex(indexName) || createIndex(indexName)) {
            indexNameMap.put(indexName, "");
            return indexName;
        }
        throw new WrapperException("find index failed");
    }

    /**
     * 动态创建索引
     *
     * @param indexName
     */
    public boolean createIndex(String indexName, XContentBuilder xContentBuilder) {
        synchronized (indexName.intern()) {
            try {
                if (!existsIndex(indexName)) {
                    CreateIndexRequest request = new CreateIndexRequest(indexName);
                    if (Objects.nonNull(xContentBuilder)) {
                        request.mapping(xContentBuilder);
                    }
                    HashMap<String, Object> settings_map = new HashMap<>(2);
                    settings_map.put("number_of_shards", 3);
                    settings_map.put("number_of_replicas", 1);
                    request.settings(settings_map);
                    CreateIndexResponse createIndexResponse = client.indices().create(request, options);
                    return createIndexResponse.isAcknowledged();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            return false;
        }
    }

    /**
     * 删除索引
     *
     * @param indexName
     */
    public boolean deleteIndex(String indexName) {
        synchronized (indexName.intern()) {
            try {
                if (existsIndex(indexName)) {
                    AcknowledgedResponse response = client.indices().delete(new DeleteIndexRequest(indexName), options);
                    return response.isAcknowledged();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            return false;
        }
    }

    /**
     * 判断索引是否存在
     *
     * @param indexName
     * @return
     */
    public boolean existsIndex(String indexName) {
        try {
            GetIndexRequest request = new GetIndexRequest(indexName);
            return client.indices().exists(request, options);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 新增或更新
     * @param indexName
     * @param document
     * @return
     */
    public Optional<String> addOrUpdate(String indexName, Map<String, Object> document) {
        Object id = document.get("id");
        if (Objects.isNull(id)) {
            throw new WrapperException("id cannot be null");
        }
        if (exists(indexName, id.toString())) {
            return update(indexName, document);
        }
        return add(indexName, document);
    }

        /**
         * 增加记录
         * @param indexName 索引
         * @param document
         */
    public Optional<String> add(String indexName, Map<String, Object> document) {
        try {
            indexName = getIndex(indexName);
            Object id = document.get("id");
            if (Objects.isNull(id)) {
                throw new WrapperException("id cannot be null");
            }
            IndexRequest indexRequest = new IndexRequest(indexName, defaultType, id.toString());
            document.remove("id");
            indexRequest.opType(DocWriteRequest.OpType.CREATE);
            indexRequest.source(document, XContentType.JSON);
            IndexResponse response = client.index(indexRequest, options);
            return Optional.of(response.getId());
        } catch (IOException e) {
            e.printStackTrace();
        }
        return Optional.empty();
    }

        /**
         * 批量增加记录
         * @param indexName 索引
         * @param documents
         */
    public Boolean batchAdd(String indexName, List<Map<String, Object>> documents) {
        try {
            indexName = getIndex(indexName);
            BulkRequest bulkRequest = new BulkRequest();
            for (Map<String, Object> document : documents) {
                bulkRequest.add(new IndexRequest(indexName, defaultType,document.get("id").toString())
                        .opType(DocWriteRequest.OpType.CREATE)
                        .source(document, XContentType.JSON));
            }
            client.bulk(bulkRequest, options);
            return Boolean.TRUE;
        } catch (IOException e) {
            e.printStackTrace();
        }
        return Boolean.FALSE;
    }
    /**
     * 更新记录信息
     * @param indexName
     * @param document
     */
    public Optional<String> update(String indexName, Map<String, Object> document) {
        try {
            Object id = document.get("id");
            if (Objects.isNull(id)) {
                throw new WrapperException("id cannot be null");
            }
            UpdateRequest request = new UpdateRequest(indexName, defaultType, id.toString());
            document.remove("id");
            request.doc(document, XContentType.JSON);
            UpdateResponse update = client.update(request, options);
            return Optional.of(update.getId());
        } catch (IOException e) {
            e.printStackTrace();
        }
        return Optional.empty();
    }

    /**
     * 判断记录是否存在
     * @param indexName
     * @param id
     * @return
     */
    public boolean exists(String indexName, String id) {
        try {
            indexName = getIndex(indexName);
            GetRequest getRequest = new GetRequest(indexName, defaultType, id);
            getRequest.fetchSourceContext(new FetchSourceContext(false));
            getRequest.storedFields("_none_");
            return client.exists(getRequest, options);
        } catch (IOException e){
            e.printStackTrace();
        }
        return false;
    }


    /**
     * 根据id获取记录信息
     * @param indexName
     * @param id
     */
    public Optional<Map<String, Object>> getById(String indexName, String id) {
        try {
            GetRequest getRequest = new GetRequest(indexName, defaultType, id);
            GetResponse getResponse = client.get(getRequest, options);
            if(getResponse.isExists()){
                return Optional.of(getResponse.getSource());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
       return Optional.empty();
    }



    /**
     * 删除记录
     * @param indexName 索引,类似数据库
     * @param id 数据ID
     */
    public void delete(String indexName, String id) {
        try {
            DeleteRequest deleteRequest = new DeleteRequest(indexName, defaultType, id);
            client.delete(deleteRequest, options);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 搜索 // TODO 内容比较多,见下期
     * @param graphName
     * @param esResource
     * @param pageNo
     * @param pageSize
     * @throws IOException
     */
    public IPage<Map<String, Object>> search(String graphName, EsResourceDTO esResource, int pageNo, int pageSize) {
        Map<String, Object> props = esResource.getConditionMap();
        Map<String, String> sort = esResource.getOrder();
        String label = esResource.getLabel();
        String[] returnProperties = esResource.getReturnProperties();
        String indexName = String.join("_", graphName, label);
        BoolQueryBuilder boolBuilder = this.parseQuery(props, esResource.getEdgeLabelConditionList());
//        Long total = this.count(indexName, boolBuilder);
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(boolBuilder);
        this.parseSort(sourceBuilder, sort);
        sourceBuilder.from((pageNo-1) * pageSize);
        sourceBuilder.size(pageSize);

        if (returnProperties != null && returnProperties.length > 0) {
            sourceBuilder.fetchSource(returnProperties, new String[] {});
        }

        SearchRequest searchRequest = new SearchRequest(indexName);
        searchRequest.types(defaultType);
        searchRequest.source(sourceBuilder);
        Long totalHits = 0L;
        List<Map<String, Object>> results = new ArrayList<>();
        try {
            SearchResponse response = client.search(searchRequest, options);
            SearchHits hits = response.getHits();
            totalHits = hits.getTotalHits();
            SearchHit[] searchHits = hits.getHits();
            for (SearchHit hit : searchHits) {
                Map<String, Object> sourceAsMap = hit.getSourceAsMap();
                sourceAsMap.put("id", hit.getId());
                results.add(sourceAsMap);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return PageUtil.page(results, totalHits.intValue(), pageNo, pageSize);
    }

    /**
     * 查询总数
     */
    public Long count(String indexName, BoolQueryBuilder queryBuilder) {
        try {
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.query(queryBuilder);
            CountRequest countRequest = new CountRequest(indexName);
            countRequest.source(searchSourceBuilder);
            CountResponse countResponse = client.count(countRequest, options);
            return countResponse.getCount();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return 0L;
    }
}

测试类:

package com.datahub.aimindgraph.unit;

import com.datahub.aimindgraph.AiMindGraphRestServer;
import com.datahub.aimindgraph.es.service.EsResourceUtil;
import com.datahub.aimindgraph.util.UUIDUtils;
import com.google.common.collect.Lists;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.HashMap;
import java.util.Map;

/**
 * @Desc TODO
 * @Author wadu
 * @Date 2021/10/18
 * @Version 1.0
 **/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = AiMindGraphRestServer.class)
@ActiveProfiles("dev")
public class EsTest {
    @Autowired
    private EsResourceUtil esResourceService;
    @Test
    public void test() {
        String graphName = "jiaokaokgfinalsix";
        String label = "st";
        String indexName = graphName + "_" + label;
        //删除索引
        esResourceService.deleteIndex(indexName);
        //创建索引
        esResourceService.createIndex(indexName);
//        HugeClient client = graphService.client(graphName);
//        GraphManager graph = client.graph();
//        Map<String, Object> hashMap = new HashMap<>();
//        hashMap.put("identifier", "st1553");
//        int page = 100;
//        for (int i = 1, len = 24860/page + 1; i < len; i++) {
//            List<Vertex> vertices = graph.listVertices(label, null, i*page, page);
//            for (Vertex v : vertices) {
//                vertexAdditionalService.afterAppend(graph, v);
//            }
//        }
        //插入
        Map<String, Object> p = new HashMap<>();
        p.put("id", UUIDUtils.getUUIDString());
        p.put("name", "测试1");
        p.put("identifier", "TEST1");

        Map<String, Object> pList1 = new HashMap<>();
        pList1.put("group", 1);
        pList1.put("sort", 0);
        pList1.put("name", "天气");
        pList1.put("identifier", "kcd00011");
        Map<String, Object> pList2 = new HashMap<>();
        pList1.put("group", 1);
        pList1.put("sort", 1);
        pList1.put("name", "食物");
        pList1.put("identifier", "kcd00146");
        p.put("investigationPoint", Lists.newArrayList(pList1, pList2));
        esResourceService.add(indexName, p);
        //查询
//        EsResourceDTO esResource = new EsResourceDTO();
//        IPage<VertexDto> search = esResourceService.search(graphName, true, esResource, 1, 100);
//        System.out.println(search.getRecords());
    }

}

下一篇:es 查询 (基本查询、嵌套、聚合查询等)

相关参考博文:Elasticsearch学习总结

 类似资料: