当前位置: 首页 > 工具软件 > Plumelog > 使用案例 >

Plumelog分布式日志框架改造(1)(查询限制10000条)

沙岳
2023-12-01

1. 概述

公司使用分布式项目,每天日志量太大,需要用到轻量级的分布式日志系统 PlumeLog。

但是发现该框架有一个问题:

ES 查询结果有最大 10000 条的限制。(一天生成一个 key,几个单体服务的日志都放在 ES 的同一个 key 中,所以一天的日志量远远超过了 10000 条)

2. 解决方案

思路:

  1. ES设置key值的时候,在setting中添加配置:max_result_window:InitConfig.ES_MAX_RESULT
  2. ES查询的时候,在参数中添加 track_total_hits=true 的参数
  3. 将设置 key 的配置改成手动可配的参数,默认赋值两百万条(plumelog.es.maxResult:2000000)。

3. 修改文件

ClientConfig.java中的修改

package com.plumelog.server.client;

import com.plumelog.core.client.AbstractClient;
import com.plumelog.core.client.AbstractServerClient;
import com.plumelog.core.constant.LogMessageConstant;
import com.plumelog.core.kafka.KafkaConsumerClient;
import com.plumelog.core.lucene.LuceneClient;
import com.plumelog.core.redis.RedisClient;
import com.plumelog.core.redis.RedisClusterClient;
import com.plumelog.core.redis.RedisSentinelClient;
import com.plumelog.server.config.CollectStartBean;
import com.plumelog.server.config.InitConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.StringUtils;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
import java.time.ZoneId;

/**
 * className:RedisClientConfig
 * description: TODO
 * time:2020-07-02.15:51
 *
 * @author Tank
 * @version 1.0.0
 */
@Configuration
@Order(1)
@EnableWebSocket
public class ClientConfig implements InitializingBean {
    private static final org.slf4j.Logger logger = LoggerFactory.getLogger(CollectStartBean.class);
    @Value("${plumelog.maxSendSize:5000}")
    public int maxSendSize = 5000;
    @Value("${plumelog.interval:100}")
    public int interval = 100;
    @Value("${plumelog.kafka.kafkaGroupName:logConsumer}")
    public String kafkaGroupName = "logConsumer";

    @Value("${plumelog.model:redis}")
    private String model;
    @Value("${plumelog.kafka.kafkaHosts:}")


    private String kafkaHosts;
    /**
     * 支持携带协议,如:http、https
     */
    @Value("${plumelog.es.esHosts:}")
    private String esHosts;
    /**
     * 信任自签证书
     * <p>默认:true
     */
    @Value("${plumelog.es.trustSelfSigned:true}")
    private boolean trustSelfSigned = true;
    /**
     * hostname验证
     * <p>默认:false
     */
    @Value("${plumelog.es.hostnameVerification:false}")
    private boolean hostnameVerification = false;
    @Value("${plumelog.es.indexType:}")
    private String indexType;

    @Value("${plumelog.es.userName:}")
    private String esUserName;
    @Value("${plumelog.es.passWord:}")
    private String esPassWord;
    @Value("${plumelog.es.shards:5}")
    private int shards;
    @Value("${plumelog.es.replicas:0}")
    private int replicas;
    @Value("${plumelog.es.maxShards:100000}")
    private Long maxShards;
    @Value("${plumelog.es.maxResult:2000000}")
    private Long maxResult;

    @Value("${plumelog.es.refresh.interval:10s}")
    private String refreshInterval;
    @Value("${plumelog.es.indexType.model:day}")
    private String indexTypeModel;
    @Value("${plumelog.es.indexType.zoneId:GMT+8}")
    private String indexTypeZoneId;


    @Value("${plumelog.redis.redisHost:}")
    private String redisHost;
    @Value("${plumelog.redis.redisPassWord:}")
    private String redisPassWord;
    @Value("${plumelog.redis.redisDb:0}")
    private int redisDb = 0;
    @Value("${plumelog.redis.sentinel.masterName:}")
    private String redisSentinelMasterName;

    @Value("${plumelog.rest.restUrl:}")
    private String restUrl;
    @Value("${plumelog.rest.restUserName:}")
    private String restUserName;
    @Value("${plumelog.rest.restPassWord:}")
    private String restPassWord;
    @Value("${login.username:}")


    private String loginUsername;
    @Value("${login.password:}")
    private String loginPassword;

    @Value("${plumelog.queue.redis.redisHost:}")
    private String queueRedisHost;
    @Value("${plumelog.queue.redis.sentinel.masterName:}")
    private String queueRedisSentinelMasterName;
    @Value("${plumelog.queue.redis.redisPassWord:}")
    private String queueRedisPassWord;
    @Value("${plumelog.queue.redis.redisDb:0}")
    private int queueRedisDb = 0;

    @Value("${admin.log.keepDays:0}")
    private int keepDays;
    @Value("${admin.log.trace.keepDays:0}")
    private int traceKeepDays;

    @Value("${plumelog.local.path:}")
    private String liteLogPath;

    @Value("${plumelog.inside.redis.host:}")
    private String insideRedis;

    @Bean(name = "redisClient")
    public AbstractClient initRedisClient() {
        if (InitConfig.LITE_MODE_NAME.equals(this.model)) {
            return null;
        }
        String mgRedisHost = "";
        String mgRedisPassWord = "";
        String mgMasterName = "";
        int mgRedisDb = 0;
        if (!StringUtils.isEmpty(this.redisHost)) {
            mgRedisHost = this.redisHost;
            mgRedisPassWord = this.redisPassWord;
            mgRedisDb = this.redisDb;
            mgMasterName = this.redisSentinelMasterName;
        } else {
            mgRedisHost = this.queueRedisHost;
            mgRedisPassWord = this.queueRedisPassWord;
            mgRedisDb = this.queueRedisDb;
            mgMasterName = this.queueRedisSentinelMasterName;
        }
        if (!StringUtils.isEmpty(insideRedis)&&StringUtils.isEmpty(mgRedisHost)) {
            mgRedisHost=this.insideRedis;
        }
        logger.info("管理 redis host:{}", mgRedisHost);
        if (!StringUtils.isEmpty(mgRedisHost)) {
            if (!StringUtils.isEmpty(mgMasterName)) {
                return new RedisSentinelClient(mgRedisHost, mgMasterName, mgRedisPassWord, mgRedisDb);
            }
            if (mgRedisHost.split(",").length > 1) {
                return new RedisClusterClient(mgRedisHost, mgRedisPassWord);
            } else {
                String[] hs = mgRedisHost.split(":");
                int port = 6379;
                String ip = "127.0.0.1";
                if (hs.length == 2) {
                    ip = hs[0];
                    port = Integer.parseInt(hs[1]);
                } else {
                    logger.error("redis config error! please check the application.properties(plumelog.queue.redis.redisHost) ");
                    return null;
                }
                return new RedisClient(ip, port, mgRedisPassWord, mgRedisDb);
            }
        }
        logger.error("找不到redis配置项!请检查配置!");
        return null;
    }

    @Bean(name = "redisQueueClient")
    public AbstractClient initRedisQueueClient() {
        if (InitConfig.LITE_MODE_NAME.equals(this.model)) {
            return null;
        }
        String mgRedisHost = "";
        String mgRedisPassWord = "";
        String mgMasterName = "";
        int mgRedisDb = 0;

        mgRedisHost = this.queueRedisHost;
        mgRedisPassWord = this.queueRedisPassWord;
        mgRedisDb = this.queueRedisDb;
        mgMasterName = this.queueRedisSentinelMasterName;
        if (!StringUtils.isEmpty(insideRedis)&&StringUtils.isEmpty(mgRedisHost)) {
            mgRedisHost=this.insideRedis;
        }
        logger.info("队列 redis host:{}", mgRedisHost);
        if (!StringUtils.isEmpty(mgRedisHost)) {
            if (!StringUtils.isEmpty(mgMasterName)) {
                return new RedisSentinelClient(mgRedisHost, mgMasterName, mgRedisPassWord, mgRedisDb);
            }
            if (mgRedisHost.split(",").length > 1) {
                return new RedisClusterClient(mgRedisHost, mgRedisPassWord);
            } else {
                String[] hs = mgRedisHost.split(":");
                int port = 6379;
                String ip = "127.0.0.1";
                if (hs.length == 2) {
                    ip = hs[0];
                    port = Integer.parseInt(hs[1]);
                } else {
                    logger.error("redis config error! please check the application.properties(plumelog.queue.redis.redisHost) ");
                    return null;
                }
                return new RedisClient(ip, port, mgRedisPassWord, mgRedisDb);
            }
        }
        logger.error("找不到redis配置项!请检查配置!");
        return null;
    }

    @Bean
    public AbstractServerClient initAbstractServerClient() {
        if(InitConfig.LITE_MODE_NAME.equals(model)){
            logger.info("当前日志将存储在本地!");
            return new LuceneClient(InitConfig.LITE_MODE_LOG_PATH);
        }
        if (StringUtils.isEmpty(esHosts)) {
            logger.error("can not find esHosts config ! please check the application.properties(plumelog.es.esHosts) ");
            return null;
        }
        ElasticLowerClient elasticLowerClient = ElasticLowerClient.getInstance(esHosts, esUserName, esPassWord, trustSelfSigned, hostnameVerification);
        String esVersion = elasticLowerClient.getVersion();
        logger.info("es 初始化成功!Elastic 版本:{}", esVersion);
        if (esVersion != null && Integer.parseInt(esVersion.split("\\.")[0]) < 7) {
            InitConfig.esVersion=Integer.parseInt(esVersion.split("\\.")[0]);
            logger.info("set index type=plumelog");
            this.indexType = "plumelog";
            LogMessageConstant.ES_TYPE= "plumelog";
        }
        return elasticLowerClient;
    }

    @Bean
    public KafkaConsumer initKafkaConsumer() {
        if (InitConfig.KAFKA_MODE_NAME.equals(model)) {
            if (StringUtils.isEmpty(kafkaHosts)) {
                logger.error("can not find kafkaHosts config! please check the application.properties(plumelog.kafka.kafkaHosts) ");
                return null;
            }
            return KafkaConsumerClient.getInstance(kafkaHosts, InitConfig.KAFKA_GROUP_NAME, InitConfig.MAX_SEND_SIZE).getKafkaConsumer();
        }
        return null;
    }

    /**
     * 加载配置
     */
    private void loadConfig() {
        InitConfig.MAX_SEND_SIZE = this.maxSendSize;
        InitConfig.KAFKA_GROUP_NAME = this.kafkaGroupName;
        InitConfig.MAX_INTERVAL = this.interval;
        InitConfig.START_MODEL = this.model;

        InitConfig.ES_INDEX_SHARDS = this.shards;
        InitConfig.ES_INDEX_REPLICAS = this.replicas;
        InitConfig.ES_REFRESH_INTERVAL = this.refreshInterval;
        InitConfig.ES_MAX_RESULT = this.maxResult;
        InitConfig.ES_INDEX_MODEL = this.indexTypeModel;
        if(this.liteLogPath!=null&&!"".equals(this.liteLogPath)){
            InitConfig.LITE_MODE_LOG_PATH=this.liteLogPath;
        }else {
            InitConfig.LITE_MODE_LOG_PATH=System.getProperty("user.dir");
        }

        try {
            ZoneId.of(this.indexTypeZoneId);
            InitConfig.ES_INDEX_ZONE_ID = this.indexTypeZoneId;
        } catch (Exception e) {
            logger.error("Please check config 'plumelog.es.indexType.zoneId', the value '{}' is invalid, use default value '{}'!",
                    this.indexTypeZoneId, InitConfig.ES_INDEX_ZONE_ID);
        }

        InitConfig.restUrl = this.restUrl;
        InitConfig.restUserName = this.restUserName;
        InitConfig.restPassWord = this.restPassWord;

        LogMessageConstant.ES_TYPE = this.indexType;

        InitConfig.loginUsername = this.loginUsername;
        InitConfig.loginPassword = this.loginPassword;

        InitConfig.keepDays = this.keepDays;
        InitConfig.traceKeepDays = this.traceKeepDays;
        InitConfig.maxShards=this.maxShards;

        logger.info("server run model:" + this.model);
        logger.info("maxSendSize:" + this.maxSendSize);
        logger.info("interval:" + this.interval);
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        try {
            loadConfig();
            logger.info("load config success!");
        } catch (Exception e) {
            logger.error("plumelog load config success failed!", e);
        }
    }
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
    @Bean
    public TaskScheduler taskScheduler(){
        ThreadPoolTaskScheduler taskScheduler=new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(10);
        taskScheduler.initialize();;
        return taskScheduler;
    }
}

ElasticLowerClient.java 中的修改

package com.plumelog.server.client;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.plumelog.core.client.AbstractServerClient;
import com.plumelog.core.constant.LogMessageConstant;
import com.plumelog.core.util.GfJsonUtil;
import com.plumelog.core.util.ThreadPoolUtil;
import com.plumelog.server.config.InitConfig;
import com.plumelog.server.client.http.SkipHostnameVerifier;
import com.plumelog.server.client.http.SkipSslVerificationHttpRequestFactory;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.*;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

import javax.net.ssl.SSLContext;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * className:ElasticLowerClient
 * description:ElasticLowerClient
 *
 * @author Frank.chen
 * @version 1.0.0
 */
public class ElasticLowerClient extends AbstractServerClient {

    private static final String opensearch = "opensearch";
    private static final String distribution = "distribution";

    // volatile is required for the double-checked locking in getInstance() to be
    // safe under the Java memory model (prevents publishing a partially
    // constructed instance).
    private static volatile ElasticLowerClient instance;
    private static final ThreadPoolExecutor threadPoolExecutor = ThreadPoolUtil.getPool(5, 5, 100);
    private final org.slf4j.Logger logger = LoggerFactory.getLogger(ElasticLowerClient.class);
    private RestClient client;

    /**
     * Creates a client authenticated with username/password (HTTP basic auth).
     * <p>Accepts several comma-separated hosts. When {@code trustSelfSigned} is
     * enabled the SSL context accepts self-signed certificates; hostname
     * verification can additionally be skipped.
     *
     * @param hosts                comma-separated host list, e.g. "http://127.0.0.1:9200"
     * @param userName             ES username
     * @param passWord             ES password
     * @param trustSelfSigned      whether to trust self-signed certificates
     * @param hostnameVerification whether to verify the TLS hostname
     */
    public ElasticLowerClient(String hosts, String userName, String passWord, boolean trustSelfSigned, boolean hostnameVerification) {
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, passWord));  // ES account credentials
        String[] hostsAndPorts = hosts.split(",");
        HttpHost[] httpHosts = new HttpHost[hostsAndPorts.length];
        for (int i = 0; i < hostsAndPorts.length; i++) {
            String hostAndPort = hostsAndPorts[i].trim();
            if (!StringUtils.isEmpty(hostAndPort)) {
                httpHosts[i] = HttpHost.create(hostAndPort);
            }
        }
        client = RestClient.builder(httpHosts).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                httpClientBuilder.disableAuthCaching();
                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);

                // SSL configuration: trust self-signed certs and optionally skip hostname checks.
                if (trustSelfSigned && !hostnameVerification) {
                    httpClientBuilder.setSSLContext(SkipSslVerificationHttpRequestFactory.getSSLContext());
                    httpClientBuilder.setSSLHostnameVerifier(new SkipHostnameVerifier());
                } else if (trustSelfSigned) {
                    httpClientBuilder.setSSLContext(SkipSslVerificationHttpRequestFactory.getSSLContext());
                }

                return httpClientBuilder;
            }
        }).build();
    }

    /**
     * Creates a client that trusts the certificates contained in the given
     * keystore (TLS with a custom trust store).
     *
     * @param hosts        comma-separated host list
     * @param keyStorePass keystore password
     * @param sslFile      path to the keystore file
     * @param keyStoreName keystore type, e.g. "jks" or "pkcs12"
     */
    public ElasticLowerClient(String hosts, String keyStorePass, String sslFile, String keyStoreName) {

        try {
            Path keyStorePath = Paths.get(sslFile);
            KeyStore truststore = KeyStore.getInstance(keyStoreName);
            try (InputStream is = Files.newInputStream(keyStorePath)) {
                truststore.load(is, keyStorePass.toCharArray());
            }
            SSLContextBuilder sslBuilder = SSLContexts.custom().loadTrustMaterial(truststore, null);
            final SSLContext sslContext = sslBuilder.build();
            String[] hostsAndPorts = hosts.split(",");
            HttpHost[] httpHosts = new HttpHost[hostsAndPorts.length];
            for (int i = 0; i < hostsAndPorts.length; i++) {
                String hostAndPort = hostsAndPorts[i].trim();
                if (!StringUtils.isEmpty(hostAndPort)) {
                    httpHosts[i] = HttpHost.create(hostAndPort);
                }
            }
            client = RestClient.builder(httpHosts).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                @Override
                public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                    httpClientBuilder.disableAuthCaching();
                    return httpClientBuilder.setSSLContext(sslContext);
                }
            }).build();
        } catch (Exception e) {
            logger.error("ElasticSearch init fail!", e);
        }
    }

    /**
     * Returns the shared singleton, creating it on first use with double-checked
     * locking. The constructor arguments are only used on the first call.
     * <p>NOTE(review): double-checked locking is only safe when the
     * {@code instance} field is declared {@code volatile} — verify the field
     * declaration.
     *
     * @param hosts                comma-separated host list
     * @param userName             ES username
     * @param passWord             ES password
     * @param trustSelfSigned      whether to trust self-signed certificates
     * @param hostnameVerification whether to verify the TLS hostname
     * @return the shared ElasticLowerClient
     */
    public static ElasticLowerClient getInstance(String hosts, String userName, String passWord, boolean trustSelfSigned, boolean hostnameVerification) {
        if (instance == null) {
            synchronized (ElasticLowerClient.class) {
                if (instance == null) {
                    instance = new ElasticLowerClient(hosts, userName, passWord, trustSelfSigned, hostnameVerification);
                }
            }
        }
        return instance;
    }

    /**
     * Checks whether the given index exists by issuing a HEAD request against it.
     *
     * @param indice index name
     * @return true when ES answers 200; false otherwise (including on errors)
     */
    @Override
    public boolean existIndice(String indice) {
        boolean exists = false;
        try {
            Response response = client.performRequest(new Request("HEAD", "/" + indice));
            exists = response.getStatusLine().getStatusCode() == 200;
        } catch (Exception e) {
            logger.error("", e);
        }
        return exists;
    }

    /**
     * Reads the server root endpoint and returns the version string. For
     * OpenSearch the Lucene major version minus one is returned, so the rest of
     * the code can treat it like the matching ElasticSearch release.
     *
     * @return version string, or null when the request fails
     */
    @Override
    public String getVersion() {
        try {
            Response response = client.performRequest(new Request("GET", "/"));
            if (response.getStatusLine().getStatusCode() != 200) {
                logger.error("ElasticSearch GET Failure! {}", EntityUtils.toString(response.getEntity()));
                return null;
            }
            String body = EntityUtils.toString(response.getEntity(), "utf-8");
            JSONObject version = JSON.parseObject(body).getJSONObject("version");
            if (opensearch.equals(version.getString(distribution))) {
                int luceneMajor = Integer.parseInt(version.getString("lucene_version").split("\\.")[0]);
                return String.valueOf(luceneMajor - 1);
            }
            return version.getString("number");
        } catch (Exception e) {
            logger.error("", e);
        }
        return null;
    }

    /**
     * Creates a plumelog log index with the configured shard/replica/refresh
     * settings and the standard field mapping.
     * <p>max_result_window is taken from InitConfig.ES_MAX_RESULT, lifting ES's
     * default 10000-hit query cap for this index.
     *
     * @param indice index name
     * @param type   document type; empty for typeless (ES 7+) clusters
     * @return true when the index was created (HTTP 200)
     */
    @Override
    public boolean creatIndice(String indice, String type) {
        try {
            Request request = new Request(
                    "PUT",
                    "/" + indice + "");
            // Field mapping shared by all plumelog log indices.
            String properties = "\"properties\":{\"appName\":{\"type\":\"keyword\"}," +
                    "\"env\":{\"type\":\"keyword\"}," +
                    "\"appNameWithEnv\":{\"type\":\"keyword\"}," +
                    "\"logLevel\":{\"type\":\"keyword\"}," +
                    "\"serverName\":{\"type\":\"keyword\"}," +
                    "\"traceId\":{\"type\":\"keyword\"}," +
                    "\"dtTime\":{\"type\":\"date\",\"format\":\"strict_date_optional_time||epoch_millis\"}," +
                    "\"seq\":{\"type\":\"long\"}" +
                    "}";
            String ent = "{\"settings\":{\"number_of_shards\":" + InitConfig.ES_INDEX_SHARDS +
                    ",\"number_of_replicas\":" + InitConfig.ES_INDEX_REPLICAS +
                    ",\"refresh_interval\":\"" + InitConfig.ES_REFRESH_INTERVAL +
                    "\",\"max_result_window\":" + InitConfig.ES_MAX_RESULT + "}";
            // ES 7+ takes the mapping directly; older versions nest it under the type name.
            if (StringUtils.isEmpty(type)) {
                ent = ent + ",\"mappings\":{" + properties + "}}";
            } else {
                ent = ent + ",\"mappings\":{\"" + type + "\":{" + properties + "}}}";
            }

            request.setJsonEntity(ent);
            Response res = client.performRequest(request);
            if (res.getStatusLine().getStatusCode() == 200) {
                logger.info("create index {} success", indice);
                return true;
            } else {
                String responseStr = EntityUtils.toString(res.getEntity());
                logger.error("ElasticSearch PUT Failure! {}", responseStr);
            }
        } catch (Exception e) {
            logger.error("", e);
        }
        return false;
    }

    /**
     * (Re)applies the plumelog field mapping to an existing index.
     * <p>Bug fix: when {@code type} is empty (typeless ES 7+ mode) the request
     * body previously still wrapped the properties in a type key, producing
     * {@code {"null":{...}}}; the properties are now sent directly, consistent
     * with {@code creatIndice}.
     *
     * @param indice index name
     * @param type   document type; empty for typeless (ES 7+) clusters
     * @return true when ES answers 200
     */
    @Override
    public boolean setMapping(String indice, String type) {
        try {
            String url = "/" + indice + "/_mapping?pretty";
            if (!StringUtils.isEmpty(type)) {
                url = "/" + indice + "/" + type + "/_mapping?pretty";
            }
            Request request = new Request(
                    "PUT", url
            );
            String properties = "\"properties\":{\"appName\":{\"type\":\"keyword\"}," +
                    "\"env\":{\"type\":\"keyword\"}," +
                    "\"appNameWithEnv\":{\"type\":\"keyword\"}," +
                    "\"logLevel\":{\"type\":\"keyword\"}," +
                    "\"serverName\":{\"type\":\"keyword\"}," +
                    "\"traceId\":{\"type\":\"keyword\"}," +
                    "\"dtTime\":{\"type\":\"date\",\"format\":\"strict_date_optional_time||epoch_millis\"}," +
                    "\"seq\":{\"type\":\"long\"}" +
                    "}";
            // Typeless body for ES 7+, typed body for older clusters.
            String ent;
            if (StringUtils.isEmpty(type)) {
                ent = "{" + properties + "}";
            } else {
                ent = "{\"" + type + "\":{" + properties + "}}";
            }
            request.setJsonEntity(ent);
            Response res = client.performRequest(request);
            if (res.getStatusLine().getStatusCode() == 200) {
                logger.info("reset index  {} mapping success", indice);
                return true;
            } else {
                String responseStr = EntityUtils.toString(res.getEntity());
                logger.error("ElasticSearch PUT Failure! {}", responseStr);
            }
        } catch (Exception e) {
            logger.error("", e);
        }
        return false;
    }

    /**
     * Creates a trace index with the configured shard/replica/refresh settings
     * and the trace field mapping (a subset of the log mapping).
     * <p>max_result_window is taken from InitConfig.ES_MAX_RESULT, lifting ES's
     * default 10000-hit query cap for this index.
     *
     * @param indice index name
     * @param type   document type; empty for typeless (ES 7+) clusters
     * @return true when the index was created (HTTP 200)
     */
    @Override
    public boolean creatIndiceTrace(String indice, String type) {
        try {
            Request request = new Request(
                    "PUT",
                    "/" + indice + "");
            String properties = "\"properties\":{\"appName\":{\"type\":\"keyword\"}," +
                    "\"env\":{\"type\":\"keyword\"}," +
                    "\"appNameWithEnv\":{\"type\":\"keyword\"}," +
                    "\"traceId\":{\"type\":\"keyword\"}" +
                    "}";
            String ent = "{\"settings\":{\"number_of_shards\":" + InitConfig.ES_INDEX_SHARDS +
                    ",\"number_of_replicas\":" + InitConfig.ES_INDEX_REPLICAS +
                    ",\"refresh_interval\":\"" + InitConfig.ES_REFRESH_INTERVAL +
                    "\",\"max_result_window\":" + InitConfig.ES_MAX_RESULT + "}";
            // ES 7+ takes the mapping directly; older versions nest it under the type name.
            if (StringUtils.isEmpty(type)) {
                ent = ent + ",\"mappings\":{" + properties + "}}";
            } else {
                ent = ent + ",\"mappings\":{\"" + type + "\":{" + properties + "}}}";
            }
            request.setJsonEntity(ent);
            Response res = client.performRequest(request);
            if (res.getStatusLine().getStatusCode() == 200) {
                logger.info("create index {} success", indice);
                return true;
            } else {
                String responseStr = EntityUtils.toString(res.getEntity());
                logger.error("ElasticSearch PUT Failure! {}", responseStr);
            }
        } catch (Exception e) {
            logger.error("", e);
        }
        return false;
    }

    /**
     * Creates a generic (non-plumelog) index with fixed default settings:
     * 5 shards, 0 replicas, 10s refresh, and a very large max_result_window.
     *
     * @param indice index name
     * @param type   unused; kept for interface compatibility
     * @return true when ES answers 200
     */
    @Override
    public boolean creatIndiceNomal(String indice, String type) {
        try {
            Request request = new Request("PUT", "/" + indice);
            request.setJsonEntity("{\"settings\":{\"number_of_shards\":5,\"number_of_replicas\":0,\"refresh_interval\":\"10s\"," +
                    "\"max_result_window\":2000000000}}");
            Response response = client.performRequest(request);
            int status = response.getStatusLine().getStatusCode();
            if (status == 200) {
                return true;
            }
            logger.error("ElasticSearch PUT Failure! {}", EntityUtils.toString(response.getEntity()));
        } catch (Exception e) {
            logger.error("", e);
        }
        return false;
    }

    /**
     * Raises the cluster-wide max_shards_per_node limit via the cluster
     * settings API (persistent setting).
     *
     * @param shardCount new persistent max_shards_per_node value
     * @return true when ES answers 200
     */
    @Override
    public boolean addShards(Long shardCount) {
        try {
            Request request = new Request("PUT", "/_cluster/settings");
            request.setJsonEntity("{\"persistent\":{\"cluster\":{\"max_shards_per_node\":" + shardCount + "}}}");
            Response response = client.performRequest(request);
            int status = response.getStatusLine().getStatusCode();
            if (status == 200) {
                return true;
            }
            logger.error("ElasticSearch PUT Failure! {}", EntityUtils.toString(response.getEntity()));
        } catch (Exception e) {
            logger.error("", e);
        }
        return false;
    }

    /**
     * Bulk-inserts log documents, creating the target index on demand:
     * plumelog-prefixed indices get the full mapping, others the generic settings.
     *
     * @param list      JSON documents to index
     * @param baseIndex target index
     * @param type      document type; empty for typeless (ES 7+) clusters
     * @throws IOException on request-building failures
     */
    @Override
    public void insertListLog(List<String> list, String baseIndex, String type) throws IOException {
        if (!existIndice(baseIndex)) {
            boolean plumelogIndex = baseIndex.startsWith(LogMessageConstant.ES_INDEX);
            if (plumelogIndex) {
                creatIndice(baseIndex, type);
            } else {
                creatIndiceNomal(baseIndex, type);
            }
        }
        insertListV1(list, baseIndex, type);
    }

    /**
     * Bulk-inserts trace documents; the index is assumed to already exist.
     */
    @Override
    public void insertListTrace(List<String> list, String baseIndex, String type) throws IOException {
        insertListV1(list, baseIndex, type);
    }

    /**
     * Bulk-inserts generic documents; the index is assumed to already exist.
     */
    @Override
    public void insertListComm(List<String> list, String baseIndex, String type) throws IOException {
        insertListV1(list, baseIndex, type);
    }

    /**
     * Bulk-inserts the given JSON documents via the REST client's asynchronous
     * callback API. The public insert methods currently route through
     * {@link #insertListV1} instead; this variant is kept for reference.
     * <p>Idiom fixes: StringBuffer replaced with StringBuilder (the buffer is
     * method-local, so synchronization was wasted) and the pointless
     * {@code list = null} parameter reassignment removed.
     *
     * @param list      JSON documents to index (one bulk source line each)
     * @param baseIndex target index
     * @param type      document type; empty for typeless (ES 7+) clusters
     * @throws IOException declared for signature compatibility
     */
    private void insertList(List<String> list, String baseIndex, String type) throws IOException {
        StringBuilder sendStr = new StringBuilder();
        for (String doc : list) {
            // Bulk action metadata line; the empty "index" object lets ES assign ids.
            sendStr.append("{\"index\":{} ");
            sendStr.append("\r\n");
            sendStr.append(doc);
            sendStr.append("\r\n");
        }
        String endpoint;
        if (StringUtils.isEmpty(type)) {
            endpoint = "/" + baseIndex + "/_bulk";
        } else {
            endpoint = "/" + baseIndex + "/" + type + "/_bulk";
        }
        Request request = new Request(
                "PUT",
                endpoint);
        request.setJsonEntity(sendStr.toString());
        client.performRequestAsync(request, new ResponseListener() {
            @Override
            public void onSuccess(Response response) {
                // Release the (potentially large) request body as soon as possible.
                request.setEntity(null);
                try {
                    if (response.getStatusLine().getStatusCode() == 200) {
                        logger.info("ElasticSearch commit! success");
                    } else {
                        String responseStr = EntityUtils.toString(response.getEntity());
                        logger.error("ElasticSearch commit Failure! {}", responseStr);
                    }
                } catch (IOException e) {
                    logger.error("ElasticSearch commit Failure!", e);
                }
            }

            @Override
            public void onFailure(Exception e) {
                logger.error("ElasticSearch commit Failure!", e);
            }
        });
    }

    /**
     * Bulk-inserts the given JSON documents on the shared thread pool, logging
     * the commit latency. This is the active insert path for all public insert
     * methods.
     * <p>Fixes: StringBuffer replaced with StringBuilder (method-local buffer,
     * no synchronization needed), {@code e.printStackTrace()} replaced with the
     * logger so failures show up in normal log output, and the pointless
     * {@code list = null} parameter reassignment removed.
     *
     * @param list      JSON documents to index (one bulk source line each)
     * @param baseIndex target index
     * @param type      document type; empty for typeless (ES 7+) clusters
     * @throws IOException declared for signature compatibility
     */
    private void insertListV1(List<String> list, String baseIndex, String type) throws IOException {
        StringBuilder sendStr = new StringBuilder();
        for (String doc : list) {
            // Bulk action metadata line; the empty "index" object lets ES assign ids.
            sendStr.append("{\"index\":{} ");
            sendStr.append("\r\n");
            sendStr.append(doc);
            sendStr.append("\r\n");
        }
        String endpoint;
        if (StringUtils.isEmpty(type)) {
            endpoint = "/" + baseIndex + "/_bulk";
        } else {
            endpoint = "/" + baseIndex + "/" + type + "/_bulk";
        }
        final Request request = new Request(
                "PUT",
                endpoint);
        request.setJsonEntity(sendStr.toString());
        threadPoolExecutor.execute(() -> {
            try {
                long startTime = System.currentTimeMillis();
                Response response = client.performRequest(request);
                long endTime = System.currentTimeMillis();
                // Release the (potentially large) request body as soon as possible.
                request.setEntity(null);
                if (response.getStatusLine().getStatusCode() == 200) {
                    logger.info("ElasticSearch commit! success,日志提交ES耗时:{}", endTime - startTime);
                } else {
                    String responseStr = EntityUtils.toString(response.getEntity());
                    logger.error("ElasticSearch commit Failure! {},日志提交ES耗时:{}", responseStr, endTime - startTime);
                }
            } catch (IOException e) {
                // Was e.printStackTrace(): route failures through the logger.
                logger.error("ElasticSearch commit Failure!", e);
            }
        });
    }

    /**
     * Runs {@code GET /_cat/indices/<index>?v} and converts the tabular output
     * into a JSON array of header-keyed maps.
     * <p>Fixes: the previous InputStream.available()/single read() could
     * truncate the response body (use EntityUtils.toString instead); the reader
     * is now closed via try-with-resources; rows with fewer columns than the
     * header no longer throw ArrayIndexOutOfBoundsException; printStackTrace
     * replaced with the logger.
     *
     * @param index index name or pattern
     * @return JSON array string of rows, or "" on failure
     */
    @Override
    public String cat(String index) {
        String reStr = "";
        Request request = new Request(
                "GET",
                "/_cat/indices/" + index + "?v");
        try {
            Response res = client.performRequest(request);
            // Read the full body; available()+read() is not guaranteed to do so.
            reStr = EntityUtils.toString(res.getEntity(), StandardCharsets.UTF_8);
        } catch (Exception e) {
            logger.error("", e);
            reStr = "";
        }

        try (BufferedReader br = new BufferedReader(new InputStreamReader(
                new ByteArrayInputStream(reStr.getBytes(StandardCharsets.UTF_8)), StandardCharsets.UTF_8))) {
            List<String> rows = new ArrayList<>();
            String line;
            // The first empty/null line terminates the table.
            while ((line = br.readLine()) != null && !StringUtils.isEmpty(line)) {
                rows.add(line);
            }
            List<Map<String, String>> listMap = new ArrayList<>();
            if (!rows.isEmpty()) {
                // First row is the column header (because of ?v).
                String[] title = rows.get(0).split("\\s+");
                for (int i = 1; i < rows.size(); i++) {
                    String[] values = rows.get(i).split("\\s+");
                    Map<String, String> map = new HashMap<>();
                    // Guard against rows with fewer columns than the header.
                    for (int j = 0; j < title.length && j < values.length; j++) {
                        map.put(title[j], values[j]);
                    }
                    listMap.add(map);
                }
            }
            return GfJsonUtil.toJSONString(listMap);
        } catch (IOException e) {
            logger.error("", e);
        }
        return "";
    }

    @Override
    public String get(String url, String queryStr) throws Exception {
        // Sends a GET-with-body search to the given ES URL and returns the
        // raw JSON response. Throws if the HTTP round trip itself fails.
        StringEntity stringEntity = new StringEntity(queryStr, "utf-8");
        stringEntity.setContentType("application/json");
        Request request = new Request(
                "GET",
                url);
        request.setEntity(stringEntity);
        Response res = client.performRequest(request);
        // Read the entity exactly once. The original code consumed it in the
        // error branch and then again at return; that only works if the
        // entity happens to be repeatable.
        String responseStr = EntityUtils.toString(res.getEntity(), "utf-8");
        if (res.getStatusLine().getStatusCode() == 200) {
            logger.info("ElasticSearch query success!");
        } else {
            logger.error("ElasticSearch query Failure! {}", responseStr);
        }
        return responseStr;
    }

    @Override
    public String get(String indexStr, String queryStr, String from, String size) throws Exception {
        // Paged search against one or more indices. track_total_hits=true
        // lifts ES 7's default 10000 cap on hits.total so the UI can report
        // and page past 10k results (the point of this modification).
        String url = "/" + indexStr + "/_search?track_total_hits=true&from=" + from + "&size=" + size;
        StringEntity stringEntity = new StringEntity(queryStr, "utf-8");
        stringEntity.setContentType("application/json");
        Request request = new Request(
                "GET",
                url);
        request.setEntity(stringEntity);
        Response res = client.performRequest(request);
        // Consume the response body once and reuse it for both logging and
        // the return value (avoids a second read of the entity).
        String responseStr = EntityUtils.toString(res.getEntity(), "utf-8");
        if (res.getStatusLine().getStatusCode() == 200) {
            logger.info("ElasticSearch query success!");
        } else {
            logger.error("ElasticSearch query Failure! {}", responseStr);
        }
        return responseStr;
    }

    @Override
    public String group(String indexStr, String queryStr) throws Exception {
        // Runs an aggregation-style search (no paging parameters) against the
        // given indices and returns the raw JSON response.
        String url = "/" + indexStr + "/_search";
        StringEntity stringEntity = new StringEntity(queryStr, "utf-8");
        stringEntity.setContentType("application/json");
        Request request = new Request(
                "GET",
                url);
        request.setEntity(stringEntity);
        Response res = client.performRequest(request);
        // Consume the entity once; re-reading it at return relied on the
        // entity being repeatable.
        String responseStr = EntityUtils.toString(res.getEntity(), "utf-8");
        if (res.getStatusLine().getStatusCode() == 200) {
            logger.info("ElasticSearch query success!");
        } else {
            logger.error("ElasticSearch query Failure! {}", responseStr);
        }
        return responseStr;
    }

    @Override
    public List<String> getExistIndices(String[] indices) {
        // Filters the given index names down to those that exist in ES,
        // using one lightweight HEAD request per index (200 = exists).
        // An index whose probe fails is simply treated as absent.
        List<String> existIndexList = new ArrayList<String>();
        for (String index : indices) {
            try {
                Request request = new Request(
                        "HEAD",
                        "/" + index);
                Response res = client.performRequest(request);
                if (res.getStatusLine().getStatusCode() == 200) {
                    existIndexList.add(index);
                }
            } catch (Exception e) {
                // Log via slf4j instead of printStackTrace so failures land
                // in the application log.
                logger.error("check index exists failed, index: {}", index, e);
            }
        }
        return existIndexList;
    }

    @Override
    public boolean deleteIndex(String index) {
        // Deletes one index; returns true only on an explicit 200 response.
        // A 404 ResponseException (index already gone) is deliberately not
        // logged as an error, matching the original behavior.
        try {
            Request request = new Request(
                    "DELETE",
                    "/" + index);
            Response res = client.performRequest(request);
            if (res.getStatusLine().getStatusCode() == 200) {
                return true;
            }
        } catch (ResponseException e) {
            if (e.getResponse().getStatusLine().getStatusCode() != 404) {
                // Log via slf4j instead of printStackTrace.
                logger.error("delete index failed, index: {}", index, e);
                return false;
            }
        } catch (Exception e) {
            logger.error("delete index failed, index: {}", index, e);
            return false;
        }
        return false;
    }

    @Override
    public void close() {
        // Releases the underlying low-level REST client and its connections.
        try {
            client.close();
        } catch (IOException e) {
            // The original logged with an empty message; include context so
            // the failure is identifiable in the log.
            logger.error("close elasticsearch client failed", e);
        }
    }

}

InitConfig.java 修改
package com.plumelog.server.config;


public class InitConfig {
    // Collector start modes supported by plumelog-server.
    public final static String KAFKA_MODE_NAME = "kafka";
    public final static String REDIS_MODE_NAME = "redis";
    public final static String REST_MODE_NAME = "rest";
    public final static String LITE_MODE_NAME = "lite";
    // Local log storage path used when running in lite mode.
    public static String LITE_MODE_LOG_PATH = ".";
    public final static String REDIS_CLUSTER_MODE_NAME = "redisCluster";
    public final static String REDIS_SENTINEL_MODE_NAME = "redisSentinel";
    // Milliseconds in one day (86,400,000 — fits in int before widening).
    public static final long MILLS_ONE_DAY = 24 * 60 * 60 * 1000;
    // Maximum number of log entries sent per batch.
    public static int MAX_SEND_SIZE = 5000;
    // Interval (ms) between log fetch rounds.
    public static int MAX_INTERVAL = 100;
    // Kafka consumer group name.
    public static String KAFKA_GROUP_NAME = "logConsumer";
    // Active start mode — one of the *_MODE_NAME constants above.
    // (Original comment was a copy-paste of the Kafka one.)
    public static String START_MODEL = "redis";
    // ES index creation settings: shards, replicas, refresh interval.
    public static int ES_INDEX_SHARDS = 5;
    public static int ES_INDEX_REPLICAS = 0;
    public static String ES_REFRESH_INTERVAL = "10s";
    // Index rollover granularity (default "day") and its time zone.
    public static String ES_INDEX_MODEL = "day";
    public static String ES_INDEX_ZONE_ID = "GMT+8";
    // Credentials/URL — presumably for rest mode; confirm against usage.
    public static String restUserName = "";
    public static String restPassWord = "";
    public static String restUrl = "";
    // Web console login credentials.
    public static String loginUsername = "";
    public static String loginPassword = "";
    // Retention in days for normal / trace logs (0 = keep forever — TODO confirm).
    public static int keepDays = 0;
    public static int traceKeepDays = 0;
    public static Long maxShards = 100000L;
    // Default for the index's max_result_window setting (200,000,000),
    // overridable via the plumelog.es.maxResult property; this is the knob
    // that lifts the 10000-result query limit described in this article.
    public static Long ES_MAX_RESULT = 200000000L;
    public static int esVersion = 7;
    public final static String WEB_CONSOLE_KEY="plumelog:web_console";
    public final static String WEB_CONSOLE_CHANNEL="plumelog_webConsole_Channel";
}

application.properties配置文件中添加:

plumelog.es.maxResult=200000000

前端 app.js 中的查询也要做相应修改:在查询 URL 上同样加上 track_total_hits=true 参数,完整文件如下。
 

var express = require('express');
var config = require("./src/config.json")
var app = express();
var bodyParser = require('body-parser');
app.use(bodyParser.urlencoded({extended : false}));
var superagent = require('superagent');
const path = require('path');

// CORS middleware: allow any origin and short-circuit OPTIONS preflights.
app.all('*', function(req, res, next) {
    res.header("Access-Control-Allow-Origin", "*");
    res.header("Access-Control-Allow-Headers", "Content-Type,Content-Length, Authorization, Accept,X-Requested-With");
    res.header("Access-Control-Allow-Methods","PUT,POST,GET,DELETE,OPTIONS");
    res.header("X-Powered-By",' 3.2.1')
    // res.send(200) is deprecated in Express 4+ (the number is treated as a
    // body/status ambiguity); sendStatus(200) sets the status code properly.
    if(req.method=="OPTIONS") res.sendStatus(200);/* let OPTIONS preflight requests return quickly */
    else  next();
});

/**
 * Checks which of the given date-based index names actually exist in ES.
 * Issues one HEAD request per index and resolves with the subset of names
 * that responded successfully (a probe failure simply marks that index
 * as missing — the promise never rejects).
 */
function checkExistsIndex(dateList) {
    const probes = dateList.map(function (date) {
        return new Promise(function (resolve) {
            superagent.head(config.es + date).then(
                function () { resolve(true); },
                function () { resolve(false); }
            );
        });
    });

    return Promise.all(probes).then(function (flags) {
        const existDateList = [];
        flags.forEach(function (exists, idx) {
            if (exists) {
                existDateList.push(dateList[idx]);
            }
        });
        return existDateList;
    });
}

/**
 * Rebuilds a call tree from a flat, time-ordered trace log list.
 * An entry with position '<' opens a call (one tree node at the current
 * depth); an entry with position '>' closes the first still-open node at
 * the depth it returns to. Returns the list of root nodes; each node has
 * method, appName, start_time, end_time, zIndex (depth) and children.
 */
function formartTrace(list) {

  // TODO: verify the data is balanced (equal counts of '<' and '>').

  let depth = 0;
  const roots = [];

  // Walks down `depth` levels, always following the last child at each
  // level, then either appends a new open node (opening === true) or
  // stamps end_time on the first unfinished node at that level.
  function applyItem(item, opening) {
    let siblings = roots;
    for (let level = 0; level < depth; level++) {
      siblings = siblings[siblings.length - 1].children;
    }

    if (opening) {
      siblings.push({
        method: item.method,
        appName: item.appName,
        start_time: item.time,
        zIndex: depth,
        children: []
      });
      return;
    }

    for (let k = 0; k < siblings.length; k++) {
      if (!siblings[k].end_time) {
        siblings[k].end_time = item.time;
        break;
      }
    }
  }

  list.forEach(function (item) {
    if (item['position'] == '<') {
      // '<' marks the start of a child call of the previous method.
      applyItem(item, true);
      depth++;
    } else if (item['position'] == '>') {
      depth--;
      applyItem(item, false);
    }
  });

  return roots;
}

// Proxy endpoint: forwards the raw ES query DSL in the request body to the
// indices listed in ?index=..., adding paging and track_total_hits=true.
app.post('/getInfo', function (req, res) {

    // Collect the raw request body (forwarded to ES verbatim).
    var str = '';
    req.on("data", function (dt) {
        str += dt;
    });
    // Fix: wait for the body stream to finish. The original code fired the
    // ES query as soon as the index-existence HEAD checks resolved, racing
    // the body stream — a slow client could forward an empty query.
    var bodyReady = new Promise(function (resolve) {
        req.on("end", resolve);
    });

    Promise.all([checkExistsIndex(req.query.index.split(',')), bodyReady]).then(function (results) {
        var existIndex = results[0];

        if (existIndex.length == 0) {
            res.send({
                hits: {
                    hits: [],
                    total: 0
                }
            });
            return;
        }

        // track_total_hits=true lifts ES 7's default 10000 cap on hits.total.
        let url = config.es + existIndex + '/_search?track_total_hits=true&from=' + (req.query.from || 0) + '&size=' + (req.query.size || 30);
        superagent
            .post(url)
            .set('Accept', 'application/json')
            .send(str)
            .timeout(10000)
            .end(function (err, response) {
                if (err) {
                    res.send({
                        hits: {
                            hits: [],
                            total: 0
                        }
                    });
                } else {
                    res.send(response.text);
                }
            });
    });
});

// Trace endpoint: fetches up to 500 log entries for one traceId, sorted by
// time/positionNum, and returns them as a call tree via formartTrace.
app.get('/getTrace', function (req, res) {

  if (!req.query.traceId) {
    res.send({});
    return;
  }

  let filter = {
    "query": {
      "bool": {
        "must": [{
          "match": {
            "traceId": {
              "query": req.query.traceId
            }
          }
        }]
      }
    },
    "sort": [{
      "time":"asc",
      "positionNum": "asc"
    }]
  };

  let url = config.es + '/_search?size=500';
  superagent
    .post(url)
    .set('Accept', 'application/json')
    .send(JSON.stringify(filter))
    .timeout(20000)
    .end(function (err, response) {
      if (err) {
        res.send({});
        return;
      }

      let hits = [];
      try {
        var result = JSON.parse(response.text);
        result.hits.hits.map(hit => {
          hits.push(hit._source);
        });
      } catch (e) {
        console.log('get hits error,' + e.message);
        res.send({});
        // Fix: the original fell through after this res.send and reached
        // the send below, triggering "Cannot set headers after they are
        // sent". Return immediately instead.
        return;
      }

      if (hits.length > 0) {
        res.send(formartTrace(hits));
      } else {
        res.send({});
      }
    });
});


// Serve the compiled front-end bundle from ./dist; the SPA entry page is
// returned for the root path. Registration order matters in Express, so the
// static middleware stays ahead of the catch-all root route.
app.use('/', express.static('dist'));

app.get('/', function (req, res) {
  res.sendFile(path.join(__dirname, 'dist/index.html'));
});

// Start listening on the port from src/config.json.
app.listen(config.port, function () {
  console.log('Server listening on port '+config.port+'!');
});

到这里结束了,大家试试吧!

下一篇文章,咱们讲讲 多个系统之间调用这个日志系统框架,traceId只存在单个服务的问题。

 类似资料: