说明:本次集成环境为 java8 + es 6.7 版本
一,介绍
参考官方文档:开始使用 Elasticsearch | Elastic Videos
Elasticsearch | RDBMS |
---|---|
Index(索引) | DataBase(数据库) |
Type(类型) | Table(表) |
Document(文档) | Row(行) |
Field(字段) | Column(列) |
Mapping(映射) | Schema(约束) |
Everything is indexed(索引) | Index(索引) |
官方参考文档:Java High Level REST Client | Java REST Client [6.8] | Elastic
官方参考文档比较详细:Field datatypes | Elasticsearch Guide [6.7] | Elastic
其他参考:ES的数据类型 (text、keyword、date、object、geo等)
二、es环境安装(linux)
安装包下载: Download Elasticsearch | ElasticDownload Elasticsearch | Elastic
安装参考文档指南:安装并运行 Elasticsearch | Elasticsearch: 权威指南 | Elastic
三、集成
引入相关依赖包:
<properties>
<java.version>1.8</java.version>
<es.version>6.7.2</es.version>
</properties>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>${es.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${es.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${es.version}</version>
</dependency>
配置类:
package com.datahub.aimindgraph.config;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ElasticsearchConfig implements FactoryBean<RestHighLevelClient>, InitializingBean, DisposableBean {
// TODO 此处配置es服务端地址
@Value("${es.servers}")
private String uri;
private RestClient client;
private RestHighLevelClient restHighLevelClient;
protected void initClient() {
if(StringUtils.isEmpty(uri)){
throw new RuntimeException("elasticsearch uri is unset to properties file");
}
String [] nodes = uri.split(",");
HttpHost[] httpHosts = new HttpHost[nodes.length];
for(int x = 0;x<nodes.length;x++){
String [] uris = nodes[x].split(":");
HttpHost httpHost = new HttpHost(uris[0],Integer.parseInt(uris[1]),"http");
httpHosts[x] = httpHost;
}
RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
restHighLevelClient = new RestHighLevelClient(restClientBuilder);
client = restHighLevelClient.getLowLevelClient();
}
@Override
public void destroy() throws Exception {
try {
if (client != null) {
client.close();
}
} catch (final Exception e) {
}
}
@Override
public RestHighLevelClient getObject() throws Exception {
return restHighLevelClient;
}
@Override
public Class<RestHighLevelClient> getObjectType() {
return RestHighLevelClient.class;
}
@Override
public void afterPropertiesSet() throws Exception {
initClient();
}
}
常用方法封装:
package com.datahub.aimindgraph.es.service;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.datahub.aimindgraph.dto.EsResourceDTO;
import com.datahub.aimindgraph.exception.WrapperException;
import com.datahub.aimindgraph.util.PageUtil;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.CountResponse;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.*;
/**
* @Desc 根据实际业务需求定制
* @Author wadu
* @Date 2021/07/17
* @Version 1.0
**/
@Service
@Slf4j
public class EsResourceUtil {
@Autowired
private RestHighLevelClient client;
private final RequestOptions options = RequestOptions.DEFAULT;
private static final Map<String, String> indexNameMap = new HashMap<>();
private static final String defaultType = "_doc";
public String getIndex(String indexName){
if (indexNameMap.containsKey(indexName)) {
return indexName;
}
if (existsIndex(indexName) || createIndex(indexName)) {
indexNameMap.put(indexName, "");
return indexName;
}
throw new WrapperException("find index failed");
}
/**
* 动态创建索引
*
* @param indexName
*/
public boolean createIndex(String indexName, XContentBuilder xContentBuilder) {
synchronized (indexName.intern()) {
try {
if (!existsIndex(indexName)) {
CreateIndexRequest request = new CreateIndexRequest(indexName);
if (Objects.nonNull(xContentBuilder)) {
request.mapping(xContentBuilder);
}
HashMap<String, Object> settings_map = new HashMap<>(2);
settings_map.put("number_of_shards", 3);
settings_map.put("number_of_replicas", 1);
request.settings(settings_map);
CreateIndexResponse createIndexResponse = client.indices().create(request, options);
return createIndexResponse.isAcknowledged();
}
} catch (IOException e) {
e.printStackTrace();
}
return false;
}
}
/**
* 删除索引
*
* @param indexName
*/
public boolean deleteIndex(String indexName) {
synchronized (indexName.intern()) {
try {
if (existsIndex(indexName)) {
AcknowledgedResponse response = client.indices().delete(new DeleteIndexRequest(indexName), options);
return response.isAcknowledged();
}
} catch (IOException e) {
e.printStackTrace();
}
return false;
}
}
/**
* 判断索引是否存在
*
* @param indexName
* @return
*/
public boolean existsIndex(String indexName) {
try {
GetIndexRequest request = new GetIndexRequest(indexName);
return client.indices().exists(request, options);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 新增或更新
* @param indexName
* @param document
* @return
*/
public Optional<String> addOrUpdate(String indexName, Map<String, Object> document) {
Object id = document.get("id");
if (Objects.isNull(id)) {
throw new WrapperException("id cannot be null");
}
if (exists(indexName, id.toString())) {
return update(indexName, document);
}
return add(indexName, document);
}
/**
* 增加记录
* @param indexName 索引
* @param document
*/
public Optional<String> add(String indexName, Map<String, Object> document) {
try {
indexName = getIndex(indexName);
Object id = document.get("id");
if (Objects.isNull(id)) {
throw new WrapperException("id cannot be null");
}
IndexRequest indexRequest = new IndexRequest(indexName, defaultType, id.toString());
document.remove("id");
indexRequest.opType(DocWriteRequest.OpType.CREATE);
indexRequest.source(document, XContentType.JSON);
IndexResponse response = client.index(indexRequest, options);
return Optional.of(response.getId());
} catch (IOException e) {
e.printStackTrace();
}
return Optional.empty();
}
/**
* 批量增加记录
* @param indexName 索引
* @param documents
*/
public Boolean batchAdd(String indexName, List<Map<String, Object>> documents) {
try {
indexName = getIndex(indexName);
BulkRequest bulkRequest = new BulkRequest();
for (Map<String, Object> document : documents) {
bulkRequest.add(new IndexRequest(indexName, defaultType,document.get("id").toString())
.opType(DocWriteRequest.OpType.CREATE)
.source(document, XContentType.JSON));
}
client.bulk(bulkRequest, options);
return Boolean.TRUE;
} catch (IOException e) {
e.printStackTrace();
}
return Boolean.FALSE;
}
/**
* 更新记录信息
* @param indexName
* @param document
*/
public Optional<String> update(String indexName, Map<String, Object> document) {
try {
Object id = document.get("id");
if (Objects.isNull(id)) {
throw new WrapperException("id cannot be null");
}
UpdateRequest request = new UpdateRequest(indexName, defaultType, id.toString());
document.remove("id");
request.doc(document, XContentType.JSON);
UpdateResponse update = client.update(request, options);
return Optional.of(update.getId());
} catch (IOException e) {
e.printStackTrace();
}
return Optional.empty();
}
/**
* 判断记录是否存在
* @param indexName
* @param id
* @return
*/
public boolean exists(String indexName, String id) {
try {
indexName = getIndex(indexName);
GetRequest getRequest = new GetRequest(indexName, defaultType, id);
getRequest.fetchSourceContext(new FetchSourceContext(false));
getRequest.storedFields("_none_");
return client.exists(getRequest, options);
} catch (IOException e){
e.printStackTrace();
}
return false;
}
/**
* 根据id获取记录信息
* @param indexName
* @param id
*/
public Optional<Map<String, Object>> getById(String indexName, String id) {
try {
GetRequest getRequest = new GetRequest(indexName, defaultType, id);
GetResponse getResponse = client.get(getRequest, options);
if(getResponse.isExists()){
return Optional.of(getResponse.getSource());
}
} catch (IOException e) {
e.printStackTrace();
}
return Optional.empty();
}
/**
* 删除记录
* @param indexName 索引,类似数据库
* @param id 数据ID
*/
public void delete(String indexName, String id) {
try {
DeleteRequest deleteRequest = new DeleteRequest(indexName, defaultType, id);
client.delete(deleteRequest, options);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 搜索 // TODO 内容比较多,见下期
* @param graphName
* @param esResource
* @param pageNo
* @param pageSize
* @throws IOException
*/
public IPage<Map<String, Object>> search(String graphName, EsResourceDTO esResource, int pageNo, int pageSize) {
Map<String, Object> props = esResource.getConditionMap();
Map<String, String> sort = esResource.getOrder();
String label = esResource.getLabel();
String[] returnProperties = esResource.getReturnProperties();
String indexName = String.join("_", graphName, label);
BoolQueryBuilder boolBuilder = this.parseQuery(props, esResource.getEdgeLabelConditionList());
// Long total = this.count(indexName, boolBuilder);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(boolBuilder);
this.parseSort(sourceBuilder, sort);
sourceBuilder.from((pageNo-1) * pageSize);
sourceBuilder.size(pageSize);
if (returnProperties != null && returnProperties.length > 0) {
sourceBuilder.fetchSource(returnProperties, new String[] {});
}
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.types(defaultType);
searchRequest.source(sourceBuilder);
Long totalHits = 0L;
List<Map<String, Object>> results = new ArrayList<>();
try {
SearchResponse response = client.search(searchRequest, options);
SearchHits hits = response.getHits();
totalHits = hits.getTotalHits();
SearchHit[] searchHits = hits.getHits();
for (SearchHit hit : searchHits) {
Map<String, Object> sourceAsMap = hit.getSourceAsMap();
sourceAsMap.put("id", hit.getId());
results.add(sourceAsMap);
}
} catch (IOException e) {
e.printStackTrace();
}
return PageUtil.page(results, totalHits.intValue(), pageNo, pageSize);
}
/**
* 查询总数
*/
public Long count(String indexName, BoolQueryBuilder queryBuilder) {
try {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(queryBuilder);
CountRequest countRequest = new CountRequest(indexName);
countRequest.source(searchSourceBuilder);
CountResponse countResponse = client.count(countRequest, options);
return countResponse.getCount();
} catch (Exception e) {
e.printStackTrace();
}
return 0L;
}
}
测试类:
package com.datahub.aimindgraph.unit;
import com.datahub.aimindgraph.AiMindGraphRestServer;
import com.datahub.aimindgraph.es.service.EsResourceUtil;
import com.datahub.aimindgraph.util.UUIDUtils;
import com.google.common.collect.Lists;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.HashMap;
import java.util.Map;
/**
* @Desc TODO
* @Author wadu
* @Date 2021/10/18
* @Version 1.0
**/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = AiMindGraphRestServer.class)
@ActiveProfiles("dev")
public class EsTest {
@Autowired
private EsResourceUtil esResourceService;
@Test
public void test() {
String graphName = "jiaokaokgfinalsix";
String label = "st";
String indexName = graphName + "_" + label;
//删除索引
esResourceService.deleteIndex(indexName);
//创建索引
esResourceService.createIndex(indexName);
// HugeClient client = graphService.client(graphName);
// GraphManager graph = client.graph();
// Map<String, Object> hashMap = new HashMap<>();
// hashMap.put("identifier", "st1553");
// int page = 100;
// for (int i = 1, len = 24860/page + 1; i < len; i++) {
// List<Vertex> vertices = graph.listVertices(label, null, i*page, page);
// for (Vertex v : vertices) {
// vertexAdditionalService.afterAppend(graph, v);
// }
// }
//插入
Map<String, Object> p = new HashMap<>();
p.put("id", UUIDUtils.getUUIDString());
p.put("name", "测试1");
p.put("identifier", "TEST1");
Map<String, Object> pList1 = new HashMap<>();
pList1.put("group", 1);
pList1.put("sort", 0);
pList1.put("name", "天气");
pList1.put("identifier", "kcd00011");
Map<String, Object> pList2 = new HashMap<>();
pList1.put("group", 1);
pList1.put("sort", 1);
pList1.put("name", "食物");
pList1.put("identifier", "kcd00146");
p.put("investigationPoint", Lists.newArrayList(pList1, pList2));
esResourceService.add(indexName, p);
//查询
// EsResourceDTO esResource = new EsResourceDTO();
// IPage<VertexDto> search = esResourceService.search(graphName, true, esResource, 1, 100);
// System.out.println(search.getRecords());
}
}
下一篇:es 查询 (基本查询、嵌套、聚合查询等)
相关参考博文:Elasticsearch学习总结