当前位置: 首页 > 工具软件 > bboss > 使用案例 >

springboo集成bboss-elasticsearch实现elasticsearch客户端

关正雅
2023-12-01

内容简介

首先说明本文要实现的内容:

1.比较bboss-elasticsearch和springboot自带的spring-boot-starter-data-elasticsearch优缺点
2.如果实现数据库数据全量、增量的导入到elasticsearch服务器中
3.将excel、word、ppt、pdf等文件解析到elasticsearch服务器中
4.通过json文件生生成mapping
5.通过json文件实现内容检索
6.通过json文件实现文档高亮查询

1. bboss-elasticsearch 和 spring-boot-starter-data-elasticsearch的优缺点

首先说一下spring-boot-starter-data-elasticsearch。springboot自带,无缝兼容,操作简单,可以像JSP操作数据库一样去操作elasticsearch服务器,但这只是针对简单查询。在实际应用中,并没有太多的简单查询(如果都是简单查询,也没必要使用es吧)。如果是复杂查询,需要大量的代码逻辑,理解难度颇高。

bboss-elasticsearch是国产开源框架,是一套基于query dsl语法操作和访问分布式搜索引擎elasticsearch的o/r mapping高性能开发库,底层基于es restful api,让你像使用mybatis一样操作数据es服务器

个人认为,bboss-elasticsearchs有两点好处,。
1.提供操作数据库的接口,不需要额外配置数据源就可以实现数据的全量、增量导入
2.使用原生的query、dsl语句,友好度更高,方便学习和交流。代码可读性高。

2. 集成 bboss-elasticsearch

由于本项目使用的是6.2.4版本,所以使用6.0.2版本的jar包,其他es版本参考
Elasticsearch Bboss开发文档

<!-- springboot集成es客户端的jar包-->
<dependency>
	<groupId>com.bbossgroups.plugins</groupId>
	<artifactId>bboss-elasticsearch-spring-boot-starter</artifactId>
    <version>6.0.2</version>
</dependency>
<!-- 操作数据库 通过该包实现数据的全量、增量同步-->
<dependency>
	<groupId>com.bbossgroups.plugins</groupId>
	<artifactId>bboss-elasticsearch-rest-jdbc</artifactId>
	<version>6.0.2</version>
</dependency>
<!-- 数据库连接-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>

2.1 相关配置

工程resources目录下,创建
application.properties文件,添加内容:
elasticsearch.rest.hostNames=xxx.xx.xx.x:9200
//如果es服务器没做配置的需要开放9200端口
//es安装目录中的elasticsearch.yml中添加 http.port: 9200

2.2 异步批量同步

  • 适用于项目启动,创建索引导入相关数据
//获取客户端
ClientInterface clientUtil = ElasticSearchHelper.getRestClientUtil();
if (clientUtil.existIndice(ConstUtil.MENU_INDEX)) {//如果存在,删除,线上环境不要删表
	clientUtil.dropIndice(ConstUtil.MENU_INDEX);
 } 
String mapping = readMappingJson("mappings/menu.json");//读取索引配置文件
clientUtil.createIndiceMapping(ConstUtil.MENU_INDEX, mapping);//创建索引

DB2ESImportBuilder importBuilder = DB2ESImportBuilder.newInstance();
//指定导入数据的sql语句,必填项,定时全量导入不需要在sql中设置增量字段,可以设置自己的提取逻辑
importBuilder.setSql("select * from test")
	.setDbDriver(ConstUtil.DB_DRIVER)  
	.setDbUrl("jdbc:mysql://xxx.xx.xx.x.69:3306/ccms?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useCursorFetch=true")
	.setDbUser(ConstUtil.DB_USERNAME)
	.setDbPassword(ConstUtil.DB_PASSWORD)
	.setValidateSQL("select 1")
	.setUsePool(false);//是否使用连接池
	
/**
 * es相关配置
 */
importBuilder
		.setIndex(index) //必填项 索引名称
		.setIndexType(type) //必填项 类型名称
//.setRefreshOption("refresh")//可选项,null表示不实时刷新,importBuilder.setRefreshOption("refresh");表示实时刷新
.setUseJavaName(true) //可选项,将数据库字段名称转换为java驼峰规范的名称,例如:doc_id -> docId
.setBatchSize(5000)  //可选项,批量导入es的记录数,默认为-1,逐条处理,> 0时批量处理
.setJdbcFetchSize(10000);//设置数据库的查询fetchsize,同时在mysql url上设置useCursorFetch=true启用mysql的游标fetch机制,否则会有严重的性能隐患,jdbcFetchSize必须和useCursorFetch参数配合使用,否则不会生效  
importBuilder.setFixedRate(false)//参考jdk timer task文档对fixedRate的说明
//	 .setScheduleDate(date) //指定任务开始执行时间:日期
 .setDeyLay(5000L); // 任务延迟执行deylay毫秒后执行
// .setPeriod(10000L); //每隔period毫秒执行,如果不设置,只执行一次
		 
		 
/**
 * 一次、作业创建一个内置的线程池,实现多线程并行数据导入elasticsearch功能,作业完毕后关闭线程池
 */
importBuilder.setParallel(true);//设置为多线程并行批量导入
importBuilder.setQueue(10);//设置批量导入线程池等待队列长度
importBuilder.setThreadCount(50);//设置批量导入线程池工作线程数量
importBuilder.setContinueOnError(true);//任务出现异常,是否继续执行作业:true(默认值)继续执行 false 中断作业执行
importBuilder.setAsyn(false);//true 异步方式执行,不等待所有导入作业任务结束,方法快速返回;false(默认值) 同步方式执行,等待所有导入作业任务结束,所有作业结束后方法才返回
importBuilder.setEsIdField("_id");//设置文档主键,不设置,则自动产生文档id
importBuilder.setDebugResponse(true);//设置是否将每次处理的reponse打印到日志文件中,默认false,不打印响应报文将大大提升性能,只有在需要的时候才,log日志级别同时要设置为INFO
importBuilder.setDiscardBulkResponse(false);//设置是否需要批量处理的响应报文,不需要设置为false,true为需要,默认true,如果不需要响应报文将大大提升处理速度

DataStream dataStream = importBuilder.builder();
dataStream.execute(); 				

2.3 定时增量同步

//获取客户端
ClientInterface clientUtil = ElasticSearchHelper.getRestClientUtil();
if (clientUtil.existIndice(ConstUtil.MENU_INDEX)) {//如果存在,删除,线上环境不要删表
	clientUtil.dropIndice(ConstUtil.MENU_INDEX);
 } 
String mapping = readMappingJson("mappings/menu.json");//读取索引配置文件
clientUtil.createIndiceMapping(ConstUtil.MENU_INDEX, mapping);//创建索引

DB2ESImportBuilder importBuilder = DB2ESImportBuilder.newInstance();
//指定导入数据的sql语句,必填项,定时全量导入不需要在sql中设置增量字段,可以设置自己的提取逻辑
importBuilder.setSql("select * from test where last_update_time  > #[time]")//设置增量变量 time
	.setDbDriver(ConstUtil.DB_DRIVER)  
	.setDbUrl("jdbc:mysql://xxx.xx.xx.x.69:3306/ccms?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useCursorFetch=true")
	.setDbUser(ConstUtil.DB_USERNAME)
	.setDbPassword(ConstUtil.DB_PASSWORD)
	.setValidateSQL("select 1")
	.setUsePool(true);//是否使用连接池
	
/**
 * es相关配置
 */
importBuilder
		.setIndex(index) //必填项 索引名称
		.setIndexType(type) //必填项 类型名称
//.setRefreshOption("refresh")//可选项,null表示不实时刷新,importBuilder.setRefreshOption("refresh");表示实时刷新
.setUseJavaName(true) //可选项,将数据库字段名称转换为java驼峰规范的名称,例如:doc_id -> docId
.setBatchSize(5000)  //可选项,批量导入es的记录数,默认为-1,逐条处理,> 0时批量处理
.setJdbcFetchSize(10000);//设置数据库的查询fetchsize,同时在mysql url上设置useCursorFetch=true启用mysql的游标fetch机制,否则会有严重的性能隐患,jdbcFetchSize必须和useCursorFetch参数配合使用,否则不会生效  
importBuilder.setFixedRate(false)//参考jdk timer task文档对fixedRate的说明
//	 .setScheduleDate(date) //指定任务开始执行时间:日期
 .setDeyLay(5000L); // 任务延迟执行deylay毫秒后执行
 .setPeriod(10000L); //每隔period毫秒执行,如果不设置,只执行一次
//	importBuilder.setNumberLastValueColumn("log_id");//手动指定数字增量查询字段,默认采用上面设置的sql语句中的增量变量名称作为增量查询字段的名称,指定以后就用指定的字段
//	importBuilder.setNumberLastValueColumn("log_id");//手动指定日期增量查询字段,默认采用上面设置的sql语句中的增量变量名称作为增量查询字段的名称,指定以后就用指定的字段
importBuilder.setFromFirst(true);//任务重启时,重新开始采集数据,适合于每次全量导入数据的情况,如果是全量导入,可以先删除原来的索引数据
importBuilder.setLastValueStorePath("testdb");//记录上次采集的增量字段值的文件路径,作为下次增量(或者重启后)采集数据的起点
//	importBuilder.setLastValueStoreTableName("test");//记录上次采集的增量字段值的表,可以不指定,采用默认表名increament_tab
importBuilder.setLastValueType(ImportIncreamentConfig.NUMBER_TYPE);//如果没有指定增量查询字段名称,则需要指定字段类型:ImportIncreamentConfig.NUMBER_TYPE 数字类型
// 或者ImportIncreamentConfig.TIMESTAMP_TYPE 日期类型		 		 

/**
 * 一次、作业创建一个内置的线程池,实现多线程并行数据导入elasticsearch功能,作业完毕后关闭线程池
 */
importBuilder.setParallel(true);//设置为多线程并行批量导入
importBuilder.setQueue(10);//设置批量导入线程池等待队列长度
importBuilder.setThreadCount(50);//设置批量导入线程池工作线程数量
importBuilder.setContinueOnError(true);//任务出现异常,是否继续执行作业:true(默认值)继续执行 false 中断作业执行
importBuilder.setAsyn(false);//true 异步方式执行,不等待所有导入作业任务结束,方法快速返回;false(默认值) 同步方式执行,等待所有导入作业任务结束,所有作业结束后方法才返回
importBuilder.setEsIdField("_id");//设置文档主键,不设置,则自动产生文档id
importBuilder.setDebugResponse(true);//设置是否将每次处理的reponse打印到日志文件中,默认false,不打印响应报文将大大提升性能,只有在需要的时候才,log日志级别同时要设置为INFO
importBuilder.setDiscardBulkResponse(false);//设置是否需要批量处理的响应报文,不需要设置为false,true为需要,默认true,如果不需要响应报文将大大提升处理速度

DataStream dataStream = importBuilder.builder();
dataStream.execute(); 				

2.4同步office文档内容到es

配置与一步导入一样,只需要注意一下几点
1.创建管道
2.重新设置es数据结构:
importBuilder.setDataRefactor(new DataRefactor() {
	@Override
	public void refactor(Context context) throws Exception {
		String url = (String) context.getValue("url");

		DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
		LocalDateTime parse = LocalDateTime.parse(context.getValue("time").toString(),dateTimeFormatter); 
		Date date = Date.from(parse.toInstant(ZoneOffset.of("+8")));
		
		String base64Txt = DocUtil.extractTextFromFile(url );
		Doc doc = new Doc((String) context.getValue("title"),(String) context.getValue("name"),base64Txt,date);
	
		ClientInterface clientUtil = ElasticSearchHelper.getRestClientUtil();    

		clientUtil.addDocument(ConstUtil.DOC_INDEX,"doc?pipeline=attachment",doc);//索引数据对象
	}
});

聚合查询统计热词数量

 QueryBuilder matchAllQuery = matchAllQuery();//设置查询所有,相当于不设置查询条件

    NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder();
    //设置QueryBuilder
    nativeSearchQueryBuilder.withQuery(matchAllQuery);
    //设置搜索类型,默认值就是QUERY_THEN_FETCH,参考https://blog.csdn.net/wulex/article/details/71081042
    nativeSearchQueryBuilder.withSearchType(SearchType.QUERY_THEN_FETCH);
    //指定索引库和文档类型
    nativeSearchQueryBuilder.withIndices("test_index");
    TermsAggregationBuilder field = AggregationBuilders.terms("wordCount").field("keyword.keyword").size(2);

    nativeSearchQueryBuilder.addAggregation(field);
    //构建查询对象
    NativeSearchQuery nativeSearchQuery = nativeSearchQueryBuilder.build();

    Aggregations aggregations = elasticsearchTemplate.query(nativeSearchQuery, response -> response.getAggregations());
    Map<String, Aggregation> results = aggregations.asMap();
    ParsedStringTerms stringTerms = (ParsedStringTerms) results.get("wordCount");


    List<EsSearchLog> list= stringTerms.getBuckets().stream().map(item -> {
      return new EsSearchLog(item.getKeyAsString(), item.getDocCount());
    }).sorted(Comparator.comparing(EsSearchLog::getCount).reversed()).collect(Collectors.toList());

    System.out.println(list);
 类似资料: