首先说明本文要实现的内容:
1.比较bboss-elasticsearch和springboot自带的spring-boot-starter-data-elasticsearch优缺点
2.如果实现数据库数据全量、增量的导入到elasticsearch服务器中
3.将excel、word、ppt、pdf等文件解析到elasticsearch服务器中
4.通过json文件生生成mapping
5.通过json文件实现内容检索
6.通过json文件实现文档高亮查询
首先说一下spring-boot-starter-data-elasticsearch。springboot自带,无缝兼容,操作简单,可以像JSP操作数据库一样去操作elasticsearch服务器,但这只是针对简单查询。在实际应用中,并没有太多的简单查询(如果都是简单查询,也没必要使用es吧)。如果是复杂查询,需要大量的代码逻辑,理解难度颇高。
bboss-elasticsearch是国产开源框架,是一套基于query dsl语法操作和访问分布式搜索引擎elasticsearch的o/r mapping高性能开发库,底层基于es restful api,让你像使用mybatis一样操作数据es服务器
个人认为,bboss-elasticsearchs有两点好处,。
1.提供操作数据库的接口,不需要额外配置数据源就可以实现数据的全量、增量导入
2.使用原生的query、dsl语句,友好度更高,方便学习和交流。代码可读性高。
由于本项目使用的是6.2.4版本,所以使用6.0.2版本的jar包,其他es版本参考
Elasticsearch Bboss开发文档
<!-- springboot集成es客户端的jar包-->
<dependency>
<groupId>com.bbossgroups.plugins</groupId>
<artifactId>bboss-elasticsearch-spring-boot-starter</artifactId>
<version>6.0.2</version>
</dependency>
<!-- 操作数据库 通过该包实现数据的全量、增量同步-->
<dependency>
<groupId>com.bbossgroups.plugins</groupId>
<artifactId>bboss-elasticsearch-rest-jdbc</artifactId>
<version>6.0.2</version>
</dependency>
<!-- 数据库连接-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
工程resources目录下,创建
application.properties文件,添加内容:
elasticsearch.rest.hostNames=xxx.xx.xx.x:9200
//如果es服务器没做配置的需要开放9200端口
//es安装目录中的elasticsearch.yml中添加 http.port: 9200
//获取客户端
ClientInterface clientUtil = ElasticSearchHelper.getRestClientUtil();
if (clientUtil.existIndice(ConstUtil.MENU_INDEX)) {//如果存在,删除,线上环境不要删表
clientUtil.dropIndice(ConstUtil.MENU_INDEX);
}
String mapping = readMappingJson("mappings/menu.json");//读取索引配置文件
clientUtil.createIndiceMapping(ConstUtil.MENU_INDEX, mapping);//创建索引
DB2ESImportBuilder importBuilder = DB2ESImportBuilder.newInstance();
//指定导入数据的sql语句,必填项,定时全量导入不需要在sql中设置增量字段,可以设置自己的提取逻辑
importBuilder.setSql("select * from test")
.setDbDriver(ConstUtil.DB_DRIVER)
.setDbUrl("jdbc:mysql://xxx.xx.xx.x.69:3306/ccms?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useCursorFetch=true")
.setDbUser(ConstUtil.DB_USERNAME)
.setDbPassword(ConstUtil.DB_PASSWORD)
.setValidateSQL("select 1")
.setUsePool(false);//是否使用连接池
/**
* es相关配置
*/
importBuilder
.setIndex(index) //必填项 索引名称
.setIndexType(type) //必填项 类型名称
//.setRefreshOption("refresh")//可选项,null表示不实时刷新,importBuilder.setRefreshOption("refresh");表示实时刷新
.setUseJavaName(true) //可选项,将数据库字段名称转换为java驼峰规范的名称,例如:doc_id -> docId
.setBatchSize(5000) //可选项,批量导入es的记录数,默认为-1,逐条处理,> 0时批量处理
.setJdbcFetchSize(10000);//设置数据库的查询fetchsize,同时在mysql url上设置useCursorFetch=true启用mysql的游标fetch机制,否则会有严重的性能隐患,jdbcFetchSize必须和useCursorFetch参数配合使用,否则不会生效
importBuilder.setFixedRate(false)//参考jdk timer task文档对fixedRate的说明
// .setScheduleDate(date) //指定任务开始执行时间:日期
.setDeyLay(5000L); // 任务延迟执行deylay毫秒后执行
// .setPeriod(10000L); //每隔period毫秒执行,如果不设置,只执行一次
/**
* 一次、作业创建一个内置的线程池,实现多线程并行数据导入elasticsearch功能,作业完毕后关闭线程池
*/
importBuilder.setParallel(true);//设置为多线程并行批量导入
importBuilder.setQueue(10);//设置批量导入线程池等待队列长度
importBuilder.setThreadCount(50);//设置批量导入线程池工作线程数量
importBuilder.setContinueOnError(true);//任务出现异常,是否继续执行作业:true(默认值)继续执行 false 中断作业执行
importBuilder.setAsyn(false);//true 异步方式执行,不等待所有导入作业任务结束,方法快速返回;false(默认值) 同步方式执行,等待所有导入作业任务结束,所有作业结束后方法才返回
importBuilder.setEsIdField("_id");//设置文档主键,不设置,则自动产生文档id
importBuilder.setDebugResponse(true);//设置是否将每次处理的reponse打印到日志文件中,默认false,不打印响应报文将大大提升性能,只有在需要的时候才,log日志级别同时要设置为INFO
importBuilder.setDiscardBulkResponse(false);//设置是否需要批量处理的响应报文,不需要设置为false,true为需要,默认true,如果不需要响应报文将大大提升处理速度
DataStream dataStream = importBuilder.builder();
dataStream.execute();
//获取客户端
ClientInterface clientUtil = ElasticSearchHelper.getRestClientUtil();
if (clientUtil.existIndice(ConstUtil.MENU_INDEX)) {//如果存在,删除,线上环境不要删表
clientUtil.dropIndice(ConstUtil.MENU_INDEX);
}
String mapping = readMappingJson("mappings/menu.json");//读取索引配置文件
clientUtil.createIndiceMapping(ConstUtil.MENU_INDEX, mapping);//创建索引
DB2ESImportBuilder importBuilder = DB2ESImportBuilder.newInstance();
//指定导入数据的sql语句,必填项,定时全量导入不需要在sql中设置增量字段,可以设置自己的提取逻辑
importBuilder.setSql("select * from test where last_update_time > #[time]")//设置增量变量 time
.setDbDriver(ConstUtil.DB_DRIVER)
.setDbUrl("jdbc:mysql://xxx.xx.xx.x.69:3306/ccms?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useCursorFetch=true")
.setDbUser(ConstUtil.DB_USERNAME)
.setDbPassword(ConstUtil.DB_PASSWORD)
.setValidateSQL("select 1")
.setUsePool(true);//是否使用连接池
/**
* es相关配置
*/
importBuilder
.setIndex(index) //必填项 索引名称
.setIndexType(type) //必填项 类型名称
//.setRefreshOption("refresh")//可选项,null表示不实时刷新,importBuilder.setRefreshOption("refresh");表示实时刷新
.setUseJavaName(true) //可选项,将数据库字段名称转换为java驼峰规范的名称,例如:doc_id -> docId
.setBatchSize(5000) //可选项,批量导入es的记录数,默认为-1,逐条处理,> 0时批量处理
.setJdbcFetchSize(10000);//设置数据库的查询fetchsize,同时在mysql url上设置useCursorFetch=true启用mysql的游标fetch机制,否则会有严重的性能隐患,jdbcFetchSize必须和useCursorFetch参数配合使用,否则不会生效
importBuilder.setFixedRate(false)//参考jdk timer task文档对fixedRate的说明
// .setScheduleDate(date) //指定任务开始执行时间:日期
.setDeyLay(5000L); // 任务延迟执行deylay毫秒后执行
.setPeriod(10000L); //每隔period毫秒执行,如果不设置,只执行一次
// importBuilder.setNumberLastValueColumn("log_id");//手动指定数字增量查询字段,默认采用上面设置的sql语句中的增量变量名称作为增量查询字段的名称,指定以后就用指定的字段
// importBuilder.setNumberLastValueColumn("log_id");//手动指定日期增量查询字段,默认采用上面设置的sql语句中的增量变量名称作为增量查询字段的名称,指定以后就用指定的字段
importBuilder.setFromFirst(true);//任务重启时,重新开始采集数据,适合于每次全量导入数据的情况,如果是全量导入,可以先删除原来的索引数据
importBuilder.setLastValueStorePath("testdb");//记录上次采集的增量字段值的文件路径,作为下次增量(或者重启后)采集数据的起点
// importBuilder.setLastValueStoreTableName("test");//记录上次采集的增量字段值的表,可以不指定,采用默认表名increament_tab
importBuilder.setLastValueType(ImportIncreamentConfig.NUMBER_TYPE);//如果没有指定增量查询字段名称,则需要指定字段类型:ImportIncreamentConfig.NUMBER_TYPE 数字类型
// 或者ImportIncreamentConfig.TIMESTAMP_TYPE 日期类型
/**
* 一次、作业创建一个内置的线程池,实现多线程并行数据导入elasticsearch功能,作业完毕后关闭线程池
*/
importBuilder.setParallel(true);//设置为多线程并行批量导入
importBuilder.setQueue(10);//设置批量导入线程池等待队列长度
importBuilder.setThreadCount(50);//设置批量导入线程池工作线程数量
importBuilder.setContinueOnError(true);//任务出现异常,是否继续执行作业:true(默认值)继续执行 false 中断作业执行
importBuilder.setAsyn(false);//true 异步方式执行,不等待所有导入作业任务结束,方法快速返回;false(默认值) 同步方式执行,等待所有导入作业任务结束,所有作业结束后方法才返回
importBuilder.setEsIdField("_id");//设置文档主键,不设置,则自动产生文档id
importBuilder.setDebugResponse(true);//设置是否将每次处理的reponse打印到日志文件中,默认false,不打印响应报文将大大提升性能,只有在需要的时候才,log日志级别同时要设置为INFO
importBuilder.setDiscardBulkResponse(false);//设置是否需要批量处理的响应报文,不需要设置为false,true为需要,默认true,如果不需要响应报文将大大提升处理速度
DataStream dataStream = importBuilder.builder();
dataStream.execute();
配置与一步导入一样,只需要注意一下几点
1.创建管道
2.重新设置es数据结构:
importBuilder.setDataRefactor(new DataRefactor() {
@Override
public void refactor(Context context) throws Exception {
String url = (String) context.getValue("url");
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
LocalDateTime parse = LocalDateTime.parse(context.getValue("time").toString(),dateTimeFormatter);
Date date = Date.from(parse.toInstant(ZoneOffset.of("+8")));
String base64Txt = DocUtil.extractTextFromFile(url );
Doc doc = new Doc((String) context.getValue("title"),(String) context.getValue("name"),base64Txt,date);
ClientInterface clientUtil = ElasticSearchHelper.getRestClientUtil();
clientUtil.addDocument(ConstUtil.DOC_INDEX,"doc?pipeline=attachment",doc);//索引数据对象
}
});
QueryBuilder matchAllQuery = matchAllQuery();//设置查询所有,相当于不设置查询条件
NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder();
//设置QueryBuilder
nativeSearchQueryBuilder.withQuery(matchAllQuery);
//设置搜索类型,默认值就是QUERY_THEN_FETCH,参考https://blog.csdn.net/wulex/article/details/71081042
nativeSearchQueryBuilder.withSearchType(SearchType.QUERY_THEN_FETCH);
//指定索引库和文档类型
nativeSearchQueryBuilder.withIndices("test_index");
TermsAggregationBuilder field = AggregationBuilders.terms("wordCount").field("keyword.keyword").size(2);
nativeSearchQueryBuilder.addAggregation(field);
//构建查询对象
NativeSearchQuery nativeSearchQuery = nativeSearchQueryBuilder.build();
Aggregations aggregations = elasticsearchTemplate.query(nativeSearchQuery, response -> response.getAggregations());
Map<String, Aggregation> results = aggregations.asMap();
ParsedStringTerms stringTerms = (ParsedStringTerms) results.get("wordCount");
List<EsSearchLog> list= stringTerms.getBuckets().stream().map(item -> {
return new EsSearchLog(item.getKeyAsString(), item.getDocCount());
}).sorted(Comparator.comparing(EsSearchLog::getCount).reversed()).collect(Collectors.toList());
System.out.println(list);