该类采用固定长度的方式,在类加载的时候就初始化了设定的连接数,这里通过BlockingQueue阻塞队列的生产-消费方式来保证多线程环境下的安全。其中Factory和Queue都是单利模式,get和remove都是包级别的访问权限。
/**
* Created by yzz on 2018/9/15.
*/
public class EsClientPool {
//阻塞队列
private static BlockingQueue<JestClient> queue;
//用户自定义配置的Bean
private static EsConfigVO esConfigVO;
//Jest工厂,这里是单利工厂
private static JestClientFactory factory;
//保证factory queue 的单利
static {
esConfigVO = (EsConfigVO) ApplicationHelper.getBeanByClass(EsConfigVO.class);
queue = new ArrayBlockingQueue<>(esConfigVO.getMaxConnection());
factory = new JestClientFactory();
init();
}
//初始化工厂
private static void init(){
try {
//初始化工厂
Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd'T'HH:mm:ss").create();
HttpClientConfig clientConfig = new HttpClientConfig
.Builder(esConfigVO.getUrl())
.gson(gson)
.build();
factory.setHttpClientConfig(clientConfig);
//初始化阻塞队列
//初始化连接
initQueue();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 初始化连接
* @throws InterruptedException
*/
static void initQueue() throws InterruptedException {
int i = 0;
int maxConnection = esConfigVO.getMaxConnection();
while(i<maxConnection){
JestClient client = factory.getObject();
queue.put(client);
i++;
}
System.err.println("es 初始化完成");
}
static JestClient get() throws InterruptedException {
return queue.poll(1, TimeUnit.SECONDS);
}
static void remove(JestClient jestClient) throws InterruptedException {
queue.put(jestClient);
}
}
通过ThreadLocal来保证线程局部变量,实现多线程下的资源安全。该holder类和EsClientPool 在同一个包下。
/**
* Created by yzz on 2018/9/15.
*/
public class EsClientHolder {
private static ThreadLocal<JestClient> threadLocal = new ThreadLocal<>();
static void set(){
try {
JestClient jestClient = EsClientPool.get();
threadLocal.set(jestClient);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static JestClient get() throws Exception {
JestClient client = threadLocal.get();
System.out.println("当前线程:"+Thread.currentThread().getName()+" 当前ES:"+client);
if (null == client) {
throw new Exception("please try again !");
}
return client;
}
static void remove(){
try {
JestClient client = threadLocal.get();
EsClientPool.remove(client);
System.out.println("当前线程:"+Thread.currentThread().getName()+" 回收ES:"+client);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
通过对特定注解的监控,来做到在es搜索执行前后对connection做线程绑定和回收
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface EsJestClientAnn {
}
@Component
@Aspect
@Order(-100)
public class EsClientAop {
@Pointcut(value = "execution(public * com.yzz.boot.es.service ..*.*(..))")
public void defaultJestClient(){}
@Before(value = "defaultJestClient()&&@annotation(com.yzz.boot.es.util.EsJestClientAnn)")
public void initClient(){
EsClientHolder.set();
}
@After(value = "defaultJestClient()&&@annotation(com.yzz.boot.es.util.EsJestClientAnn)")
public void recycle(){
EsClientHolder.remove();
}
}
该注解运用了springBoot的特性通过配置文件来填充该Bean。
@Component
@ConfigurationProperties(prefix = "es.config.common")
public class EsConfigVO {
private String url;
private int maxConnection = 10;
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public int getMaxConnection() {
return maxConnection;
}
public void setMaxConnection(int maxConnection) {
this.maxConnection = maxConnection;
}
}
es:
config:
common:
url: http://192.168.1.12:9200
maxConnection: 100
@Service(value = "es_EsSearchService")
public class EsSearchService {
@EsJestClientAnn
public Object test() throws Exception {
JestClient client = EsClientHolder.get();
NodesStats nodesStats = new NodesStats.Builder().build();
JestResult result = client.execute(nodesStats);
return result.getJsonString();
}
}
在本次实践中,采用的pool方式是全部加载的方式,通过懒加载的方式会更好,下次会及时记录下来。该pool的设计优点是,在第一次访问的时候才会去初始化,优点懒加载的意思,也会避免es服务未启动导致项目启动失败的问题。