公司使用分布式项目,每天日志量太大,需要用到轻量级的分布式日志系统 PlumeLog。
但是发现该框架有个问题:
ES查询结果默认有最大10000条的限制(索引设置 index.max_result_window 的默认值就是10000)。PlumeLog 一天生成一个索引(即上文所说的 key),几个单体服务的日志都写入ES的同一个索引中,所以一天的日志量远远超过了10000条,超出部分无法翻页查询。
思路:在创建索引时把 max_result_window 设置成一个可配置的更大的值(新增配置项 plumelog.es.maxResult),具体改动如下。
ClientConfig.java中的修改
package com.plumelog.server.client;
import com.plumelog.core.client.AbstractClient;
import com.plumelog.core.client.AbstractServerClient;
import com.plumelog.core.constant.LogMessageConstant;
import com.plumelog.core.kafka.KafkaConsumerClient;
import com.plumelog.core.lucene.LuceneClient;
import com.plumelog.core.redis.RedisClient;
import com.plumelog.core.redis.RedisClusterClient;
import com.plumelog.core.redis.RedisSentinelClient;
import com.plumelog.server.config.CollectStartBean;
import com.plumelog.server.config.InitConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.StringUtils;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
import java.time.ZoneId;
/**
* className:RedisClientConfig
* description: TODO
* time:2020-07-02.15:51
*
* @author Tank
* @version 1.0.0
*/
@Configuration
@Order(1)
@EnableWebSocket
public class ClientConfig implements InitializingBean {
/** Logger of this configuration class (previously created for CollectStartBean by mistake). */
private static final org.slf4j.Logger logger = LoggerFactory.getLogger(ClientConfig.class);

/** Bound to {@code plumelog.maxSendSize}; copied to {@code InitConfig.MAX_SEND_SIZE} by loadConfig(). */
@Value("${plumelog.maxSendSize:5000}")
public int maxSendSize = 5000;

/** Bound to {@code plumelog.interval}; copied to {@code InitConfig.MAX_INTERVAL} by loadConfig(). */
@Value("${plumelog.interval:100}")
public int interval = 100;

/** Kafka consumer group name. */
@Value("${plumelog.kafka.kafkaGroupName:logConsumer}")
public String kafkaGroupName = "logConsumer";

/** Server run mode; compared against the {@code InitConfig.*_MODE_NAME} constants. */
@Value("${plumelog.model:redis}")
private String model;

@Value("${plumelog.kafka.kafkaHosts:}")
private String kafkaHosts;

/**
 * Elasticsearch hosts, comma separated. A protocol prefix such as http or https may be included.
 */
@Value("${plumelog.es.esHosts:}")
private String esHosts;

/**
 * Whether self-signed certificates are trusted.
 * <p>Default: true
 */
@Value("${plumelog.es.trustSelfSigned:true}")
private boolean trustSelfSigned = true;

/**
 * Whether the TLS host name is verified.
 * <p>Default: false
 */
@Value("${plumelog.es.hostnameVerification:false}")
private boolean hostnameVerification = false;

@Value("${plumelog.es.indexType:}")
private String indexType;

@Value("${plumelog.es.userName:}")
private String esUserName;

@Value("${plumelog.es.passWord:}")
private String esPassWord;

@Value("${plumelog.es.shards:5}")
private int shards;

@Value("${plumelog.es.replicas:0}")
private int replicas;

@Value("${plumelog.es.maxShards:100000}")
private Long maxShards;

/** Value written as {@code max_result_window} when an index is created (via {@code InitConfig.ES_MAX_RESULT}). */
@Value("${plumelog.es.maxResult:2000000}")
private Long maxResult;

@Value("${plumelog.es.refresh.interval:10s}")
private String refreshInterval;

@Value("${plumelog.es.indexType.model:day}")
private String indexTypeModel;

/** Zone id for index naming; validated with {@code ZoneId.of} in loadConfig(). */
@Value("${plumelog.es.indexType.zoneId:GMT+8}")
private String indexTypeZoneId;

/** Host of the management Redis; when empty the queue Redis settings are used instead. */
@Value("${plumelog.redis.redisHost:}")
private String redisHost;

@Value("${plumelog.redis.redisPassWord:}")
private String redisPassWord;

@Value("${plumelog.redis.redisDb:0}")
private int redisDb = 0;

@Value("${plumelog.redis.sentinel.masterName:}")
private String redisSentinelMasterName;

@Value("${plumelog.rest.restUrl:}")
private String restUrl;

@Value("${plumelog.rest.restUserName:}")
private String restUserName;

@Value("${plumelog.rest.restPassWord:}")
private String restPassWord;

@Value("${login.username:}")
private String loginUsername;

@Value("${login.password:}")
private String loginPassword;

@Value("${plumelog.queue.redis.redisHost:}")
private String queueRedisHost;

@Value("${plumelog.queue.redis.sentinel.masterName:}")
private String queueRedisSentinelMasterName;

@Value("${plumelog.queue.redis.redisPassWord:}")
private String queueRedisPassWord;

@Value("${plumelog.queue.redis.redisDb:0}")
private int queueRedisDb = 0;

@Value("${admin.log.keepDays:0}")
private int keepDays;

@Value("${admin.log.trace.keepDays:0}")
private int traceKeepDays;

/** Storage path for lite mode; when empty, loadConfig() falls back to the {@code user.dir} system property. */
@Value("${plumelog.local.path:}")
private String liteLogPath;

/** Fallback Redis host, used only when no other Redis host is configured. */
@Value("${plumelog.inside.redis.host:}")
private String insideRedis;
/**
 * Creates the management Redis client.
 *
 * <p>Host resolution order: {@code plumelog.redis.redisHost}, then the queue Redis settings,
 * then {@code plumelog.inside.redis.host}. A sentinel client is created when a master name is
 * set, a cluster client when the host list contains a comma, otherwise a single-node client
 * from {@code host:port}.
 *
 * @return the client, or {@code null} in lite mode or when the configuration is missing/invalid
 */
@Bean(name = "redisClient")
public AbstractClient initRedisClient() {
    if (InitConfig.LITE_MODE_NAME.equals(this.model)) {
        return null;
    }
    String mgRedisHost;
    String mgRedisPassWord;
    String mgMasterName;
    int mgRedisDb;
    if (!StringUtils.isEmpty(this.redisHost)) {
        mgRedisHost = this.redisHost;
        mgRedisPassWord = this.redisPassWord;
        mgRedisDb = this.redisDb;
        mgMasterName = this.redisSentinelMasterName;
    } else {
        // No dedicated management Redis configured: share the queue Redis.
        mgRedisHost = this.queueRedisHost;
        mgRedisPassWord = this.queueRedisPassWord;
        mgRedisDb = this.queueRedisDb;
        mgMasterName = this.queueRedisSentinelMasterName;
    }
    if (StringUtils.isEmpty(mgRedisHost) && !StringUtils.isEmpty(insideRedis)) {
        mgRedisHost = this.insideRedis;
    }
    logger.info("管理 redis host:{}", mgRedisHost);
    if (StringUtils.isEmpty(mgRedisHost)) {
        logger.error("找不到redis配置项!请检查配置!");
        return null;
    }
    if (!StringUtils.isEmpty(mgMasterName)) {
        return new RedisSentinelClient(mgRedisHost, mgMasterName, mgRedisPassWord, mgRedisDb);
    }
    if (mgRedisHost.split(",").length > 1) {
        return new RedisClusterClient(mgRedisHost, mgRedisPassWord);
    }
    String[] hs = mgRedisHost.split(":");
    if (hs.length != 2) {
        logger.error("redis config error! expected host:port but got '{}', please check the application.properties"
                + "(plumelog.redis.redisHost / plumelog.queue.redis.redisHost / plumelog.inside.redis.host) ", mgRedisHost);
        return null;
    }
    int port;
    try {
        port = Integer.parseInt(hs[1]);
    } catch (NumberFormatException e) {
        // A non-numeric port used to escape as an uncaught exception; report it like other config errors.
        logger.error("redis config error! invalid port '{}', please check the application.properties"
                + "(plumelog.redis.redisHost / plumelog.queue.redis.redisHost / plumelog.inside.redis.host) ", hs[1]);
        return null;
    }
    return new RedisClient(hs[0], port, mgRedisPassWord, mgRedisDb);
}
/**
 * Creates the queue Redis client from the {@code plumelog.queue.redis.*} settings, falling back
 * to {@code plumelog.inside.redis.host} when no queue host is configured.
 *
 * <p>A sentinel client is created when a master name is set, a cluster client when the host
 * list contains a comma, otherwise a single-node client from {@code host:port}.
 *
 * @return the client, or {@code null} in lite mode or when the configuration is missing/invalid
 */
@Bean(name = "redisQueueClient")
public AbstractClient initRedisQueueClient() {
    if (InitConfig.LITE_MODE_NAME.equals(this.model)) {
        return null;
    }
    String mgRedisHost = this.queueRedisHost;
    String mgRedisPassWord = this.queueRedisPassWord;
    int mgRedisDb = this.queueRedisDb;
    String mgMasterName = this.queueRedisSentinelMasterName;
    if (StringUtils.isEmpty(mgRedisHost) && !StringUtils.isEmpty(insideRedis)) {
        mgRedisHost = this.insideRedis;
    }
    logger.info("队列 redis host:{}", mgRedisHost);
    if (StringUtils.isEmpty(mgRedisHost)) {
        logger.error("找不到redis配置项!请检查配置!");
        return null;
    }
    if (!StringUtils.isEmpty(mgMasterName)) {
        return new RedisSentinelClient(mgRedisHost, mgMasterName, mgRedisPassWord, mgRedisDb);
    }
    if (mgRedisHost.split(",").length > 1) {
        return new RedisClusterClient(mgRedisHost, mgRedisPassWord);
    }
    String[] hs = mgRedisHost.split(":");
    if (hs.length != 2) {
        logger.error("redis config error! please check the application.properties(plumelog.queue.redis.redisHost) ");
        return null;
    }
    int port;
    try {
        port = Integer.parseInt(hs[1]);
    } catch (NumberFormatException e) {
        // A non-numeric port used to escape as an uncaught exception; report it like other config errors.
        logger.error("redis config error! invalid port '{}', please check the application.properties(plumelog.queue.redis.redisHost) ", hs[1]);
        return null;
    }
    return new RedisClient(hs[0], port, mgRedisPassWord, mgRedisDb);
}
/**
 * Creates the storage client: a local Lucene client in lite mode, otherwise the shared
 * Elasticsearch client.
 *
 * <p>When the server reports a major version below 7, the index type is forced to
 * {@code plumelog} and the major version is stored in {@code InitConfig.esVersion}.
 *
 * @return the storage client, or {@code null} when {@code plumelog.es.esHosts} is not configured
 */
@Bean
public AbstractServerClient initAbstractServerClient() {
    if (InitConfig.LITE_MODE_NAME.equals(model)) {
        logger.info("当前日志将存储在本地!");
        return new LuceneClient(InitConfig.LITE_MODE_LOG_PATH);
    }
    if (StringUtils.isEmpty(esHosts)) {
        logger.error("can not find esHosts config ! please check the application.properties(plumelog.es.esHosts) ");
        return null;
    }
    ElasticLowerClient elasticLowerClient = ElasticLowerClient.getInstance(esHosts, esUserName, esPassWord, trustSelfSigned, hostnameVerification);
    String esVersion = elasticLowerClient.getVersion();
    if (esVersion == null) {
        // getVersion() returns null when the request failed; do not report a successful init.
        logger.error("can not get Elastic version! please check the es config and whether es is reachable");
        return elasticLowerClient;
    }
    logger.info("es 初始化成功!Elastic 版本:{}", esVersion);
    int majorVersion = Integer.parseInt(esVersion.split("\\.")[0]);
    if (majorVersion < 7) {
        InitConfig.esVersion = majorVersion;
        logger.info("set index type=plumelog");
        this.indexType = "plumelog";
        LogMessageConstant.ES_TYPE = "plumelog";
    }
    return elasticLowerClient;
}
/**
 * Creates the Kafka consumer when the server runs in kafka mode.
 *
 * @return the consumer, or {@code null} when the mode is not kafka or no Kafka hosts are configured
 */
@Bean
public KafkaConsumer initKafkaConsumer() {
    boolean kafkaMode = InitConfig.KAFKA_MODE_NAME.equals(model);
    if (!kafkaMode) {
        return null;
    }
    if (StringUtils.isEmpty(kafkaHosts)) {
        logger.error("can not find kafkaHosts config! please check the application.properties(plumelog.kafka.kafkaHosts) ");
        return null;
    }
    KafkaConsumerClient consumerClient =
            KafkaConsumerClient.getInstance(kafkaHosts, InitConfig.KAFKA_GROUP_NAME, InitConfig.MAX_SEND_SIZE);
    return consumerClient.getKafkaConsumer();
}
/**
 * Copies the injected property values into the static holders of {@code InitConfig}
 * and {@code LogMessageConstant}, then logs the main run parameters.
 */
private void loadConfig() {
    // Collector / queue settings.
    InitConfig.START_MODEL = this.model;
    InitConfig.MAX_SEND_SIZE = this.maxSendSize;
    InitConfig.MAX_INTERVAL = this.interval;
    InitConfig.KAFKA_GROUP_NAME = this.kafkaGroupName;

    // Elasticsearch index settings.
    InitConfig.ES_INDEX_SHARDS = this.shards;
    InitConfig.ES_INDEX_REPLICAS = this.replicas;
    InitConfig.ES_REFRESH_INTERVAL = this.refreshInterval;
    InitConfig.ES_MAX_RESULT = this.maxResult;
    InitConfig.ES_INDEX_MODEL = this.indexTypeModel;
    InitConfig.maxShards = this.maxShards;
    LogMessageConstant.ES_TYPE = this.indexType;

    // Lite mode stores logs under the configured path, or the working directory when none is set.
    InitConfig.LITE_MODE_LOG_PATH = StringUtils.isEmpty(this.liteLogPath)
            ? System.getProperty("user.dir")
            : this.liteLogPath;

    // Only accept a zone id that ZoneId can parse; otherwise keep the current default.
    try {
        ZoneId.of(this.indexTypeZoneId);
        InitConfig.ES_INDEX_ZONE_ID = this.indexTypeZoneId;
    } catch (Exception e) {
        logger.error("Please check config 'plumelog.es.indexType.zoneId', the value '{}' is invalid, use default value '{}'!",
                this.indexTypeZoneId, InitConfig.ES_INDEX_ZONE_ID);
    }

    // Rest mode and console login settings.
    InitConfig.restUrl = this.restUrl;
    InitConfig.restUserName = this.restUserName;
    InitConfig.restPassWord = this.restPassWord;
    InitConfig.loginUsername = this.loginUsername;
    InitConfig.loginPassword = this.loginPassword;

    // Retention settings.
    InitConfig.keepDays = this.keepDays;
    InitConfig.traceKeepDays = this.traceKeepDays;

    logger.info("server run model:" + this.model);
    logger.info("maxSendSize:" + this.maxSendSize);
    logger.info("interval:" + this.interval);
}
/**
 * Loads the configuration into the static holders once the properties are injected.
 * A failure is logged and not rethrown, so start-up continues with the default values.
 */
@Override
public void afterPropertiesSet() throws Exception {
    try {
        loadConfig();
        logger.info("load config success!");
    } catch (Exception e) {
        // The previous message read "success failed", which was contradictory.
        logger.error("plumelog load config failed!", e);
    }
}
/**
 * Registers a {@link ServerEndpointExporter} bean.
 * NOTE(review): presumably required so that annotated WebSocket endpoints get registered — confirm.
 */
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
/**
 * Provides the task scheduler bean: a thread-pool scheduler with a pool size of 10
 * that is initialized before being returned.
 */
@Bean
public TaskScheduler taskScheduler() {
    ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
    scheduler.setPoolSize(10);
    scheduler.initialize();
    return scheduler;
}
}
ElasticLowerClient.java修改
package com.plumelog.server.client;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.plumelog.core.client.AbstractServerClient;
import com.plumelog.core.constant.LogMessageConstant;
import com.plumelog.core.util.GfJsonUtil;
import com.plumelog.core.util.ThreadPoolUtil;
import com.plumelog.server.config.InitConfig;
import com.plumelog.server.client.http.SkipHostnameVerifier;
import com.plumelog.server.client.http.SkipSslVerificationHttpRequestFactory;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.*;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import javax.net.ssl.SSLContext;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
/**
* className:ElasticLowerClient
* description:ElasticLowerClient
*
* @author Frank.chen
* @version 1.0.0
*/
public class ElasticLowerClient extends AbstractServerClient {
// Value of the "distribution" field that getVersion() treats as an OpenSearch server.
private static final String opensearch = "opensearch";
// Name of the field inside the "version" object that getVersion() inspects.
private static final String distribution = "distribution";
// Shared instance handed out by getInstance(...).
private static ElasticLowerClient instance;
// Pool on which insertListV1 runs its bulk requests.
private static final ThreadPoolExecutor threadPoolExecutor = ThreadPoolUtil.getPool(5, 5, 100);
private final org.slf4j.Logger logger = LoggerFactory.getLogger(ElasticLowerClient.class);
// Low-level REST client; assigned in the constructors.
private RestClient client;
/**
 * Creates a client that authenticates with user name and password.
 *
 * @param hosts                comma separated list of hosts; blank entries are ignored
 * @param userName             Elasticsearch user name
 * @param passWord             Elasticsearch password
 * @param trustSelfSigned      when true, the SSL context from SkipSslVerificationHttpRequestFactory is installed
 * @param hostnameVerification when false (and trustSelfSigned is true), host name verification is skipped
 */
public ElasticLowerClient(String hosts, String userName, String passWord, boolean trustSelfSigned, boolean hostnameVerification) {
    final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
    credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, passWord)); // es account and password

    // Collect only non-blank entries: a blank entry (e.g. "a,,b") used to leave a null slot
    // in the array handed to RestClient.builder.
    List<HttpHost> httpHosts = new ArrayList<>();
    for (String rawHost : hosts.split(",")) {
        String hostAndPort = rawHost.trim();
        if (!StringUtils.isEmpty(hostAndPort)) {
            httpHosts.add(HttpHost.create(hostAndPort));
        }
    }

    client = RestClient.builder(httpHosts.toArray(new HttpHost[0])).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
        @Override
        public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
            httpClientBuilder.disableAuthCaching();
            httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            // SSL settings.
            // NOTE(review): hostnameVerification=false only takes effect together with
            // trustSelfSigned=true; kept as in the original — confirm this is intended.
            if (trustSelfSigned) {
                httpClientBuilder.setSSLContext(SkipSslVerificationHttpRequestFactory.getSSLContext());
                if (!hostnameVerification) {
                    httpClientBuilder.setSSLHostnameVerifier(new SkipHostnameVerifier());
                }
            }
            return httpClientBuilder;
        }
    }).build();
}
/**
 * Creates a client that trusts the certificates found in a key store.
 * Any failure during set-up is logged and leaves the client uninitialized.
 *
 * @param hosts        comma separated list of hosts; blank entries are ignored
 * @param keyStorePass password of the key store
 * @param sslFile      path of the key store file
 * @param keyStoreName key store type passed to {@code KeyStore.getInstance}
 */
public ElasticLowerClient(String hosts, String keyStorePass, String sslFile, String keyStoreName) {
    try {
        Path keyStorePath = Paths.get(sslFile);
        KeyStore truststore = KeyStore.getInstance(keyStoreName);
        try (InputStream is = Files.newInputStream(keyStorePath)) {
            truststore.load(is, keyStorePass.toCharArray());
        }
        SSLContextBuilder sslBuilder = SSLContexts.custom().loadTrustMaterial(truststore, null);
        final SSLContext sslContext = sslBuilder.build();

        // Collect only non-blank entries: a blank entry (e.g. "a,,b") used to leave a null slot
        // in the array handed to RestClient.builder.
        List<HttpHost> httpHosts = new ArrayList<>();
        for (String rawHost : hosts.split(",")) {
            String hostAndPort = rawHost.trim();
            if (!StringUtils.isEmpty(hostAndPort)) {
                httpHosts.add(HttpHost.create(hostAndPort));
            }
        }

        client = RestClient.builder(httpHosts.toArray(new HttpHost[0])).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                httpClientBuilder.disableAuthCaching();
                return httpClientBuilder.setSSLContext(sslContext);
            }
        }).build();
    } catch (Exception e) {
        logger.error("ElasticSearch init fail!", e);
    }
}
/**
 * Returns the shared client, creating it on first use.
 *
 * <p>The method is synchronized as a whole: the previous double-checked locking read a
 * non-volatile field outside the lock, which does not guarantee that another thread sees a
 * fully constructed instance. The arguments are only used by the call that creates the
 * instance; later calls return the existing instance unchanged.
 *
 * @param hosts                comma separated list of hosts
 * @param userName             Elasticsearch user name
 * @param passWord             Elasticsearch password
 * @param trustSelfSigned      whether self-signed certificates are trusted
 * @param hostnameVerification whether the TLS host name is verified
 * @return the shared instance
 */
public static synchronized ElasticLowerClient getInstance(String hosts, String userName, String passWord, boolean trustSelfSigned, boolean hostnameVerification) {
    if (instance == null) {
        instance = new ElasticLowerClient(hosts, userName, passWord, trustSelfSigned, hostnameVerification);
    }
    return instance;
}
/**
 * Checks whether an index exists by issuing {@code HEAD /<indice>}.
 *
 * @param indice index name
 * @return {@code true} only when the response status is 200; {@code false} otherwise,
 *         including when the request throws
 */
@Override
public boolean existIndice(String indice) {
    boolean found = false;
    try {
        Response response = client.performRequest(new Request("HEAD", "/" + indice));
        found = response.getStatusLine().getStatusCode() == 200;
    } catch (Exception e) {
        logger.error("", e);
    }
    return found;
}
/**
 * Reads the server version from {@code GET /}.
 *
 * <p>For an OpenSearch server (distribution field equals "opensearch") the result is the
 * Lucene major version minus one; otherwise it is the reported {@code version.number}.
 *
 * @return the version string, or {@code null} when the status is not 200 or the request fails
 */
@Override
public String getVersion() {
    try {
        Response res = client.performRequest(new Request("GET", "/"));
        if (res.getStatusLine().getStatusCode() != 200) {
            String responseStr = EntityUtils.toString(res.getEntity());
            logger.error("ElasticSearch GET Failure! {}", responseStr);
            return null;
        }
        String jsonStr = EntityUtils.toString(res.getEntity(), "utf-8");
        JSONObject version = JSON.parseObject(jsonStr).getJSONObject("version");
        boolean isOpenSearch = opensearch.equals(version.getString(distribution));
        if (!isOpenSearch) {
            return version.getString("number");
        }
        String luceneMajor = version.getString("lucene_version").split("\\.")[0];
        return String.valueOf(Integer.parseInt(luceneMajor) - 1);
    } catch (Exception e) {
        logger.error("", e);
    }
    return null;
}
/**
 * Creates a log index with the configured settings and the log field mappings.
 *
 * <p>Shards, replicas, refresh interval and {@code max_result_window} are taken from
 * {@code InitConfig}. When {@code type} is empty the mappings are sent without a type level.
 *
 * @param indice index name
 * @param type   mapping type name; may be empty
 * @return {@code true} when the response status is 200, {@code false} otherwise
 */
@Override
public boolean creatIndice(String indice, String type) {
    try {
        String fieldMappings = "\"properties\":{\"appName\":{\"type\":\"keyword\"},"
                + "\"env\":{\"type\":\"keyword\"},"
                + "\"appNameWithEnv\":{\"type\":\"keyword\"},"
                + "\"logLevel\":{\"type\":\"keyword\"},"
                + "\"serverName\":{\"type\":\"keyword\"},"
                + "\"traceId\":{\"type\":\"keyword\"},"
                + "\"dtTime\":{\"type\":\"date\",\"format\":\"strict_date_optional_time||epoch_millis\"},"
                + "\"seq\":{\"type\":\"long\"}"
                + "}";

        StringBuilder body = new StringBuilder();
        body.append("{\"settings\":{\"number_of_shards\":").append(InitConfig.ES_INDEX_SHARDS);
        body.append(",\"number_of_replicas\":").append(InitConfig.ES_INDEX_REPLICAS);
        body.append(",\"refresh_interval\":\"").append(InitConfig.ES_REFRESH_INTERVAL);
        body.append("\",\"max_result_window\":").append(InitConfig.ES_MAX_RESULT).append("}");
        if (StringUtils.isEmpty(type)) {
            body.append(",\"mappings\":{").append(fieldMappings).append("}}");
        } else {
            body.append(",\"mappings\":{\"").append(type).append("\":{").append(fieldMappings).append("}}}");
        }

        Request createRequest = new Request("PUT", "/" + indice);
        createRequest.setJsonEntity(body.toString());
        Response response = client.performRequest(createRequest);
        if (response.getStatusLine().getStatusCode() != 200) {
            String responseStr = EntityUtils.toString(response.getEntity());
            logger.error("ElasticSearch PUT Failure! {}", responseStr);
            return false;
        }
        logger.info("create index {} success", indice);
        return true;
    } catch (Exception e) {
        logger.error("", e);
    }
    return false;
}
/**
 * Updates the field mappings of an existing index.
 *
 * <p>With a type, the request goes to {@code /<indice>/<type>/_mapping} and the body is
 * wrapped in the type name. Without a type, the request goes to {@code /<indice>/_mapping}
 * and the body contains the properties directly; previously the body was still wrapped in
 * the empty (or "null") type name, which did not match the typeless URL.
 *
 * @param indice index name
 * @param type   mapping type name; may be empty
 * @return {@code true} when the response status is 200, {@code false} otherwise
 */
@Override
public boolean setMapping(String indice, String type) {
    try {
        boolean typeless = StringUtils.isEmpty(type);
        String url = typeless
                ? "/" + indice + "/_mapping?pretty"
                : "/" + indice + "/" + type + "/_mapping?pretty";
        Request request = new Request("PUT", url);
        String properties = "\"properties\":{\"appName\":{\"type\":\"keyword\"}," +
                "\"env\":{\"type\":\"keyword\"}," +
                "\"appNameWithEnv\":{\"type\":\"keyword\"}," +
                "\"logLevel\":{\"type\":\"keyword\"}," +
                "\"serverName\":{\"type\":\"keyword\"}," +
                "\"traceId\":{\"type\":\"keyword\"}," +
                "\"dtTime\":{\"type\":\"date\",\"format\":\"strict_date_optional_time||epoch_millis\"}," +
                "\"seq\":{\"type\":\"long\"}" +
                "}";
        // Keep the body shape consistent with the URL chosen above.
        String ent = typeless
                ? "{" + properties + "}"
                : "{\"" + type + "\":{" + properties + "}}";
        request.setJsonEntity(ent);
        Response res = client.performRequest(request);
        if (res.getStatusLine().getStatusCode() == 200) {
            logger.info("reset index {} mapping success", indice);
            return true;
        } else {
            String responseStr = EntityUtils.toString(res.getEntity());
            logger.error("ElasticSearch PUT Failure! {}", responseStr);
        }
    } catch (Exception e) {
        logger.error("", e);
    }
    return false;
}
/**
 * Creates a trace index with the configured settings and the trace field mappings.
 *
 * <p>Shards, replicas, refresh interval and {@code max_result_window} are taken from
 * {@code InitConfig}. When {@code type} is empty the mappings are sent without a type level.
 *
 * @param indice index name
 * @param type   mapping type name; may be empty
 * @return {@code true} when the response status is 200, {@code false} otherwise
 */
@Override
public boolean creatIndiceTrace(String indice, String type) {
    try {
        String fieldMappings = "\"properties\":{\"appName\":{\"type\":\"keyword\"},"
                + "\"env\":{\"type\":\"keyword\"},"
                + "\"appNameWithEnv\":{\"type\":\"keyword\"},"
                + "\"traceId\":{\"type\":\"keyword\"}"
                + "}";

        StringBuilder body = new StringBuilder();
        body.append("{\"settings\":{\"number_of_shards\":").append(InitConfig.ES_INDEX_SHARDS);
        body.append(",\"number_of_replicas\":").append(InitConfig.ES_INDEX_REPLICAS);
        body.append(",\"refresh_interval\":\"").append(InitConfig.ES_REFRESH_INTERVAL);
        body.append("\",\"max_result_window\":").append(InitConfig.ES_MAX_RESULT).append("}");
        if (StringUtils.isEmpty(type)) {
            body.append(",\"mappings\":{").append(fieldMappings).append("}}");
        } else {
            body.append(",\"mappings\":{\"").append(type).append("\":{").append(fieldMappings).append("}}}");
        }

        Request createRequest = new Request("PUT", "/" + indice);
        createRequest.setJsonEntity(body.toString());
        Response response = client.performRequest(createRequest);
        if (response.getStatusLine().getStatusCode() != 200) {
            String responseStr = EntityUtils.toString(response.getEntity());
            logger.error("ElasticSearch PUT Failure! {}", responseStr);
            return false;
        }
        logger.info("create index {} success", indice);
        return true;
    } catch (Exception e) {
        logger.error("", e);
    }
    return false;
}
/**
 * Creates an index with fixed settings (5 shards, 0 replicas, 10s refresh interval,
 * max_result_window 2000000000) and no explicit mappings.
 *
 * @param indice index name
 * @param type   not used by this method
 * @return {@code true} when the response status is 200, {@code false} otherwise
 */
@Override
public boolean creatIndiceNomal(String indice, String type) {
    final String settingsJson =
            "{\"settings\":{\"number_of_shards\":5,\"number_of_replicas\":0,\"refresh_interval\":\"10s\","
                    + "\"max_result_window\":2000000000}}";
    try {
        Request createRequest = new Request("PUT", "/" + indice);
        createRequest.setJsonEntity(settingsJson);
        Response response = client.performRequest(createRequest);
        boolean created = response.getStatusLine().getStatusCode() == 200;
        if (!created) {
            String responseStr = EntityUtils.toString(response.getEntity());
            logger.error("ElasticSearch PUT Failure! {}", responseStr);
        }
        return created;
    } catch (Exception e) {
        logger.error("", e);
    }
    return false;
}
/**
 * Sets the persistent cluster setting {@code cluster.max_shards_per_node}.
 *
 * @param shardCount value to set
 * @return {@code true} when the response status is 200, {@code false} otherwise
 */
@Override
public boolean addShards(Long shardCount) {
    try {
        Request settingsRequest = new Request("PUT", "/_cluster/settings");
        settingsRequest.setJsonEntity("{\"persistent\":{\"cluster\":{\"max_shards_per_node\":" + shardCount + "}}}");
        Response response = client.performRequest(settingsRequest);
        boolean applied = response.getStatusLine().getStatusCode() == 200;
        if (!applied) {
            String responseStr = EntityUtils.toString(response.getEntity());
            logger.error("ElasticSearch PUT Failure! {}", responseStr);
        }
        return applied;
    } catch (Exception e) {
        logger.error("", e);
    }
    return false;
}
/**
 * Bulk-inserts log documents, creating the target index first when it does not exist.
 *
 * <p>Indices whose name starts with {@code LogMessageConstant.ES_INDEX} are created with the
 * log mappings; any other index is created with the fixed default settings.
 *
 * @param list      documents as JSON strings
 * @param baseIndex target index name
 * @param type      mapping type name; may be empty
 * @throws IOException declared by the bulk insert
 */
@Override
public void insertListLog(List<String> list, String baseIndex, String type) throws IOException {
    boolean indexMissing = !existIndice(baseIndex);
    if (indexMissing) {
        boolean isLogIndex = baseIndex.startsWith(LogMessageConstant.ES_INDEX);
        if (isLogIndex) {
            creatIndice(baseIndex, type);
        } else {
            creatIndiceNomal(baseIndex, type);
        }
    }
    insertListV1(list, baseIndex, type);
}
/**
 * Bulk-inserts trace documents by delegating to insertListV1; unlike insertListLog,
 * no index existence check is made here.
 *
 * @param list      documents as JSON strings
 * @param baseIndex target index name
 * @param type      mapping type name; may be empty
 * @throws IOException declared by the bulk insert
 */
@Override
public void insertListTrace(List<String> list, String baseIndex, String type) throws IOException {
insertListV1(list, baseIndex, type);
}
/**
 * Bulk-inserts documents by delegating to insertListV1; unlike insertListLog,
 * no index existence check is made here.
 *
 * @param list      documents as JSON strings
 * @param baseIndex target index name
 * @param type      mapping type name; may be empty
 * @throws IOException declared by the bulk insert
 */
@Override
public void insertListComm(List<String> list, String baseIndex, String type) throws IOException {
insertListV1(list, baseIndex, type);
}
/**
 * Sends the documents as one bulk request using the client's asynchronous API.
 * The outcome is only logged from the response listener.
 *
 * @param list      documents as JSON strings
 * @param baseIndex target index name
 * @param type      mapping type name; when empty the typeless bulk endpoint is used
 * @throws IOException declared for callers; not thrown by the visible code
 */
private void insertList(List<String> list, String baseIndex, String type) throws IOException {
    // Bulk body: one action line followed by one document line per entry.
    StringBuilder bulkBody = new StringBuilder();
    int total = list.size();
    for (int idx = 0; idx < total; idx++) {
        bulkBody.append("{\"index\":{} ").append("\r\n");
        bulkBody.append(list.get(idx)).append("\r\n");
    }

    String endpoint = StringUtils.isEmpty(type)
            ? "/" + baseIndex + "/_bulk"
            : "/" + baseIndex + "/" + type + "/_bulk";
    final Request bulkRequest = new Request("PUT", endpoint);
    bulkRequest.setJsonEntity(bulkBody.toString());

    client.performRequestAsync(bulkRequest, new ResponseListener() {
        @Override
        public void onSuccess(Response response) {
            // Drop the payload reference once the request has completed.
            bulkRequest.setEntity(null);
            try {
                if (response.getStatusLine().getStatusCode() != 200) {
                    String responseStr = EntityUtils.toString(response.getEntity());
                    logger.error("ElasticSearch commit Failure! {}", responseStr);
                    return;
                }
                logger.info("ElasticSearch commit! success");
            } catch (IOException e) {
                logger.error("ElasticSearch commit Failure!", e);
            }
        }

        @Override
        public void onFailure(Exception e) {
            logger.error("ElasticSearch commit Failure!", e);
        }
    });
}
/**
 * Builds one bulk request from the documents and executes it on the shared thread pool.
 *
 * <p>Changes compared with the previous version: a null or empty list is ignored instead of
 * failing or sending an empty bulk body; every exception raised in the worker is logged
 * through the logger (previously only {@code IOException} was caught and printed to stderr);
 * the request payload is released whether or not the request succeeded.
 *
 * @param list      documents as JSON strings
 * @param baseIndex target index name
 * @param type      mapping type name; when empty the typeless bulk endpoint is used
 * @throws IOException declared for callers; request errors are handled inside the worker
 */
private void insertListV1(List<String> list, String baseIndex, String type) throws IOException {
    if (list == null || list.isEmpty()) {
        return;
    }
    // Bulk body: one action line followed by one document line per entry.
    StringBuilder sendStr = new StringBuilder();
    for (String doc : list) {
        sendStr.append("{\"index\":{} ").append("\r\n");
        sendStr.append(doc).append("\r\n");
    }
    String endpoint = StringUtils.isEmpty(type)
            ? "/" + baseIndex + "/_bulk"
            : "/" + baseIndex + "/" + type + "/_bulk";
    final Request request = new Request("PUT", endpoint);
    request.setJsonEntity(sendStr.toString());
    threadPoolExecutor.execute(() -> {
        long startTime = System.currentTimeMillis();
        try {
            Response response = client.performRequest(request);
            long cost = System.currentTimeMillis() - startTime;
            if (response.getStatusLine().getStatusCode() == 200) {
                logger.info("ElasticSearch commit! success,日志提交ES耗时:{}", cost);
            } else {
                String responseStr = EntityUtils.toString(response.getEntity());
                logger.error("ElasticSearch commit Failure! {},日志提交ES耗时:{}", responseStr, cost);
            }
        } catch (Exception e) {
            logger.error("ElasticSearch commit Failure! index:{}", baseIndex, e);
        } finally {
            // Drop the payload reference so the bulk body can be collected.
            request.setEntity(null);
        }
    });
}
/**
 * Returns the output of {@code GET /_cat/indices/<index>?v} converted to a JSON array with
 * one object per index row, keyed by the column titles of the header row.
 *
 * <p>The response body is read completely and decoded as UTF-8. The previous version sized
 * its buffer with {@code InputStream.available()} and read once, which could truncate the
 * body, and decoded with the platform charset. Rows with fewer cells than the header get
 * empty strings for the missing cells instead of failing.
 *
 * @param index index name or pattern
 * @return JSON array string; an empty array when the request fails or returns no rows
 */
@Override
public String cat(String index) {
    String body = "";
    try {
        Response res = client.performRequest(new Request("GET", "/_cat/indices/" + index + "?v"));
        String text = EntityUtils.toString(res.getEntity(), StandardCharsets.UTF_8);
        if (text != null) {
            body = text;
        }
    } catch (Exception e) {
        logger.error("ElasticSearch cat indices Failure! index:{}", index, e);
    }

    // Collect lines up to the first empty one, as before.
    List<String> lines = new ArrayList<>();
    for (String line : body.split("\\r?\\n")) {
        if (StringUtils.isEmpty(line)) {
            break;
        }
        lines.add(line);
    }

    List<Map<String, String>> listMap = new ArrayList<>();
    if (!lines.isEmpty()) {
        String[] title = lines.get(0).split("\\s+");
        for (int i = 1; i < lines.size(); i++) {
            String[] values = lines.get(i).split("\\s+");
            Map<String, String> row = new HashMap<>();
            for (int j = 0; j < title.length; j++) {
                row.put(title[j], j < values.length ? values[j] : "");
            }
            listMap.add(row);
        }
    }
    return GfJsonUtil.toJSONString(listMap);
}
/**
 * Executes a query against an arbitrary URL.
 *
 * @param url      request path, including any query string
 * @param queryStr JSON query body
 * @return the response body
 * @throws Exception when the request fails
 */
@Override
public String get(String url, String queryStr) throws Exception {
    return executeQuery(url, queryStr);
}

/**
 * Executes a paged search on the given indices with {@code track_total_hits=true}.
 *
 * @param indexStr index name(s)
 * @param queryStr JSON query body
 * @param from     offset of the first hit
 * @param size     number of hits to return
 * @return the response body
 * @throws Exception when the request fails
 */
@Override
public String get(String indexStr, String queryStr, String from, String size) throws Exception {
    String url = "/" + indexStr + "/_search?track_total_hits=true&from=" + from + "&size=" + size;
    return executeQuery(url, queryStr);
}

/**
 * Executes a search on {@code /<indexStr>/_search}.
 *
 * @param indexStr index name(s)
 * @param queryStr JSON query body
 * @return the response body
 * @throws Exception when the request fails
 */
@Override
public String group(String indexStr, String queryStr) throws Exception {
    return executeQuery("/" + indexStr + "/_search", queryStr);
}

/**
 * Sends a GET request with a JSON body and returns the response body.
 * The body is read exactly once; the previous code read the entity a second time after
 * logging a non-200 response.
 */
private String executeQuery(String url, String queryStr) throws Exception {
    StringEntity stringEntity = new StringEntity(queryStr, "utf-8");
    stringEntity.setContentType("application/json");
    Request request = new Request("GET", url);
    request.setEntity(stringEntity);
    Response res = client.performRequest(request);
    String body = EntityUtils.toString(res.getEntity(), "utf-8");
    if (res.getStatusLine().getStatusCode() == 200) {
        logger.info("ElasticSearch query success!");
    } else {
        logger.error("ElasticSearch query Failure! {}", body);
    }
    return body;
}
/**
 * Filters the given index names down to those that exist, using one
 * {@code HEAD /<index>} request per name.
 *
 * <p>A failing request is logged through the logger (previously printed to stderr) and the
 * index is treated as not existing.
 *
 * @param indices index names to check
 * @return the names whose HEAD request returned status 200, in input order
 */
@Override
public List<String> getExistIndices(String[] indices) {
    List<String> existIndexList = new ArrayList<>();
    for (String index : indices) {
        try {
            Response res = client.performRequest(new Request("HEAD", "/" + index));
            if (res.getStatusLine().getStatusCode() == 200) {
                existIndexList.add(index);
            }
        } catch (Exception e) {
            logger.error("ElasticSearch HEAD Failure! index:{}", index, e);
        }
    }
    return existIndexList;
}
/**
 * Deletes an index with {@code DELETE /<index>}.
 *
 * <p>Errors are reported through the logger (previously printed to stderr). A 404 response
 * is not logged. The return values are unchanged.
 *
 * @param index index name
 * @return {@code true} only when the response status is 200; {@code false} in every other
 *         case, including a 404.
 *         NOTE(review): returning {@code false} for 404 is kept from the original — confirm
 *         whether callers should treat "already absent" as success.
 */
@Override
public boolean deleteIndex(String index) {
    try {
        Response res = client.performRequest(new Request("DELETE", "/" + index));
        if (res.getStatusLine().getStatusCode() == 200) {
            return true;
        }
    } catch (ResponseException e) {
        if (e.getResponse().getStatusLine().getStatusCode() != 404) {
            logger.error("ElasticSearch DELETE Failure! index:{}", index, e);
        }
    } catch (Exception e) {
        logger.error("ElasticSearch DELETE Failure! index:{}", index, e);
    }
    return false;
}
/**
 * Closes the underlying REST client. Does nothing when the client was never created
 * (the key-store constructor logs and swallows initialization failures, leaving it null).
 */
@Override
public void close() {
    if (client == null) {
        return;
    }
    try {
        client.close();
    } catch (IOException e) {
        logger.error("", e);
    }
}
}
InitConfig.java 修改
package com.plumelog.server.config;
/**
 * Static holder for the server's run-time configuration. The mutable fields carry default
 * values and are overwritten at start-up from the application properties.
 */
public class InitConfig {

    // ---- run mode names ----
    public static final String KAFKA_MODE_NAME = "kafka";
    public static final String REDIS_MODE_NAME = "redis";
    public static final String REST_MODE_NAME = "rest";
    public static final String LITE_MODE_NAME = "lite";
    public static final String REDIS_CLUSTER_MODE_NAME = "redisCluster";
    public static final String REDIS_SENTINEL_MODE_NAME = "redisSentinel";

    // ---- web console keys ----
    public static final String WEB_CONSOLE_KEY = "plumelog:web_console";
    public static final String WEB_CONSOLE_CHANNEL = "plumelog_webConsole_Channel";

    /** Milliseconds in one day. */
    public static final long MILLS_ONE_DAY = 24 * 60 * 60 * 1000;

    /** Log storage path used in lite mode. */
    public static String LITE_MODE_LOG_PATH = ".";

    /** Maximum number of log entries sent per batch. */
    public static int MAX_SEND_SIZE = 5000;

    /** Interval between log fetches. */
    public static int MAX_INTERVAL = 100;

    /** Kafka consumer group name. */
    public static String KAFKA_GROUP_NAME = "logConsumer";

    /** Run mode the server was started with. */
    public static String START_MODEL = "redis";

    // ---- Elasticsearch index settings ----
    public static int ES_INDEX_SHARDS = 5;
    public static int ES_INDEX_REPLICAS = 0;
    public static String ES_REFRESH_INTERVAL = "10s";
    public static String ES_INDEX_MODEL = "day";
    public static String ES_INDEX_ZONE_ID = "GMT+8";

    /** Value used for max_result_window when an index is created. */
    public static Long ES_MAX_RESULT = 200000000L;

    public static Long maxShards = 100000L;
    public static int esVersion = 7;

    // ---- rest mode ----
    public static String restUserName = "";
    public static String restPassWord = "";
    public static String restUrl = "";

    // ---- console login ----
    public static String loginUsername = "";
    public static String loginPassword = "";

    // ---- retention ----
    public static int keepDays = 0;
    public static int traceKeepDays = 0;
}
application.properties配置文件中添加:
plumelog.es.maxResult=200000000
前端中app.js的查询修改一下
// Express application setup for the front-end proxy server.
var express = require('express');
// Local configuration; this file reads config.es (Elasticsearch base URL) and config.port.
var config = require("./src/config.json")
var app = express();
var bodyParser = require('body-parser');
// Parse URL-encoded request bodies (extended syntax disabled).
app.use(bodyParser.urlencoded({extended : false}));
// HTTP client used for the requests to Elasticsearch.
var superagent = require('superagent');
const path = require('path');
// CORS middleware: adds the cross-origin headers to every response and answers
// pre-flight OPTIONS requests immediately.
app.all('*', function(req, res, next) {
    res.header("Access-Control-Allow-Origin", "*");
    res.header("Access-Control-Allow-Headers", "Content-Type,Content-Length, Authorization, Accept,X-Requested-With");
    res.header("Access-Control-Allow-Methods","PUT,POST,GET,DELETE,OPTIONS");
    res.header("X-Powered-By",' 3.2.1')
    if (req.method === "OPTIONS") {
        // res.send(200) is the deprecated numeric form; sendStatus sets the status explicitly.
        res.sendStatus(200);
    } else {
        next();
    }
});
/**
 * Checks which of the given index names exist by sending one HEAD request per name
 * to config.es + name.
 *
 * @param {string[]} dateList index names to probe
 * @returns {Promise<string[]>} the names whose HEAD request succeeded, in input order
 */
function checkExistsIndex(dateList) {
    // A failed probe resolves to false instead of rejecting, so Promise.all never rejects here.
    const probes = Array.from(dateList, function (name) {
        return new Promise(function (resolve) {
            superagent.head(config.es + name).then(
                function () { resolve(true); },
                function () { resolve(false); }
            );
        });
    });
    return Promise.all(probes).then(function (flags) {
        const existing = [];
        for (let idx = 0; idx < dateList.length; idx += 1) {
            if (flags[idx]) {
                existing.push(dateList[idx]);
            }
        }
        return existing;
    });
}
/**
 * Converts a flat, ordered list of trace records into a call tree.
 *
 * A record with position '<' opens a method call (nested under the call that is currently
 * open); a record with position '>' closes the call at the current depth. Records with any
 * other position are ignored.
 *
 * The depth is never allowed to drop below zero: an unmatched '>' used to drive the depth
 * negative, which shifted every later call one level up the tree.
 *
 * @param {Array<Object>} list trace records with position, method, appName and time
 * @returns {Array<Object>} root calls as {method, appName, start_time, end_time, zIndex, children}
 */
function formartTrace(list){
    let zIndex = 0;
    const roots = [];

    // Walk down to the children array of the most recent call at each level.
    function levelAt(depth){
        let level = roots;
        for (let i = 0; i < depth; i++) {
            level = level[level.length - 1].children;
        }
        return level;
    }

    for (let i = 0; i < list.length; i++) {
        const item = list[i];
        if (item['position'] == '<') {
            // Method start: append a new node at the current depth, then go one level deeper.
            levelAt(zIndex).push({
                method: item.method,
                appName: item.appName,
                start_time: item.time,
                zIndex: zIndex,
                children: []
            });
            zIndex++;
        } else if (item['position'] == '>') {
            // Method end: step back up (never above the root) and close the first open node there.
            if (zIndex > 0) {
                zIndex--;
            }
            const level = levelAt(zIndex);
            for (let f = 0; f < level.length; f++) {
                if (!level[f].end_time) {
                    level[f].end_time = item.time;
                    break;
                }
            }
        }
    }
    return roots;
}
// POST /getInfo?index=a,b&from=0&size=30
// Forwards the raw request body as a search query to the indices that exist.
// Responds with an empty hits object when no index exists, when the index parameter is
// missing, or when the search fails.
app.post('/getInfo', function (req, res) {
    var emptyResult = { hits: { hits: [], total: 0 } };
    var str = '';
    req.on("data", function (dt) {
        str += dt
    })

    // A missing index parameter used to throw a TypeError on .split().
    var indexParam = req.query.index;
    if (typeof indexParam !== 'string' || indexParam.length === 0) {
        res.send(emptyResult);
        return;
    }

    // Only numeric paging values are put into the URL; anything else falls back to the defaults.
    var from = parseInt(req.query.from, 10);
    var size = parseInt(req.query.size, 10);
    if (!(from >= 0)) {
        from = 0;
    }
    if (!(size >= 0)) {
        size = 30;
    }

    checkExistsIndex(indexParam.split(',')).then(existIndex => {
        if (existIndex.length == 0) {
            res.send(emptyResult);
            return;
        }
        let url = config.es + existIndex + '/_search?track_total_hits=true&from=' + from + '&size=' + size;
        superagent
            .post(url)
            .set('Accept', 'application/json')
            .send(str)
            .timeout(10000)
            .end(function (err, response) {
                if (err) {
                    res.send(emptyResult);
                } else {
                    res.send(response.text);
                }
            })
    }).catch(function (e) {
        // Never leave the request hanging when something above throws.
        console.log('getInfo error,' + e.message);
        if (!res.headersSent) {
            res.send(emptyResult);
        }
    });
});
// GET /getTrace?traceId=...
// Searches up to 500 records matching the trace id, sorted by time and positionNum, and
// responds with the call tree built by formartTrace. Responds with {} when the trace id is
// missing, the search fails, the response cannot be parsed, or nothing is found.
app.get('/getTrace', function (req, res) {
    if (!req.query.traceId) {
        res.send({});
        return;
    }
    let filter = {
        "query": {
            "bool": {
                "must": [{
                    "match": {
                        "traceId": {
                            "query": req.query.traceId
                        }
                    }
                }]
            }
        },
        "sort": [{
            "time":"asc",
            "positionNum": "asc"
        }]
    };
    let url = config.es+'/_search?size=500';
    superagent
        .post(url)
        .set('Accept', 'application/json')
        .send(JSON.stringify(filter))
        .timeout(20000)
        .end(function (err, response) {
            if (err) {
                res.send({});
                return;
            }
            let hits = [];
            try {
                var result = JSON.parse(response.text);
                result.hits.hits.forEach(hit => {
                    hits.push(hit._source)
                })
            } catch (e) {
                console.log('get hits error,'+e.message)
                res.send({});
                // Stop here: falling through used to send a second response for the same request.
                return;
            }
            if (hits.length > 0) {
                res.send(formartTrace(hits));
            } else {
                res.send({});
            }
        })
});
// Serve the built front-end from the dist directory.
app.use('/', express.static('dist'));
// Serve the front-end entry page for the root path.
app.get('/', (req, res) => {
res.sendFile(path.join(__dirname, 'dist/index.html'))
})
// Start listening on the port from the configuration file.
app.listen(config.port, function () {
console.log('Server listening on port '+config.port+'!');
});
到这里结束了,大家试试吧!
下一篇文章,咱们讲讲 多个系统之间调用这个日志系统框架,traceId只存在单个服务的问题。