bboss操作和访问elasticsearch提供两种模式,分别对应两个组件:
RestClientUtil:通用组件,提供所有不依赖dsl的功能,也可以直接接收dsl。
ConfigRestClientUtil:加载配置文件中的dsl来实现对es的操作
这两个组件分别通过org.frameworkset.elasticsearch.ElasticSearchHelper中提供的静态工厂方法获取其单实例对象,这些单实例对象是多线程并发安全的,分别说明如下:
public static ClientInterface getConfigRestClientUtil(String configFile)
public static ClientInterface getConfigRestClientUtil(String elasticSearch,String configFile) //elasticsearch参数指定了bboss中多集群配
通过这两个方法获取到的ClientInterface实例是多线程安全的、单实例对象。
public static ClientInterface getRestClientUtil()
public static ClientInterface getRestClientUtil(String elasticSearch) //elasticsearch参数指定了bboss中多集群配
通过这两个方法获取到的ClientInterface实例是多线程安全的、单实例对象。
直接操作dsl使用实例:
public void testDirectDslQuery(){
String queryAll = "{\"query\": {\"match_all\": {}}}";
ClientInterface clientUtil = ElasticSearchHelper.getRestClientUtil();
ESDatas<Demo> esDatas =clientUtil.searchList("demo/_search",//demo为索引表,_search为检索操作action
queryAll,//queryAll变量对应的dsl语句
Demo.class);
//获取结果对象列表
List<Demo> demos = esDatas.getDatas();
//获取总记录数
long totalSize = esDatas.getTotalSize();
System.out.println(totalSize);
}
在resources下创建配置文件estrace/ESTracesqlMapper.xml,配置一个query dsl脚本,名称为queryServiceByCondition,我们将在后面的ClientInterface 组件中通过queryServiceByCondition引用这个脚本,脚本内容定义如下:
<properties>
<property name="queryServiceByCondition">
<![CDATA[
{
"sort": [ ##排序
{
"startTime": {
"order": "desc"
}
}
],
#if($lastStartTime > 0)//search_after分页查询
"search_after": [#[lastStartTime]],
#end
"size": 100, ##每次返回100条记录
"query": {
"bool":{
"filter": [
{"term": { ##精确查找
"applicationName": #[application]
}}
#if($queryStatus.equals("success"))
,
{"term": { ##精确查找
"err": 0
}}
#elseif($queryStatus.equals("error"))
,
{"term": { ##精确查找
"err": 1
}}
#end
,
{"range": { ##指定时间范围
"startTime": {
"gte": #[startTime],
"lt": #[endTime]
}
}}
]
#if($queryCondition && !$queryCondition.equals(""))
,
"must" : {
"multi_match" : { ##分词检索,指定坐在多个field执行检索
"query" : #[queryCondition],
"fields" : [ "agentId", "applicationName" ,"endPoint","params","remoteAddr","rpc","exceptionInfo"]
}
}
#end
}
},
"aggs": {
"applicationsums": {
"terms": {
"field": "applicationName.keyword",##按应用名称进行统计计数
"size":10000
},
"aggs":{
"successsums" : {
"terms" : {
"field" : "err" ##按err标识统计每个应用的成功数和失败数,0标识成功,1标识失败
}
},
"elapsed_ranges" : {
"range" : {
"field" : "elapsed", ##按响应时间分段统计
"keyed" : true,
"ranges" : [
{ "key" : "1秒", "to" : 1000 },
{ "key" : "3秒", "from" : 1000, "to" : 3000 },
{ "key" : "5秒", "from" : 3000, "to" : 5000 },
{ "key" : "5秒以上", "from" : 5000 }
]
}
}
}
}
}
}]]>
</property>
</properties>
查询的java代码:
import org.frameworkset.elasticsearch.ElasticSearchHelper;
import org.frameworkset.elasticsearch.client.ClientInterface;
//加载配置文件,创建es客户端工具包
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("estrace/ESTracesqlMapper.xml");
//构建查询条件对象
TraceExtraCriteria traceExtraCriteria = new TraceExtraCriteria();
traceExtraCriteria.setApplication("testweb88");
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
traceExtraCriteria.setStartTime(dateFormat.parse("2017-09-02 00:00:00").getTime());
traceExtraCriteria.setEndTime(dateFormat.parse("2017-09-13 00:00:00").getTime());
// 检索条件
String queryCondition = (request.getParameter("queryCondition"));
// 设置检索条件
traceExtraCriteria.setQueryCondition(queryCondition);
// 查询状态:all 全部 success 处理成功 fail 处理失败
String queryStatus = request.getParameter("queryStatus");
traceExtraCriteria.setQueryStatus(queryStatus);
//设置分页数据起点,以时间为起点
String lastStartTimeStr = request.getParameter("lastStartTime");
if(lastStartTimeStr != null && !lastStartTimeStr.equals("")) {
Long lastStartTime = Long.parseLong(lastStartTimeStr);
traceExtraCriteria.setLastStartTime(lastStartTime);
}
//执行查询操作,查询可以是简单的检索查询,也可以结合聚合查询
ESDatas<Traces> data //ESDatas为查询结果集对象,封装了返回的当前查询的List<Traces>结果集、符合条件的总记录数totalSize、以及聚合查询的结果
= clientUtil.searchList("trace-*/_search",//查询操作,查询indices trace-*中符合条件的数据
"queryServiceByCondition",//通过名称引用配置文件中的query dsl语句
traceExtraCriteria,//查询条件封装对象
Traces.class);//指定返回的po对象类型,po对象中的属性与indices表中的文档filed名称保持一致
List<Traces> traceList = data.getDatas();//获取查询到的记过集
long totalSize = data.getTotalSize();//获取总记录数
List<Map<String, Object>> applicationsums= data.getAggregationBuckets("applicationsums");//同时可以做聚合查询,获取聚合查询结果集
for (int i = 0; i < applicationsums .size(); i++) {
Map<String, Object> map = applicationsums.get(i);
//获取子聚合查询结果:成功数和失败数
List<Map<String, Object>> appstatic = (List<Map<String, Object>>)ResultUtil.getAggBuckets(map, "successsums");
//获取响应时间分段统计信息
Map<String, Map<String, Object>> appPeriodstatic = (Map<String, Map<String, Object>>)ResultUtil.getAggBuckets(map, "elapsed_ranges");
}
这里封装条件也可以用Map替代。
比如:
Map<String,Object> params = new HashMap<String,Object>();
//设置applicationName1和applicationName2两个变量的值
params.put("applicationName1","blackca\"tdemo2");
params.put("applicationName2","blackcat\"demo3");
ESDatas<Demo> esDatas = //ESDatas包含当前检索的记录集合,最多1000条记录,由dsl中的size属性指定
clientUtil.searchList("demo/_search",//demo为索引表,_search为检索操作action
"searchWithCustomEscape",//esmapper/demo.xml中定义的dsl语句
params,//变量参数
Demo.class);//返回的文档封装对象类型
按照日期分表:
一个完整的批量添加和修改索引文档的案例
//分表的时间
SimpleDateFormat format = new SimpleDateFormat("yyyy.MM.dd");
String date = format.format(new Date());
ClientInterface clientUtil = ElasticSearchHelper.getRestClientUtil();
//数据1
List<Demo> demos = new ArrayList<>();
Demo demo = new Demo();
demo.setDemoId(2l);
demo.setAgentStarttime(new Date());
demo.setApplicationName("blackcatdemo2");
demo.setContentbody("this is content body2");
demos.add(demo);
//数据2
demo = new Demo();
demo.setDemoId(3l);
demo.setAgentStarttime(new Date());
demo.setApplicationName("blackcatdemo3");
demo.setContentbody("this is content body3");
demos.add(demo);
//批量创建文档
String response = clientUtil.addDateDocuments("demo",//索引表
"demo",//索引类型
demos);
System.out.println("addDateDocument-------------------------");
System.out.println(response);
//批量更新文档
demo.setContentbody("updated");
response = clientUtil.updateDocuments("demo-"+date,"demo",demos);
System.out.println("updateDateDocument-------------------------");
System.out.println(response);
//获取索引文档,json格式
response = clientUtil.getDocument("demo-"+date,//索引表
"demo",//索引类型
"2");//文档id
System.out.println("getDocument-------------------------");
System.out.println(response);
//获取索引文档,返回Demo对象类型
demo = clientUtil.getDocument("demo-"+date,//索引表
"demo",//索引类型
"3",//文档id
Demo.class);
不按日期分表:
//一个完整的批量添加和修改索引文档的案例
ClientInterface clientUtil = ElasticSearchHelper.getRestClientUtil();
List<Demo> demos = new ArrayList<>();
Demo demo = new Demo();
demo.setDemoId(2l);
demo.setAgentStarttime(new Date());
demo.setApplicationName("blackcatdemo2");
demo.setContentbody("this is content body2");
demos.add(demo);
demo = new Demo();
demo.setDemoId(3l);
demo.setAgentStarttime(new Date());
demo.setApplicationName("blackcatdemo3");
demo.setContentbody("this is content body3");
demos.add(demo);
//批量创建文档
String response = clientUtil.addDocuments("demo",//索引表
"demo",//索引类型
demos);
System.out.println("addDocuments-------------------------");
System.out.println(response);
//批量更新文档
demo.setContentbody("updated");
response = clientUtil.updateDocuments("demo","demo",demos);
System.out.println("updateDateDocument-------------------------");
System.out.println(response);
//获取索引文档,json格式
response = clientUtil.getDocument("demo",//索引表
"demo",//索引类型
"2");//文档id
System.out.println("getDocument-------------------------");
System.out.println(response);
//获取索引文档,返回Demo对象类型
demo = clientUtil.getDocument("demo",//索引表
"demo",//索引类型
"3",//文档id
Demo.class);
除了es能够自动生成文档_id属性,bboss提供了三种指定文档_id和parentid的方法:
添加索引文档时,es会自动设置文档_id属性,如果需要人工指定_id值,只需要在对象属性上设置注解@ESId即可,例如:
@ESId //ip属性作为文档唯一标识,根据ip值对应的索引文档存在与否来决定添加或者修改操作
private String ip;
@ESId同样适用于文档批量创建和修改操作
另外一个注解@ESParentId用来表示父子关系,在父子关系检索案例中有介绍。
ESId和ESParentId两个注解在添加/修改文档、批量添加/修改文档操中指定文档的_id和parent属性,如果不指定,es自动生成_id属性,parent必须手工指定。
通过ClientInterface 接口提供的以下通用executeHttp api,我们可以非常方便地实现es中所有带请求报文的功能
ClientInterface clientUtil = ElasticSearchHelper.getRestClientUtil();
//验证环境,获取es状态
String response = clientUtil.executeHttp("/kibana_sample_data_logs/_search", ClientInterface.HTTP_GET);
System.out.println(response);
public void testTempate() throws ParseException{
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/estrace/ESTemplate.xml");
//创建模板
String response = clientUtil.createTempate("demotemplate_1",//模板名称
"demoTemplate");//模板对应的脚本名称,在esmapper/estrace/ESTemplate.xml中配置
System.out.println("createTempate-------------------------");
System.out.println(response);
//获取模板
/**
* 指定模板
* /_template/demoTemplate_1
* /_template/demoTemplate*
* 所有模板 /_template
*
*/
String template = clientUtil.executeHttp("/_template/demotemplate_1",ClientUtil.HTTP_GET);
System.out.println("HTTP_GET-------------------------");
System.out.println(template);
//删除模板
template = clientUtil.executeHttp("/_template/demotemplate_1",ClientUtil.HTTP_DELETE);
System.out.println("HTTP_DELETE-------------------------");
System.out.println(template);
template = clientUtil.executeHttp("/_template/demotemplate_1",ClientUtil.HTTP_GET);
System.out.println("HTTP_GET after delete-------------------------");
System.out.println(template);
}
bboss elasticsearch采用xml文件管理elasticsearch的dsl脚本,在dsl脚本中可以使用变量、foreach循环、逻辑判断、注释;配置文件支持在线修改、自动热加载,开发和调试非常方便。
脚本中变量定义语法有两种:#[xxx]
,$xxx
,尽可能地在脚本中使用#[xxx]
方式的变量,在#[]类型变量中还可以指定属性,后面举例说明。对于#[xxx]
类型变量值中包含的可能破坏dsl json语法结构的特殊字符(例如回车换行符等),框架会自动进行转义处理;$xxx
类型变量直接输出原始值(特殊字符不做转移处理),$xxx
类型变量可以用于if/else
和foreach
循环控制变量,而#[xxx]
不可以。
判断List集合datas不为空并且datas的size大于0
#if($datas && $datas.size()> 0)
#foreach($bb in $datas)
#end
#end
#foreach
-#end
foreach循环内置循环变量:$velocaitycount
,不需要从外部传入
#if
-#else
-#end
,#if
-#elseif
-#else
-#end
变量值逻辑判断
#if($xxxx)
##变量值不为null判断(类似java语法 if(xxxx != null))
#end
#if(!$xxxx)
##变量值为null判断(类似java语法 if(xxxx == null))
#end
#if($xxxx && !$xxxx.equals(""))
##变量值不为null判断且不等于"“判断(类似java语法 if(xxxx != null && !xxx.equals(”")))
#end
#if($xxxx > 0)
##变量值大于某个值判断,其他类似(类似java语法 if(xxxx > 0))
#end
判断List集合不为null并且size大于0
#if($datas && $datas.size() > 0)
#end
逻辑判断还可以包含各种组合 &&
||
操作。
定义变量
#set($needComma = true)
修改$变量值
#set($needComma = false)
dsl注释是用多个#号来标识的,大段注释用#*
和*#
包起来
单行注释:##注释内容
#[application]
变量格式#[aaa]所有格式:
#[aaa] 简单的变量属性引用
#[aaa->bb] 如果aaa是一个bean对象,这个变量格式表示了对aaa对象的bb属性的引用,如果aaa是一个map对象,这个变量格式表示了对aaa对象的key为bb的元素值引用
#[aaa[key]] 引用map对象aaa中key所对应的value数据,引用map元素的等价方法#[aaa->key]
application.properties如下:
elasticsearch.rest.hostNames=localhost:9200
#动态索引表名称日期格式配置
elasticsearch.dateFormat=yyyy.MM.dd
elasticsearch.timeZone=Asia/Shanghai
elasticsearch.ttl=2d
#在控制台输出脚本调试开关showTemplate,false关闭,true打开,同时log4j至少是info级别
elasticsearch.showTemplate=false
#客户端动态发现es集群节点控制开关
elasticsearch.discoverHost=false
#http链接池配置
http.timeoutConnection = 400000
http.timeoutSocket = 400000
http.connectionRequestTimeout=400000
http.retryTime = 1
http.maxLineLength = -1
http.maxHeaderCount = 200
http.maxTotal = 400
http.defaultMaxPerRoute = 200
# dsl配置文件热加载扫描时间间隔,毫秒为单位,默认5秒扫描一次,<= 0时关闭扫描机制
dslfile.refreshInterval = -1
依赖如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.3</version>
</dependency>
<!-- bboss客户端开始 -->
<dependency>
<groupId>com.bbossgroups.plugins</groupId>
<artifactId>bboss-elasticsearch-spring-boot-starter</artifactId>
<version>5.3.7</version>
</dependency>
<dependency>
<groupId>com.bbossgroups.plugins</groupId>
<artifactId>bboss-elasticsearch-rest-jdbc</artifactId>
<version>5.3.7</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>jdbc</artifactId>
<version>6.3.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.xerial</groupId>
<artifactId>sqlite-jdbc</artifactId>
<version>3.23.1</version>
<scope>test</scope>
</dependency>
<!-- bboss客户端结束 -->
Demo地址如下:
ESDemo地址