ElasticSearch(五)------基于JAVA的elasticsearch的实例

文凯康
2023-12-01

添加依赖

<!-- Elasticsearch核心依赖包 -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>6.2.2</version>
        </dependency>
    	 <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
           <version>6.2.2</version>
        </dependency> 
        <dependency>
	        <groupId>org.elasticsearch.plugin</groupId>
			<artifactId>transport-netty4-client</artifactId>
		    <version>6.2.2</version>
	   </dependency>
 
		<dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.0</version>
        </dependency>

添加es核心工具类

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.admin.indices.exists.types.TypesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.types.TypesExistsResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import com.google.gson.JsonObject;

/**
 * elasticsearch 6.2.2  工具类
 *
 */
public class ElasticSearcUtil  {

    //TransportClient对象,用于连接ES集群
    private static volatile TransportClient client;
    
	public static TransportClient getClient(){
    	
    	boolean sniff = true;//开启客户端去嗅探整个集群的状态,默认关闭false
		String cluster_name = "elasticsearch";//默认是elasticsearch
		String ip = "127.0.0.1";//本地是127.0.0.1,如果按照到服务器上,填写服务器ip
		int port = 9300;//连接端口号,默认9300
    	
    	//构建Settings对象
    	Settings settings = Settings.builder().put("cluster.name", cluster_name).put("client.transport.sniff", sniff).build();//增加嗅探机制,找到ES集群
    	
    	//双检锁/双重校验锁
        if(client==null){
            synchronized (TransportClient.class){
                try {
                	if(client==null) {
                		client=new PreBuiltTransportClient(settings).addTransportAddress(new TransportAddress(InetAddress.getByName(ip), port));
                	}
                } catch (UnknownHostException e) {
                    e.printStackTrace();
                }
            }
        }
        return client;
    }
    
  /**
	 * 关闭客户端
	 * 
	 * @param client
	 */
	public static void close() {
		if(null != client) {
			 Logger.info("执行关闭连接操作...");
		     client.close();
	        }
	}
	
    /**
     * 获取索引管理的IndicesAdminClient
     */
    public static IndicesAdminClient getAdminClient() {
        return getClient().admin().indices();
    }
    
    /**
     * 判定索引是否存在
     * @param indexName 索引名
     * @return
     */
    public static boolean isExists(String indexName){
        IndicesExistsResponse response=getAdminClient().prepareExists(indexName).get();
        return response.isExists()?true:false;
    }
    
    /**
     * 删除索引
     * @param indexName 索引名
     * @return
     */
    public static boolean deleteIndex(String indexName) {
        DeleteIndexResponse deleteResponse = getAdminClient()
                .prepareDelete(indexName.toLowerCase())
                .execute()
                .actionGet();
        return deleteResponse.isAcknowledged()?true:false;
    }
    
    /**
	 * 判断指定的索引的类型是否存在
	 * @param client
     * @param indexName 索引名
     * @param type index类型
	 * @return 存在:true; 不存在:false;
	 */
	public static boolean isExistsType(String indexName,String type){
	    TypesExistsResponse  response = getAdminClient().typesExists(new TypesExistsRequest(new String[]{indexName}, type)).actionGet();
	    return response.isExists()?true:false;
	}
    
    /**
     * 创建索引
     * @param indexName 索引名
     * @return
     */
    public static boolean createIndex(String indexName){
        CreateIndexResponse createIndexResponse = getAdminClient().prepareCreate(indexName.toLowerCase()).get();
        return createIndexResponse.isAcknowledged()?true:false;
    }
    
    /**
     * 创建索引
     * @param indexName 索引名
     * @param shards   分片数
     * @param replicas  副本数
     * @return
     */
    public static boolean createIndex(String indexName, int shards, int replicas) {
        Settings settings = Settings.builder()
                .put("index.number_of_shards", shards)
                .put("index.number_of_replicas", replicas)
                .build();
        CreateIndexResponse createIndexResponse = getAdminClient()
                .prepareCreate(indexName.toLowerCase())
                .setSettings(settings)
                .execute().actionGet();
        return createIndexResponse.isAcknowledged()?true:false;
    }
    
    /**
     * 为索引indexName设置mapping
     * @param indexName 索引名
     * @param type index类型
     * @param builder
     */
    public static void setMapping(String indexName, String type, XContentBuilder builder) {
        PutMappingRequest mapping = Requests.putMappingRequest(indexName).type(type).source(builder);
        getAdminClient().putMapping(mapping).actionGet();
    }
    
    /**
     * 插入文档
     * @param indexName 索引名
     * @param type index类型
     * @param doc XContentBuilder
     */
    public static void insertDocument(String indexName, String type,XContentBuilder doc) {
        IndexResponse response = getClient().prepareIndex(indexName, type)
                .setSource(doc)
                .get();
        Logger.info("索引名称:" + response.getIndex() + "\n类型:" + response.getType()+"\n文档ID:" + response.getId() + "\n当前实例状态:" + response.status());
    }
    
    /**
     * 插入文档
     * @param indexName 索引名
     * @param type index类型
     * @param id 文档id
     * @param pipeline 管道名称
     * @param doc XContentBuilder
     */
    public static void insertDocument(String indexName, String type,String id, XContentBuilder doc) {
        IndexResponse response = getClient().prepareIndex(indexName, type,id)
                .setSource(doc)
                .get();
        Logger.info("索引名称:" + response.getIndex() + "\n类型:" + response.getType()+"\n文档ID:" + response.getId() + "\n当前实例状态:" + response.status());
    }
    
    /**
     * 插入文档
     * @param indexName 索引名
     * @param type index类型
     * @param id 文档id
     * @param pipeline 管道名称
     * @param doc XContentBuilder
     */
    public static void insertDocument(String indexName, String type,String id,String pipeline, XContentBuilder doc) {
        IndexResponse response = getClient().prepareIndex(indexName, type,id).setPipeline(pipeline)
                .setSource(doc)
                .get();
        Logger.info("索引名称:" + response.getIndex() + "\n类型:" + response.getType()+"\n文档ID:" + response.getId() + "\n当前实例状态:" + response.status());
    }
    
    /**
     * 
     * @param indexName 索引名
     * @param type index类型
     * @param json Json格式串
     */
    public static void insertDocument(String indexName, String type,String json) {
        IndexResponse response = getClient().prepareIndex(indexName, type)
                .setSource(json, XContentType.JSON)
                .get();
        Logger.info("索引名称:" + response.getIndex() + "\n类型:" + response.getType()+"\n文档ID:" + response.getId() + "\n当前实例状态:" + response.status());
    }
    
    /**
     * 
     * @param indexName 索引名
     * @param type index类型
     * @param id 文档id
     * @param pipeline 管道名称
     * @param json Json格式串
     */
    public static void insertDocument(String indexName, String type,String id,String pipeline,String json) {
        IndexResponse response = getClient().prepareIndex(indexName, type,id).setPipeline(pipeline)
                .setSource(json, XContentType.JSON)
                .get();
        Logger.info("索引名称:" + response.getIndex() + "\n类型:" + response.getType()+"\n文档ID:" + response.getId() + "\n当前实例状态:" + response.status());
    }
    
    /**
     * 插入文档
     * @param indexName 索引名
     * @param type index类型
     * @param map
     */
	 public static void insertDocument(String indexName,String type,Map<String, Object> map){
		 IndexResponse response = getClient().prepareIndex(indexName, type)
				 .setSource(map)
				 .get();
		 Logger.info("索引名称:" + response.getIndex() + "\n类型:" + response.getType()+"\n文档ID:" + response.getId() + "\n当前实例状态:" + response.status());
	 }
	 
    /**
     * 插入文档
     * @param indexName 索引名
     * @param type index类型
     * @param id 文档id
     * @param pipeline 管道名称
     * @param map
     */
	 public static void insertDocument(String indexName,String type,String id,String pipeline,Map<String, Object> map){
		 IndexResponse response = getClient().prepareIndex(indexName, type,id).setPipeline(pipeline)
				 .setSource(map)
				 .get();
		 Logger.info("索引名称:" + response.getIndex() + "\n类型:" + response.getType()+"\n文档ID:" + response.getId() + "\n当前实例状态:" + response.status());
	 }	 
	 
	/**
	 * 插入文档
     * @param indexName 索引名
     * @param type index类型
     * @param jsonObject
     */
	 public static void insertDocument(String indexName,String type,JsonObject jsonObject){
		 IndexResponse response = getClient().prepareIndex(indexName, type)
				 .setSource(jsonObject, XContentType.JSON)
				 .get();
		 Logger.info("索引名称:" + response.getIndex() + "\n类型:" + response.getType()+"\n文档ID:" + response.getId() + "\n当前实例状态:" + response.status());
	 }
	 
	/**
	 * 插入文档
     * @param indexName 索引名
     * @param type index类型
     * @param id 文档id
     * @param pipeline 管道名称
     * @param jsonObject
     */
	 public static void insertDocument(String indexName,String type,String id,String pipeline,JsonObject jsonObject){
		 IndexResponse response = getClient().prepareIndex(indexName, type,id).setPipeline(pipeline)
				 .setSource(jsonObject, XContentType.JSON)
				 .get();
		 Logger.info("索引名称:" + response.getIndex() + "\n类型:" + response.getType()+"\n文档ID:" + response.getId() + "\n当前实例状态:" + response.status());
	 }
	 
	/**
	 * 更新索引库文档数据
     * @param indexName 索引名
     * @param type index类型
	 * @param id 文档id
	 * @param map 需要更新的数据
	 */
     public static void updateData(String indexName,String type,String id,Map<String,Object> map) {
    	 UpdateResponse response = getClient().prepareUpdate(indexName, type, id)
    			 .setDoc(map)
    			 .get();
    	 Logger.info("索引名称:" + response.getIndex() + "\n类型:" + response.getType()+"\n文档ID:" + response.getId() + "\n当前实例状态:" + response.status());
     }
     
     /**
	  * 删除索引库文档数据
      * @param indexName 索引名
      * @param type index类型
	  * @param id 文档id
	  */
     public static void deleteData(String indexName,String type,String id) {
    	 DeleteResponse response = getClient().prepareDelete(indexName, type, id)
    			 .get();
    	 Logger.info("索引名称:" + response.getIndex() + "\n类型:" + response.getType()+ "\n文档ID:" + response.getId() + "\n当前实例状态:" + response.status());
     }
     
     /**
	  * 从索引库获取数据
	  * @param client
	  * @param indexName 索引名
	  * @param type index类型
	  * @param id 文档id
      * @return
      */
     public static String getData(String indexName,String type,String id) {
   	    GetResponse getResponse = getClient().prepareGet(indexName, type, id)
   	    		.get();
   	    Logger.info("索引库的数据:" +getResponse.getSourceAsString());
   	    return getResponse.getSourceAsString();
     }

elasticsearch 案例demo

 
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.WritePipelineResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import com.coyee.core.vo.Pager;


/**
 * elasticsearch  档案查询工具类
 *
 */
public class ESUtil {

	public static final String INDEX_NAME="wsbp_mxl";//索引名
	public static final String INDEX_TYPE="archives_mxl";//index类型
	
	public static final String FILE_INDEX_NAME="wsbp_mxl_file";//索引名
	public static final String FILE_INDEX_TYPE="archives_mxl_file";//index类型
	public static final String PIPELINE_NAME="pipeline_mxl";//管道名称
	
	
	/**
	 * 1、创建Index
	 * @param index_name 索引名称
	 */
	public static void createIndex(String index_name) {
		System.out.println(ElasticSearcUtil.createIndex(index_name));
	}
	
	/**
	 * 2、创建Mapping(档案)
	 */
	public static void createMapping() {
		try {
			XContentBuilder builder = jsonBuilder()
                    .startObject()
                    .startObject("properties")
                    .startObject("id") //id字段
                    .field("type", "keyword")
                    .endObject()
                    
                    .startObject("name") //name 名称
                    .field("type", "keyword")
                    .endObject()
                    
                    .startObject("file_type")//类型
                    .field("type", "keyword")
                    .endObject()
                                       
                    .startObject("author")//人
                    .field("type", "keyword")
                    .endObject()
                    
                    .startObject("years")//年
                    .field("type", "keyword")
                    .endObject()                  
                    
                    .startObject("create_time") //创建日期
                    .field("type", "date")
                    .field("format", "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd")
                    .endObject()
                    
                    .startObject("content")  //关键字搜索文本内容
                    .field("type", "text")
                    .field("analyzer", "ik_max_word")
                    .field("search_analyzer", "ik_max_word")
                    .endObject()
                    //.startObject("data")  //文件内容
                    //.field("type", "text")
                    //.field("analyzer", "ik_max_word") //细粒度    ik_smart 粗粒度
                    //.field("search_analyzer", "ik_max_word")
                    //.endObject()
                    
                    .endObject()
                    .endObject();
            ElasticSearcUtil.setMapping(INDEX_NAME, INDEX_TYPE, builder);
        } catch (IOException e) {
            e.printStackTrace();
        }
	}
	
	/**
	 * 2、创建Mapping(文件)
	 */
	public static void createMappingFile() {
		try {
			XContentBuilder builder = jsonBuilder()
                    .startObject()
                    .startObject("properties")
                    .startObject("file_id") //id字段
                    .field("type", "keyword")
                    .endObject()                   
                    
                    .startObject("file_data")  //文件内容
                    .field("type", "text")
                    .field("analyzer", "ik_max_word") //细粒度    ik_smart 粗粒度
                    .field("search_analyzer", "ik_max_word")
                    .endObject()
                    
                    .startObject("archives_id") //档案id
                    .field("type", "keyword")
                    .endObject()
                    
                    .endObject()
                    .endObject();
            ElasticSearcUtil.setMapping(FILE_INDEX_NAME, FILE_INDEX_TYPE, builder);
        } catch (IOException e) {
            e.printStackTrace();
        }
	}
	
	/**
	 * 3、创建pipeline,定义了 2 个预处理器 "attachment" 和 "remove" ,attachment将入库文档字段 "data" 视为文档附件进行文本抽取,"remove" 预处理器用于将其从源文档中删除。
	 * @param pipeline_name 管道名称
	 * @throws InterruptedException
	 * @throws ExecutionException
	 */
/**	public static void createPipeline() throws InterruptedException, ExecutionException{
 
		String source1 =
			    "{\"description\":\"my pipeline information\"," +
			        "\"processors\":[{\"attachment\":{\"field\":\"data\",\"indexed_chars\":-1,\"ignore_missing\":true}},{\"remove\":{\"field\":\"data\"}}]}";
		PutPipelineRequest request = new PutPipelineRequest(
				PIPELINE_NAME,  //管道名称
		    new BytesArray(source1.getBytes(StandardCharsets.UTF_8)), 
		    XContentType.JSON 
		);  
		WritePipelineResponse response = ElasticSearcUtil.getClient().admin().cluster().putPipeline(request).get();
		System.out.println(response.isAcknowledged());
	}
**/	
	/**
	 * 3、创建pipeline
	 * 定义了 2 个预处理器 "attachment" 和 "remove" ,attachment将入库文档字段 "data" 视为文档附件进行文本抽取,"remove" 预处理器用于将其从源文档中删除。
	 * @param pipeline_name 管道名称
	 * @throws InterruptedException
	 * @throws ExecutionException
	 */
	public static void createPipelineFile() throws InterruptedException, ExecutionException{
 
		String source1 =
			    "{\"description\":\"my pipeline information\"," +
			        "\"processors\":[{\"attachment\":{\"field\":\"file_data\",\"indexed_chars\":-1,\"ignore_missing\":true}},{\"remove\":{\"field\":\"file_data\"}}]}";
		PutPipelineRequest request = new PutPipelineRequest(
				PIPELINE_NAME,  //管道名称
		    new BytesArray(source1.getBytes(StandardCharsets.UTF_8)), 
		    XContentType.JSON 
		);  
		WritePipelineResponse response = ElasticSearcUtil.getClient().admin().cluster().putPipeline(request).get();
		System.out.println(response.isAcknowledged());
	}
	
	/**
	 * 插入文档(保存文档数据)
	 * @param entity
	 * @param content 关键字搜索文本内容(json字符串)
	 * @throws IOException
	 */
	public static void insertDocument(Archives entity,String content) throws IOException {
        
		String id = entity.getId(); //文档id
		String name = entity.getName(); //标题
		String file_type =String.valueOf(entity.getFile_type());//文件类型
		String author = entity.getAuthor();// 人
		String years = entity.getYears();//年

		String create_time = entity.getGeneration_time();
		 
		if(StringUtils.isBlank(create_time)) {//日期-格式为yyyy-MM-dd HH:mm:ss||yyyy-MM-dd 不为空
			create_time=DateUtils.format(new Date(), DateUtils.DEFAULT_DATETIME_FORMAT);
		}
		
        XContentBuilder doc = jsonBuilder()
                .startObject()
                .field("id",id)
                .field("name",name)
                .field("file_type",file_type)
                .field("author",author)
                .field("years",years)
                .field("create_time",create_time)
                .field("content",content)
                //.field("data",base64Str)
                .endObject();
        ElasticSearcUtil.insertDocument(INDEX_NAME, INDEX_TYPE,id, doc);
	}
	
	/**
	 * 插入文档(保存档案文件数据)
	 * @param content 关键字搜索文本内容(json字符串)
	 * @param base64Str 文件内容(base64转码后的字符串)
	 * @param archives_id 档案id 
	 * @throws IOException
	 */
	public static void insertDocumentFile(String file_id,String base64Str,String archives_id) throws IOException {
        
        XContentBuilder doc = jsonBuilder()
                .startObject()
                .field("file_id",file_id)
                .field("file_data",base64Str)
                .field("archives_id",archives_id)
                .endObject();
        ElasticSearcUtil.insertDocument(FILE_INDEX_NAME, FILE_INDEX_TYPE,file_id,PIPELINE_NAME, doc);
	}
	
	/**
	 * 删除索引库文档数据
	 * @param index_name 索引名称
	 * @param index_type 索引类型
	 * @param id 文档id
	 */
	public static void deleteData(String index_name,String index_type,String id) {
		ElasticSearcUtil.deleteData(index_name, index_type, id);
	}
 
	/**
     * 组合查询
     * termQuery("key", obj) 完全匹配
     * termsQuery("key", obj1, obj2..)   一次匹配多个值
     * matchQuery("key", Obj) 单个匹配, field不支持通配符, 前缀具高级特性
     * multiMatchQuery("text", "field1", "field2"..);  匹配多个字段, field有通配符忒行
     * matchAllQuery();         匹配所有文件
     * @throws Exception
     */
    public static void searchMutil()throws IOException{
        SearchRequestBuilder srb=ElasticSearcUtil.getClient().prepareSearch(FILE_INDEX_NAME).setTypes(FILE_INDEX_TYPE);
        //QueryBuilder queryBuilder=QueryBuilders.matchQuery("name", "严重、信号差");
        //QueryBuilder queryBuilder2=QueryBuilders.matchPhraseQuery("desc", "设备");
        QueryBuilder queryBuilder3=QueryBuilders.matchQuery("attachment.content", "熊猫");
        SearchResponse sr=srb.setQuery(QueryBuilders.boolQuery()
                //.must(queryBuilder)//文档 必须 匹配这些条件才能被包含进来
                //.mustNot(queryBuilder2)//文档 必须不 匹配这些条件才能被包含进来
                //.must(queryBuilder3)//如果满足这些语句中的任意语句,将增加 _score ,否则,无任何影响。它们主要用于修正每个文档的相关性得分
       		 )
            .execute()
            .actionGet(); 
        SearchHits hits=sr.getHits();
        for(SearchHit hit:hits){
            System.out.println(hit.getSourceAsString());
        }
    }
	
    /**
	* 多个字段的模糊查询,分页不高亮
	* @param keyword 关键字
	* @param pager
	* @param queryMap 查询附加条件
	* @return
	*/
	public static Pager findPagerQueryES(String keyword,Pager pager,Map<String,String> queryMap) {
	   boolean explain = true;
	   try {
	       BoolQueryBuilder query = QueryBuilders.boolQuery();
	       //关键字查询
	       if (StringUtils.isNotBlank(keyword)) {
	    	   query.must(QueryBuilders.boolQuery()
	    	           .should(QueryBuilders.matchQuery("content",keyword )) //分词匹配
	    	           .should(QueryBuilders.matchQuery("attachment.content", keyword)));
	       }
	       
	       String name = MapUtils.getString(queryMap, "name");//名称
	       if(StringUtils.isNotBlank(name)) {
	    	   query.must(QueryBuilders.wildcardQuery("name","*"+name+"*"  ));//模糊匹配
	       }
	       
	       String file_type = MapUtils.getString(queryMap, "file_type");//类型
	       if(StringUtils.isNotBlank(file_type)) {
	    	   query.must(QueryBuilders.termQuery("file_type",file_type  ));//绝对匹配
	       }	       	       
	       
	       String author = MapUtils.getString(queryMap, "author");//立档人
	       if(StringUtils.isNotBlank(author)) {
	    	   query.must(QueryBuilders.wildcardQuery("author","*"+author+"*"  ));//模糊匹配
	       }
	       
	       String years = MapUtils.getString(queryMap, "years");//年
	       if(StringUtils.isNotBlank(years)) {
	    	   query.must(QueryBuilders.termQuery("years",years  )); //绝对匹配
	       }	       	     
	       
	       String sta_time = MapUtils.getString(queryMap, "startTime");//开始日期 
	       if(StringUtils.isNotBlank(sta_time)) {
	    	   query.must(QueryBuilders.rangeQuery("create_time").gte(sta_time));
	       }
	       
	       String end_time = MapUtils.getString(queryMap, "endTime");//结束日期 
	       if(StringUtils.isNotBlank(end_time)) {
	    	   query.must(QueryBuilders.rangeQuery("create_time").lte(end_time));
	       }
	       
	       SearchRequestBuilder srb =ElasticSearcUtil.getClient().prepareSearch(INDEX_NAME)
	               .setSearchType(SearchType.DFS_QUERY_THEN_FETCH) 
	               .setTypes(INDEX_TYPE)
	               .setQuery(query)
	               //分页
	               .setFrom((pager.getPageNumber() - 1) * pager.getPageSize()).setSize(pager.getPageSize())
	               //是否按匹配度排序
	               .setExplain(explain);
	
	       SearchResponse response = srb.execute().actionGet();
	       SearchHits hits = response.getHits();
	       pager.setTotalCount((int) hits.getTotalHits());
	       List<String> list = new ArrayList<>();
	       hits.forEach(searchHit -> {
	           Map<String, Object> source = searchHit.getSourceAsMap();//源数据
	           String resultMap = MapUtils.getString(source, "id"); //返回数据集
	           list.add(resultMap);
	       });
	       
	       pager.setList(list);
	     
	   } catch (Exception e) {
	       e.printStackTrace();
	   }
	   
	   return pager;
	}
    
  /**
	* 多个字段的模糊查询,分页高亮
	* @param keyword 关键字
	* @param pager
	* @param queryMap 查询附加条件
	* @return
	*/
	@SuppressWarnings("unused")
	public static Pager findPagerQueryESHighlight(String keyword,Pager pager,Map<String,Object> queryMap) {
	   boolean highlight = true;
	   boolean explain = true;
	   String sortName = "create_time";
	   String sortOrder = "desc";
	   try {
	       BoolQueryBuilder query = QueryBuilders.boolQuery();
	       //关键字查询
	       if (StringUtils.isNotBlank(keyword)) {
	    	   query.must(QueryBuilders.boolQuery()
	    	           .should(QueryBuilders.matchQuery("content",keyword )) //分词匹配
	    	           .should(QueryBuilders.matchQuery("attachment.content", keyword)));
	       }
	       
	       String name = MapUtils.getString(queryMap, "name");//档案名称
	       if(StringUtils.isNotBlank(name)) {
	    	   query.must(QueryBuilders.wildcardQuery("name","*"+name+"*"  ));//模糊匹配
	       }
	       
	       String file_type = MapUtils.getString(queryMap, "file_type");//类型
	       if(StringUtils.isNotBlank(file_type)) {
	    	   query.must(QueryBuilders.termQuery("file_type",file_type  ));//绝对匹配
	       }
	       
	       String author = MapUtils.getString(queryMap, "author");//人
	       if(StringUtils.isNotBlank(author)) {
	    	   query.must(QueryBuilders.wildcardQuery("author","*"+author+"*"  ));//模糊匹配
	       }
	       
	       String years = MapUtils.getString(queryMap, "years");//年
	       if(StringUtils.isNotBlank(years)) {
	    	   query.must(QueryBuilders.termQuery("years",years  )); //绝对匹配
	       }
	       	       	       
	       String sta_time = MapUtils.getString(queryMap, "startTime");//开始日期 
	       if(StringUtils.isNotBlank(sta_time)) {
	    	   query.must(QueryBuilders.rangeQuery("create_time").gte(sta_time));
	       }
	       
	       String end_time = MapUtils.getString(queryMap, "endTime");//结束日期 
	       if(StringUtils.isNotBlank(end_time)) {
	    	   query.must(QueryBuilders.rangeQuery("create_time").lte(end_time));
	       }
	       
	       SearchRequestBuilder srb =ElasticSearcUtil.getClient().prepareSearch(INDEX_NAME)
	               .setSearchType(SearchType.DFS_QUERY_THEN_FETCH) 
	               .setTypes(INDEX_TYPE)
	               .setQuery(query)
	               //分页
	               .setFrom((pager.getPageNumber() - 1) * pager.getPageSize()).setSize(pager.getPageSize())
	               //是否按匹配度排序
	               .setExplain(explain);
	
	       if (highlight) {
	           HighlightBuilder highlightBuilder = new HighlightBuilder().field("*").requireFieldMatch(false);
	           highlightBuilder.preTags("<span style=\"color:red\">");
	           highlightBuilder.postTags("</span>");
	           srb.highlighter(highlightBuilder);
	       }
	       /**     if (StringUtils.isNotBlank(sortName)) {
		           String[] sortNames = StringUtils.split(sortName, ",");
		           if ("asc".equalsIgnoreCase(sortOrder)) {
		               for (String s : sortNames) {
		                   srb.addSort(s, SortOrder.ASC);
		               }
		           } else {
		               for (String s : sortNames) {
		                   srb.addSort(s, SortOrder.DESC);
		               }
		           }
		       }
		**/
	       SearchResponse response = srb.execute().actionGet();
	       SearchHits hits = response.getHits();
	       pager.setTotalCount((int) hits.getTotalHits());
	       List<Map<String, Object>> list = new ArrayList<>();
	       hits.forEach(searchHit -> {
	           Map<String, Object> source = searchHit.getSourceAsMap();//源数据
	    	   Map<String, Object> resultMap = new HashMap<String, Object>(); //返回数据集
	           Map<String, HighlightField> highlightFields = searchHit.getHighlightFields();
	           
	           //title高亮
	           HighlightField titleField = highlightFields.get("name");
	           dealHighlightField(titleField, source, resultMap, "name");
	           
	           //create_time
	           resultMap.put("create_time", MapUtils.getString(source, "create_time"));
	           //id
	           resultMap.put("id", MapUtils.getString(source, "id"));
	            
	           list.add(resultMap);
	       });
	       
	       pager.setList(list);
	     
	   } catch (Exception e) {
	       e.printStackTrace();
	   }
	   
	   return pager;
	}
	
	/**
	* 多个字段的模糊查询
	* @param keyword 关键字
	* @param pager
	* @param queryMap 查询附加条件
	* @return
	*/
/**	public static List<String> findIdsQueryES(String keyword,Map<String,String> queryMap) {
	   boolean explain = true;
	   List<String> list = new ArrayList<>();
	   try {
	       BoolQueryBuilder query = QueryBuilders.boolQuery();
	       //关键字查询
	       if (StringUtils.isNotBlank(keyword)) {
	    	   query.must(QueryBuilders.boolQuery()
	    	           .should(QueryBuilders.matchQuery("content",keyword )) //分词匹配
	    	           .should(QueryBuilders.matchQuery("attachment.content", keyword)));
	       }
	       
	       String name = MapUtils.getString(queryMap, "name");//档案名称
	       if(StringUtils.isNotBlank(name)) {
	    	   query.must(QueryBuilders.wildcardQuery("name","*"+name+"*"  ));//模糊匹配
	       }
	       
	       String file_type = MapUtils.getString(queryMap, "file_type");//类型
	       if(StringUtils.isNotBlank(file_type)) {
	    	   query.must(QueryBuilders.termQuery("file_type",file_type  ));//绝对匹配
	       }
	       
	       String author = MapUtils.getString(queryMap, "author");//人
	       if(StringUtils.isNotBlank(author)) {
	    	   query.must(QueryBuilders.wildcardQuery("author","*"+author+"*"  ));//模糊匹配
	       }
	       
	       String years = MapUtils.getString(queryMap, "years");//年
	       if(StringUtils.isNotBlank(years)) {
	    	   query.must(QueryBuilders.termQuery("years",years  )); //绝对匹配
	       }
	       
	       String sta_time = MapUtils.getString(queryMap, "startTime");//开始日期 
	       if(StringUtils.isNotBlank(sta_time)) {
	    	   query.must(QueryBuilders.rangeQuery("create_time").gte(sta_time));
	       }
	       
	       String end_time = MapUtils.getString(queryMap, "endTime");//结束日期 
	       if(StringUtils.isNotBlank(end_time)) {
	    	   query.must(QueryBuilders.rangeQuery("create_time").lte(end_time));
	       }
	       
	       SearchRequestBuilder srb =ElasticSearcUtil.getClient().prepareSearch(INDEX_NAME)
	               .setSearchType(SearchType.DFS_QUERY_THEN_FETCH) 
	               .setTypes(INDEX_TYPE)
	               .setQuery(query)
	               //分页
	              // .setFrom((pager.getPageNumber() - 1) * pager.getPageSize()).setSize(pager.getPageSize())
	               //是否按匹配度排序
	               .setExplain(explain);
	
	       SearchResponse response = srb.execute().actionGet();
	       SearchHits hits = response.getHits();
	      
	       hits.forEach(searchHit -> {
	           Map<String, Object> source = searchHit.getSourceAsMap();//源数据
	           String resultMap = MapUtils.getString(source, "id"); //返回数据集
	           list.add(resultMap);
	       });
	     
	   } catch (Exception e) {
	       e.printStackTrace();
	   }
	   
	   return list;
	}
**/	
	/**
	* 多个字段的模糊查询(查询文档id集合)
	* @param keyword 关键字
	* @param pager
	* @param queryMap 查询附加条件
	* @return
	*/
	public static List<String> findIdsQueryES(String keyword,Map<String,String> queryMap) {
	   boolean explain = true;
	   List<String> list = new ArrayList<>();
	   List<String> ids = new ArrayList<>();
	   //关键字查询
       if (StringUtils.isNotBlank(keyword)) {
    	   SearchRequestBuilder builder =ElasticSearcUtil.getClient().prepareSearch(FILE_INDEX_NAME).setSize(1000)
                   .setSearchType(SearchType.DFS_QUERY_THEN_FETCH) 
                   .setTypes(FILE_INDEX_TYPE)
                   .setQuery(QueryBuilders.boolQuery().should(QueryBuilders.matchQuery("attachment.content", keyword)))
                   //是否按匹配度排序
                   .setExplain(explain);
	       SearchHits searchHits = builder.execute().actionGet().getHits();
	       searchHits.forEach(searchHit -> {
	           Map<String, Object> source = searchHit.getSourceAsMap();//源数据
	           String resultMap = MapUtils.getString(source, "archives_id"); //返回文档id
	           ids.add(resultMap);
	            
	       });
       }
	   try {
	       BoolQueryBuilder query = QueryBuilders.boolQuery();
	       //关键字查询  
	       if (StringUtils.isNotBlank(keyword)) { 
	    	   BoolQueryBuilder query1 = QueryBuilders.boolQuery();
	    	   query1.should(QueryBuilders.matchQuery("content",keyword));
	    	   query1.should(QueryBuilders.wildcardQuery("name","*"+keyword+"*"  ));
	    	   ids.forEach(archives_id -> {
	    		   query1.should(QueryBuilders.matchQuery("id",archives_id));
	    	   });
	    	   query.must(query1) ;
	       }
	       
	       String name = MapUtils.getString(queryMap, "name");//档案名称
	       if(StringUtils.isNotBlank(name)) {
	    	   query.must(QueryBuilders.wildcardQuery("name","*"+name+"*"  ));//模糊匹配
	       }
	       
	       String file_type = MapUtils.getString(queryMap, "file_type");//文件类型
	       if(StringUtils.isNotBlank(file_type)) {
	    	   query.must(QueryBuilders.termQuery("file_type",file_type  ));//绝对匹配
	       }     
	       
	       String author = MapUtils.getString(queryMap, "author");//人
	       if(StringUtils.isNotBlank(author)) {
	    	   query.must(QueryBuilders.wildcardQuery("author","*"+author+"*"  ));//模糊匹配
	       }
	       
	       String years = MapUtils.getString(queryMap, "years");//代
	       if(StringUtils.isNotBlank(years)) {
	    	   query.must(QueryBuilders.termQuery("years",years  )); //绝对匹配
	       }
	       
	       
	       String sta_time = MapUtils.getString(queryMap, "startTime");//开始日期 
	       if(StringUtils.isNotBlank(sta_time)) {
	    	   query.must(QueryBuilders.rangeQuery("create_time").gte(sta_time));
	       }
	       
	       String end_time = MapUtils.getString(queryMap, "endTime");//结束日期 
	       if(StringUtils.isNotBlank(end_time)) {
	    	   query.must(QueryBuilders.rangeQuery("create_time").lte(end_time));
	       }
	       
	       SearchRequestBuilder srb =ElasticSearcUtil.getClient().prepareSearch(INDEX_NAME).setSize(1000)
	               .setSearchType(SearchType.DFS_QUERY_THEN_FETCH) 
	               .setTypes(INDEX_TYPE)
	               .setQuery(query)
	               //分页
	              // .setFrom((pager.getPageNumber() - 1) * pager.getPageSize()).setSize(pager.getPageSize())
	               //是否按匹配度排序
	               .setExplain(explain);
	
	       SearchResponse response = srb.execute().actionGet();
	       SearchHits hits = response.getHits();
	      
	       hits.forEach(searchHit -> {
	           Map<String, Object> source = searchHit.getSourceAsMap();//源数据
	           String resultMap = MapUtils.getString(source, "id"); //返回数据集
	           list.add(resultMap);
	       });
	     
	   } catch (Exception e) {
	       e.printStackTrace();
	   }
	   
	   return list;
	}
	
	/**
	* 多个字段的模糊查询(查询文件id集合)
	* @param archives_id 档案id
	* @return
	*/
	public static List<String> findFileIdsQueryES(String archives_id) {
		 List<String> list = new ArrayList<>();
		 try {
			SearchRequestBuilder srb=ElasticSearcUtil.getClient().prepareSearch(FILE_INDEX_NAME).setTypes(FILE_INDEX_TYPE);
			QueryBuilder queryBuilder3=QueryBuilders.matchQuery("archives_id", archives_id);
	        SearchResponse sr=srb.setQuery(QueryBuilders.boolQuery()
	                .must(queryBuilder3)
	       		 )
	            .execute()
	            .actionGet(); 
	        SearchHits hits=sr.getHits();
	        hits.forEach(searchHit -> {
		           Map<String, Object> source = searchHit.getSourceAsMap();//源数据
		           String resultMap = MapUtils.getString(source, "file_id"); //返回数据集
		           list.add(resultMap);
		       });
		
		 } catch (Exception e) {
		       e.printStackTrace();
		   }
	   return list;
	}
	
	
	/**
	 * 控制高亮
	 * @param highlightField 
	 * @param source 源数据map
	 * @param resultMap 结果数据
	 * @param filed 需要高亮的字段
	 * @return
	 */
	public static void dealHighlightField (HighlightField highlightField,Map<String,Object> source,Map<String,Object> resultMap,String filed) {
		
		if (highlightField != null) {
            Text[] fragments = highlightField.fragments();
            String tmp = "";
            for (Text text : fragments) {
                tmp += text;
            }
            resultMap.put(filed, tmp);
        }else {
     	   resultMap.put(filed,MapUtils.getString(source, filed) );
        }
		
	}
	
	
	/**
	 * 判断日期是否为指定格式字符串
	 * @param dataStr 日期字符串
	 * @param pattern 日期字符串格式yyyy-MM
	 * @return
	 */
	public static boolean dateFormat(String dataStr,String pattern) {
		SimpleDateFormat format=new SimpleDateFormat(pattern);
		boolean dateflag=true;
		try
		{
			format.parse(dataStr);
		} catch (ParseException e)
		{
			dateflag=false;
		} 
		return dateflag;
	}
	
	public static void main(String[] args) throws IOException, InterruptedException, java.util.concurrent.ExecutionException {
		
		 
		
		
/**		
		ElasticSearcUtil.deleteIndex(ESUtil.INDEX_NAME); 
		ElasticSearcUtil.deleteIndex(ESUtil.FILE_INDEX_NAME); 
		ESUtil.createIndex(ESUtil.INDEX_NAME);
		ESUtil.createIndex(ESUtil.FILE_INDEX_NAME);
		ESUtil.createMapping();
		ESUtil.createMappingFile();
		ESUtil.createPipelineFile();
**/		

 
 //插入数据
/**  
		Archives archives = new Archives();
		archives.setId("a1");
		archives.setName("关于XXX的文档1");
		archives.setFile_type("文件");
		archives.setAuthor("发布人");
		archives.setYears("2019年");
		archives.setGeneration_time("2019-08-26");
		insertDocument(archives, "文档介绍等内容");
		insertDocumentFile("f1", FileToBase64Util.fileToBase64("D:\\2.txt"), "a1");
		
		ElasticSearcUtil.getData(ESUtil.INDEX_NAME, ESUtil.INDEX_TYPE, "a1");
		ElasticSearcUtil.getData(ESUtil.FILE_INDEX_NAME, ESUtil.FILE_INDEX_TYPE, "f1");

**/		
 
	}
	
}

base64转换工具类

package com.wisesoft.util.elasticsearch;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLConnection;
import java.util.Base64;
import org.apache.commons.lang.StringUtils;
import sun.misc.BASE64Encoder;

/**
 * 文件转换为base64编码
 * @author yilei
 *
 */
@SuppressWarnings("restriction")
public class FileToBase64Util {

	 /**
     * 将本地文件转base64字符串
     * @param path 文件路径
     * @return
     */
	public static String fileToBase64(String path) {
        String base64 = null;
        InputStream in = null;
        try {
            File file = new File(path);
            in = new FileInputStream(file);
            byte[] bytes=new byte[(int)file.length()];
            in.read(bytes);
            base64 = Base64.getEncoder().encodeToString(bytes);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return base64;
    }
	
	public static String encodeBase64File(String path) throws Exception {
		  File file = new File(path);;
		  FileInputStream inputFile = new FileInputStream(file);
		  byte[] buffer = new byte[(int) file.length()];
		  inputFile.read(buffer);
		  inputFile.close();
		  return new BASE64Encoder().encode(buffer);
		 
		 }
	
	/**
	 * 将网络文件转base64字符串
	 * @param path 文件路径
	 * @return
	 */
	public static String fileToBse64NetWork(String path) {
		// 下载网络文件
		String base64 = null;
		int byteread = 0;
		byte[] totalbyte = new byte[0];
		try {
		URL url = new URL(path);
		URLConnection conn = url.openConnection();
		InputStream inStream = conn.getInputStream();
		byte[] buffer = new byte[1204];
		while ((byteread = inStream.read(buffer)) != -1) {
		//拼接流,这样写是保证文件不会被篡改
		totalbyte = byteMerger(totalbyte,buffer,byteread);
		}
		inStream.close();
		base64=new BASE64Encoder().encode(totalbyte);
		} catch (FileNotFoundException e) {
		e.printStackTrace();
		} catch (IOException e) {
		e.printStackTrace();
		}
		
		return base64;
	}
	
	/**
	 * BASE64解码成File文件
	 * @param destPath 保存路径
	 * @param base64 base64字符串
	 * @param fileName 文件名称+后缀名
	 */
	 public static void base64ToFile(String destPath,String base64, String fileName) {
	     File file = null;
	     //创建文件目录
	     String filePath=destPath;
	     File  dir=new File(filePath);
	     if (!dir.exists() && !dir.isDirectory()) {
	         dir.mkdirs();
	     }
	     BufferedOutputStream bos = null;
	     java.io.FileOutputStream fos = null;
	     try {
	         byte[] bytes = Base64.getDecoder().decode(base64);
	         file=new File(filePath+"/"+fileName);
	         fos = new java.io.FileOutputStream(file);
	         bos = new BufferedOutputStream(fos);
	         bos.write(bytes);
	     } catch (Exception e) {
	         e.printStackTrace();
	     } finally {
	         if (bos != null) {
	             try {
	                 bos.close();
	             } catch (IOException e) {
	                 e.printStackTrace();
	             }
	         }
	         if (fos != null) {
	             try {
	                 fos.close();
	             } catch (IOException e) {
	                 e.printStackTrace();
	             }
	         }
	     }
	 }

	/**
	 * 拼接byte[]类型数据
	 */
	public static byte[] byteMerger(byte[] byte_1, byte[] byte_2,int byteread){
		byte[] byte_3 = new byte[byte_1.length+byteread];
		System.arraycopy(byte_1, 0, byte_3, 0, byte_1.length);
		System.arraycopy(byte_2, 0, byte_3, byte_1.length, byteread);
		return byte_3;
	}
	
	 
	
	public static void main(String[] args) throws Exception {
		 
		
		FileToBase64Util.base64ToFile("D:\\","s8m2vMrQzuS67sf4tPPQ3MOoILaltqW2pbaltqW2pbaltqW2pbaltqW2pbaltqW2pbaltqW2pbal","1.txt");
		//System.out.println(FileToBase64Util.fileToBase64("D:\\1.docx"));
	}

	
	
}


 类似资料: