filebeat-5.6.1 + logstash-5.6.2 + elasticsearch-2.4 + spring-boot report generation

鲁涵映
2023-12-01

Log format
[INFO ] 2017-10-14 19:16:31 --> MyLog:/user/heart|{"appversion":"1.76","data":"{\"A\":\"\"}","deviceModel":"amodel",
"deviceVsn":"AiUI 1.0-1788 for amodel-dev","iP":"192.168.1.1","kernelVsn":
"3.10.0\sdfsdfsdsdfdsf@12333 #772\nTue Aug 9 14:36:31 CST 2016","operate":2,
"providerSoftVsn":"111111_ANDROID4.4.4-SDK_V1.0.0","usbInfoList":[],"uuid":"d7a9ae37-ddd123bb63-e1b49ed86f91","windowId":"26"}|1

filebeat configuration [filebeat.yml]
###################### Filebeat Configuration Example #########################

# This file is an example configuration file highlighting only the most common
# options. The filebeat.full.yml file from the same directory contains all the
# supported options with more comments. You can use it as a reference.
#
# You can find the full configuration reference here:
# https://www.elastic.co/guide/en/beats/filebeat/index.html

#=========================== Filebeat prospectors =============================

filebeat.prospectors:

# Each - is a prospector. Most options can be set at the prospector level, so
# you can use different prospectors for various configurations.
# Below are the prospector specific configurations.

- input_type: log

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /usr/local/sbin/software/xxx/logs/web/info*.log
    #- c:\programdata\elasticsearch\logs\*

  # Exclude lines. A list of regular expressions to match. It drops the lines that are
  # matching any regular expression from the list.
  exclude_lines: ["(MyLog:?getChannlesUrl)|(MyLog:?list)|(MyLog:?uploadlog)"]

  # Include lines. A list of regular expressions to match. It exports the lines that are
  # matching any regular expression from the list.
  #include_lines: ["^ERR", "^WARN"]

  # Exclude files. A list of regular expressions to match. Filebeat drops the files that
  # are matching any regular expression from the list. By default, no files are dropped.
  #exclude_files: [".gz$"]

  # Optional additional fields. These fields can be freely picked
  # to add additional information to the crawled log files for filtering
  #fields:
  #  level: debug
  #  review: 1

  ### Multiline options

  # Multiline can be used for log messages spanning multiple lines. This is common
  # for Java Stack Traces or C-Line Continuation

  # The regexp Pattern that has to be matched. The example pattern matches all lines starting with [
  #multiline.pattern: ^\[

  # Defines if the pattern set under pattern should be negated or not. Default is false.
  #multiline.negate: false

  # Match can be set to "after" or "before". It is used to define if lines should be appended to a pattern
  # that was (not) matched before or after, or as long as a pattern is not matched based on negate.
  # Note: After is the equivalent to previous and before is the equivalent to next in Logstash
  #multiline.match: after


#================================ General =====================================

# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.
#name:

# The tags of the shipper are included in their own field with each
# transaction published.
#tags: ["service-X", "web-tier"]

# Optional fields that you can specify to add additional information to the
# output.
fields:
  log_type: mylog

#================================ Outputs =====================================

# Configure what outputs to use when sending the data collected by the beat.
# Multiple outputs may be used.

#-------------------------- Elasticsearch output ------------------------------
#output.elasticsearch:
  # Array of hosts to connect to.
  # hosts: ["localhost:9200"]

  # Optional protocol and basic auth credentials.
  #protocol: "https"
  #username: "elastic"
  #password: "changeme"

#----------------------------- Logstash output --------------------------------
output.logstash:
  # The Logstash hosts
  hosts: ["192.168.30.248:15144"]

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Certificate for SSL client authentication
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Client Certificate Key
  #ssl.key: "/etc/pki/client/cert.key"

#================================ Logging =====================================

# Sets log level. The default log level is info.
# Available log levels are: critical, error, warning, info, debug
#logging.level: debug

# At debug level, you can selectively enable logging only for some components.
# To enable all selectors use ["*"]. Examples of other selectors are "beat",
# "publish", "service".
#logging.selectors: ["*"]

logstash configuration [./etc/mylogstash.conf]
input {
    beats {
        port => 15144
    }
}

filter {
    if [fields][log_type] == "mylog" {
        grok {
            patterns_dir => "/lunzn/software/report/my-report/logstash-5.6.2/etc/operate-pattern"
            match => { "message" => '\[INFO \] %{STR:logdate} --> MyLog:?%{STR:interfaceName}\|%{STR:datajson}\|' }
        }
        json {
            source => "datajson"
            #target => "doc"
            remove_field => ["datajson","message","source","@version","input_type","beat","host"]
        }
        json {
            source => "data"
            #target => "doc"
            remove_field => ["data"]
        }
        ruby {
            code => "event.set('datatype', event.get('type')); event.set('type', 'log')"
        }
    }
    # Shift @timestamp to local time (UTC+8) via a temporary "timestamp" field, then drop the temporary field
    ruby {
        code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60); event.set('@timestamp', event.get('timestamp'))"
        remove_field => ["timestamp"]
    }
}

output {
    if [fields][log_type] == "mylog" {
        elasticsearch {
            hosts => ["192.168.30.248:9200"]
            index => "my-log"
            manage_template => true
            flush_size => 50000
            idle_flush_time => 10
        }
    }
    if [fields][log_type] == "operate" {
        elasticsearch {
            hosts => ["192.168.30.248:9200"]
            index => "my-operate-log"
            manage_template => true
            flush_size => 50000
            idle_flush_time => 10
        }
    }
}

logstash pattern file [./etc/operate-pattern] (defines the custom STR grok pattern used by the match above: a lazy any-character capture)
STR (.*?)

Startup commands
>> start elasticsearch:
# cd into the elasticsearch bin directory
./elasticsearch -d

>> start logstash:
# cd into the logstash directory
# start logstash in the background
nohup ./bin/logstash -f ./etc/mylogstash.conf &

>> start filebeat:
# cd into the filebeat directory
nohup ./filebeat -e -c filebeat.yml &

spring-boot [pom.xml]
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>cn.my.report</groupId>
	<artifactId>cn.my.report</artifactId>
	<packaging>war</packaging>
	<version>1.0</version>
	<name>cn.my.report</name>

	<properties>
		<spring.boot.version>1.5.7.RELEASE</spring.boot.version>
	</properties>

	<dependencyManagement>
		<dependencies>
			<!-- spring-boot BOM: "import" scope only takes effect inside dependencyManagement -->
			<dependency>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-dependencies</artifactId>
				<version>${spring.boot.version}</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>

	<dependencies>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
			<version>${spring.boot.version}</version>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
			<version>${spring.boot.version}</version>
		</dependency>

		<!-- mail -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-mail</artifactId>
			<version>${spring.boot.version}</version>
		</dependency>

		<!-- mybatis spring-boot -->
		<dependency>
			<groupId>org.mybatis.spring.boot</groupId>
			<artifactId>mybatis-spring-boot-starter</artifactId>
			<version>1.1.1</version>
		</dependency>

		<!-- mysql database driver -->
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>5.1.36</version>
		</dependency>

		<dependency>
			<groupId>org.elasticsearch</groupId>
			<artifactId>elasticsearch</artifactId>
			<version>2.4.3</version>
		</dependency>

		<!-- mongodb -->
		<!-- <dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-mongodb</artifactId>
			<version>${spring.boot.version}</version>
		</dependency> -->

		<!-- <dependency> <groupId>org.elasticsearch.plugin</groupId> <artifactId>delete-by-query</artifactId> 
			<version>2.4.3</version> </dependency> -->

		<dependency>
			<groupId>javax.servlet</groupId>
			<artifactId>javax.servlet-api</artifactId>
			<version>3.1.0</version>
			<scope>provided</scope>
		</dependency>

		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.2.7</version>
		</dependency>

		<dependency>
			<groupId>org.apache.poi</groupId>
			<artifactId>poi</artifactId>
			<version>3.17</version>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-war-plugin</artifactId>
				<version>2.6</version>
				<configuration>
					<warName>myreport</warName>
					<failOnMissingWebXml>false</failOnMissingWebXml>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>
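
The article never shows the application entry point. Since the pom packages a war, the entry class would typically extend SpringBootServletInitializer; a minimal sketch (the class name ReportApplication is an assumption, not taken from the original project):

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.support.SpringBootServletInitializer;

@SpringBootApplication
public class ReportApplication extends SpringBootServletInitializer
{
    // Needed so the war also boots when deployed to an external servlet container.
    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder builder)
    {
        return builder.sources(ReportApplication.class);
    }

    public static void main(String[] args)
    {
        SpringApplication.run(ReportApplication.class, args);
    }
}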

spring-boot [application.properties]
server.port = 15050
server.context-path = /myreport
# ES
spring.data.elasticsearch.repositories.enabled = true
spring.data.elasticsearch.cluster-nodes = 192.168.30.248:9300
# 1800 = 30m   28800 = 8h
server.session-timeout = 1800
# server.error.path=/error
#spring.devtools.restart.enabled = false
#spring.devtools.restart.exclude = portal/**,theme/**
#spring.devtools.restart.additional-exclude
# xiaoyu database
spring.datasource.url=jdbc:mysql://192.168.30.253:3306/xxx?characterEncoding=utf-8&autoReconnect=true&failOverReadOnly=false&allowMultiQueries=true&useSSL=true
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
mybatis.mapper-locations=classpath*:mybatis/*.xml
#mybatis.type-aliases-package=org.springboot.sample.entity
# email
spring.mail.host=smtp.exmail.qq.com
spring.mail.username=my_report@xxx.com
#POP3/SMTP service authorization code
spring.mail.password=123456
spring.mail.default-encoding=UTF-8
#mongodb
#spring.data.mongodb.host=192.168.30.253
#spring.data.mongodb.port=27017
#spring.data.mongodb.database=my_report
#spring.http.encoding.charset=utf8
# request content types: application/json, application/xml, text/html
#spring.freemarker.content-type=application/json;charset=UTF-8
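
With spring-boot-starter-mail in the pom and the spring.mail.* settings above, the generated report can be mailed out. A minimal sketch, assuming a hypothetical ReportMailService (class name, method name, and recipient are illustrative):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.stereotype.Service;

@Service
public class ReportMailService
{
    // Auto-configured by Spring Boot from the spring.mail.* properties above.
    @Autowired
    private JavaMailSender mailSender;

    public void sendDailyReport(String to, String reportBody)
    {
        SimpleMailMessage msg = new SimpleMailMessage();
        msg.setFrom("my_report@xxx.com"); // must match spring.mail.username
        msg.setTo(to);
        msg.setSubject("daily report");
        msg.setText(reportBody);
        mailSender.send(msg);
    }
}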


elasticsearch configuration
## Create the index (roughly: create the "database")
curl -XPOST "http://192.168.30.248:9200/my-log"

## Raise the maximum number of hits a query may return
curl -XPUT http://192.168.30.248:9200/my-log/_settings/ -d '{ "index" : { "max_result_window" : 2147483647}}' 

## Create the type mapping (roughly: create the "table"). "index" : "not_analyzed" disables tokenization.
## Without it, a value containing special characters, e.g. "aaa-bbb-ccc", is split into the terms
## "aaa", "bbb" and "ccc", so a search for the whole string "aaa-bbb-ccc" finds nothing.
## Alternatively the query code can match the field as an exact, untokenized phrase:
## QueryBuilders.matchPhraseQuery("uuid", "xxxxxxx-xxxx-xxxxx").slop(0)
curl -XPOST "http://192.168.30.248:9200/my-log/user/_mapping?pretty" -d'
{
	"user" : {
		"properties" : {
			"uuid" : {
				"type" : "string",
				"index" : "not_analyzed"
			},
			"company" : {
				"type" : "string",
				"index" : "not_analyzed"
			},
			"coversion" : {
				"type" : "string",
				"index" : "not_analyzed"
			},
			"appversion" : {
				"type" : "string",
				"index" : "not_analyzed"
			},
			"logdate" : {
				"format" : "yyyy-MM-dd HH:mm:ss",
				"type" : "date"
			},
			"ip" : {
				"type" : "string",
				"index" : "not_analyzed"
			}
		}
	}
}'
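
To illustrate the not_analyzed note above, a minimal sketch using the elasticsearch 2.4.3 client already declared in the pom (the uuid value is the one from the sample log line; the class name is illustrative):

import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;

public class UuidQueryDemo
{
    public static void main(String[] args)
    {
        // Because "uuid" is not_analyzed it is indexed as a single term,
        // so an exact phrase query matches the full value, dashes included.
        QueryBuilder q = QueryBuilders.matchPhraseQuery("uuid", "d7a9ae37-ddd123bb63-e1b49ed86f91").slop(0);
        System.out.println(q); // prints the query as JSON DSL
    }
}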

Java ES operations
    /**
     * ES data source
     */
    @Autowired
    private ElasticsearchTemplate esTemplate;
    public Long dayBoot(String statDate)
    {
        Client client = esTemplate.getClient();
        // CardinalityBuilder aggregates a field and counts distinct values: e.g. for the data
        // zhangsan, lisi, zhangsan, zhangsan it returns 2. precisionThreshold trades memory for
        // accuracy (0-40000; anything larger is capped at 40000). Use with care: cardinality
        // is approximate and hard to make 100% exact.
        CardinalityBuilder agg =
            AggregationBuilders.cardinality("uuid").field(Field.UUID).precisionThreshold(Integer.MAX_VALUE);
        // group by appversion (commented out here; a completed sketch follows this Java section)
        // TermsBuilder appversion = AggregationBuilders.terms("appversion");
        
        // build the query
        BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
        
        // only query logs from the given day
        queryBuilder.must(QueryBuilders.rangeQuery("logdate")
            .format("yyyy-MM-dd HH:mm:ss")
            .gte(statDate + " 00:00:00")
            .lte(statDate + " 23:59:59"));
        
        // prefix match
        queryBuilder.filter(QueryBuilders.prefixQuery(Field.INTERFACE_NAME, "/user/"));
        // exact phrase match without tokenization
        // queryBuilder.filter(QueryBuilders.matchPhraseQuery(fieldname, fieldvalue).slop(0));
        
        //StringTerms aggAppversion = response.getAggregations().get("appversion");
        
        //for (Terms.Bucket appversion : aggAppversion.getBuckets())
        //{
        //   Cardinality agg = appversion.getAggregations().get("uuid");
        //   versionPV.put(appversion.getKeyAsString(), agg.getValue());
        //}
        SearchRequestBuilder search =
            client.prepareSearch(Constant.ES_MYLOG_INDEX).setTypes(Constant.ES_MYLOG_TYPE_USER).setSize(0);
        
        SearchResponse response = search.setQuery(queryBuilder).addAggregation(agg).get();
        
        Cardinality c = response.getAggregations().get("uuid");
        return c.getValue();
    }
    
    // delete-by-query via the spring-data-elasticsearch integration
    DeleteQuery dq = new DeleteQuery();
    dq.setIndex(Constant.ES_MYLOG_INDEX);
    dq.setType(Constant.ES_MYLOG_TYPE_USER);
    dq.setQuery(QueryBuilders.matchAllQuery());
    esTemplate.delete(dq, User.class);
    // The entity class must be annotated at the top with:
    // @Document(indexName = Constant.ES_MYLOG_INDEX, type = "user", refreshInterval = "-1")
    // public class User
    
    // Native delete-by-query: needs the delete-by-query jar (the dependency on the next line)
    // and the delete-by-query plugin installed on the ES cluster, otherwise the delete fails.
    // <dependency><groupId>org.elasticsearch.plugin</groupId><artifactId>delete-by-query</artifactId><version>2.4.3</version></dependency>
    Client client2 = TransportClient.builder().addPlugin(DeleteByQueryPlugin.class).build().addTransportAddress(
        new InetSocketTransportAddress(InetAddress.getByName("192.168.30.248"), 9300));
    DeleteByQueryResponse response1 = new DeleteByQueryRequestBuilder(client2, DeleteByQueryAction.INSTANCE)
            .setIndices(Constant.ES_INDEX)
            .setTypes(Constant.ES_TYPE_USER)
            .setQuery(QueryBuilders.matchAllQuery())
            .get();
    /** 
     * Clean up data older than three months
     */
    public void clearThreeMonthsAgoLog()
    {
        // three months ago
        Calendar threeMonthsAgoDate = Calendar.getInstance();
        threeMonthsAgoDate.add(Calendar.MONTH, -3);
        String threeMonthsAgo = DateUtil.formatDateToString(DateUtil.DATE_FORMAT_DAY_BAR, threeMonthsAgoDate.getTime());
        
        DeleteQuery dq = new DeleteQuery();
        dq.setIndex(Constant.ES_MYLOG_INDEX);
        dq.setType(Constant.ES_MYLOG_TYPE_USER);
        
        BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
        queryBuilder.must(QueryBuilders.rangeQuery(Field.LOG_DATE)
            .format(DateUtil.DATE_FORMAT_SECOND_BAR)
            .lte(DateUtil.endDate(threeMonthsAgo)));
        
        dq.setQuery(queryBuilder);
        
        // soft delete: documents are only marked as deleted
        esTemplate.delete(dq, User.class);
        
        try
        {
            // after the soft delete, force-merge to physically expunge the marked documents
            esTemplate.getClient()
                .admin()
                .indices()
                .forceMerge(new ForceMergeRequest(Constant.ES_MYLOG_INDEX).onlyExpungeDeletes(true))
                .get();
            
            logger.info("[Scheduled] Clear logs older than 3 months success...");
        }
        catch (Exception ee)
        {
            logger.error("[Scheduled] Clear logs older than 3 months failed...");
        }
    }
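
The commented-out TermsBuilder lines in dayBoot hint at grouping distinct uuid counts by appversion, which is exactly what the first curl query below expresses. A hedged completion of that idea, reusing esTemplate and the Constant fields from above (the method name uvByAppversion is an assumption):

    // A sketch of per-appversion distinct uuid counts (ES 2.4.x client;
    // uses org.elasticsearch.search.aggregations.* plus java.util.Map/HashMap).
    public Map<String, Long> uvByAppversion(String statDate)
    {
        Client client = esTemplate.getClient();
        
        // one terms bucket per appversion, with a cardinality sub-aggregation on uuid
        TermsBuilder byVersion = AggregationBuilders.terms("appversion")
            .field("appversion")
            .size(Integer.MAX_VALUE)
            .subAggregation(AggregationBuilders.cardinality("uuid").field("uuid"));
        
        // same one-day window as dayBoot
        BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
        queryBuilder.must(QueryBuilders.rangeQuery("logdate")
            .format("yyyy-MM-dd HH:mm:ss")
            .gte(statDate + " 00:00:00")
            .lte(statDate + " 23:59:59"));
        
        SearchResponse response = client.prepareSearch(Constant.ES_MYLOG_INDEX)
            .setTypes(Constant.ES_MYLOG_TYPE_USER)
            .setSize(0)
            .setQuery(queryBuilder)
            .addAggregation(byVersion)
            .get();
        
        // walk the buckets: appversion -> distinct uuid count
        Map<String, Long> versionPV = new HashMap<String, Long>();
        StringTerms versions = response.getAggregations().get("appversion");
        for (Terms.Bucket bucket : versions.getBuckets())
        {
            Cardinality uuids = bucket.getAggregations().get("uuid");
            versionPV.put(bucket.getKeyAsString(), uuids.getValue());
        }
        return versionPV;
    }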

Common ES statements, run directly from the Linux shell; the web query interface ES provides is clumsy by comparison.
curl -XGET "http://192.168.30.253:9200/my-log/user/_search" -d'
{
  "size" : 0,
  "query" : {
    "bool" : {
      "must" : {
        "range" : {
          "logdate" : {
            "from" : null,
            "to" : "2017-10-15 23:59:59",
            "format" : "yyyy-MM-dd HH:mm:ss",
            "include_lower" : true,
            "include_upper" : true
          }
        }
      }
    }
  },
  "aggregations" : {
    "appversion" : {
      "terms" : {
        "field" : "appversion",
        "size" : 2147483647
      },
      "aggregations" : {
        "uuid" : {
          "cardinality" : {
            "field" : "uuid",
            "precision_threshold" : 2147483647
          }
        }
      }
    }
  }
}'

## requires the delete-by-query plugin on ES 2.x (see the note in the Java section above)
curl -XDELETE "http://192.168.30.248:9200/my-log/user/_query" -d'
{
  "query" : {
    "match_all" : { }
  }
}'
curl -XGET "http://192.168.30.253:9200/my-log/log/_search" -d'
{
  "size" : 0,
  "query" : {
    "bool" : {
      "must" : {
        "range" : {
          "logdate" : {
            "from" : "2017-10-15 00:00:00",
            "to" : "2017-10-15 23:59:59",
            "format" : "yyyy-MM-dd HH:mm:ss",
            "include_lower" : true,
            "include_upper" : true
          }
        }
      },
      "filter" : [ {
        "match" : {
          "interfaceName" : {
            "query" : "/user/log_report",
            "type" : "phrase",
            "slop" : 0
          }
        }
      }, {
        "match" : {
          "opCommandType" : {
            "query" : "1",
            "type" : "phrase",
            "slop" : 0
          }
        }
      } ]
    }
  },
  "aggregations" : {
    "uuid" : {
      "cardinality" : {
        "field" : "uuid",
        "precision_threshold" : 2147483647
      }
    }
  }
}'

