
Elasticsearch - Java - Quick Start

凌翔宇
2023-12-01

No program is perfect, but that is no reason to be discouraged, because writing programs is a continual pursuit of perfection.
-侯氏工坊

Reference

Add data

Add a single document

  • Original request
POST logs-my_app-default/_doc
{
  "@timestamp": "2099-05-06T16:21:15.000Z",
  "event": {
    "original": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg.jpg HTTP/1.0\" 200 24736"
  }
}
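  • Equivalent Java code
// Assumes elasticsearchClient is a co.elastic.clients.elasticsearch.ElasticsearchClient field configured elsewhere.
// IndexResponse comes from co.elastic.clients.elasticsearch.core.
private IndexResponse indexOne(String index, Object o) throws Exception {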
    return elasticsearchClient
            .index(_0 -> _0
                    .index(index)
                    .document(o));
}
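
The `Object o` argument can be any value the client's JSON mapper knows how to serialize. As a minimal sketch, assuming the client was built with the Jackson-based mapper (which handles plain `Map` values), the document from the request above can be assembled from nested maps. `LogDocuments` and `logEvent` are hypothetical names used only for this illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class LogDocuments {
    private LogDocuments() {}

    /** Builds the nested log document as plain maps (hypothetical helper). */
    static Map<String, Object> logEvent(String timestamp, String originalLine) {
        // Inner object: { "original": "<raw log line>" }
        Map<String, Object> event = new LinkedHashMap<>();
        event.put("original", originalLine);

        // Outer object: { "@timestamp": "...", "event": { ... } }
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("@timestamp", timestamp);
        doc.put("event", event);
        return doc;
    }

    public static void main(String[] args) {
        Map<String, Object> doc = logEvent(
                "2099-05-06T16:21:15.000Z",
                "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg.jpg HTTP/1.0\" 200 24736");
        System.out.println(doc);
    }
}
```

The resulting map could then be passed as `indexOne("logs-my_app-default", doc)`.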

Add multiple documents

  • Original request
PUT logs-my_app-default/_bulk
{ "create": { } }
{ "@timestamp": "2099-05-07T16:24:32.000Z", "event": { "original": "192.0.2.242 - - [07/May/2020:16:24:32 -0500] \"GET /images/hm_nbg.jpg HTTP/1.0\" 304 0" } }
{ "create": { } }
{ "@timestamp": "2099-05-08T16:25:42.000Z", "event": { "original": "192.0.2.255 - - [08/May/2099:16:25:42 +0000] \"GET /favicon.ico HTTP/1.0\" 200 3638" } }
  • Equivalent Java code
private BulkResponse indexMulti(String index, List<?> objs)
        throws Exception {
    return elasticsearchClient.bulk(_0 -> {
        _0.index(index);
        objs.forEach(obj -> _0
                .operations(_1 -> _1
                        .create(_2 -> _2
                                .document(obj))));
        return _0;
    });
}

Search data

  • Original request
GET logs-my_app-default/_search
{
  "query": {
    "match_all": { }
  },
  "sort": [
    {
      "@timestamp": "desc"
    }
  ]
}
  • Equivalent Java code
private <T> SearchResponse<T> search(String index, Class<T> c)
        throws Exception {
    return elasticsearchClient.search(_0 -> _0
                    .index(index)
                    .fields(_1 -> _1
                            .field("@timestamp"))
                    .fields(_1 -> _1
                            .field("event.original"))
                    .query(_1 -> _1
                            .matchAll(_2 -> _2
                                    .queryName("all")))
                    .sort(_1 -> _1
                            .field(_2 -> _2
                                    .field("@timestamp")
                                    .order(SortOrder.Desc)))
            , c);
}

Get specific fields

  • Original request
GET logs-my_app-default/_search
{
  "query": {
    "match_all": { }
  },
  "fields": [
    "@timestamp"
  ],
  "_source": false,
  "sort": [
    {
      "@timestamp": "desc"
    }
  ]
}
  • Equivalent Java code
private <T> SearchResponse<T> searchSpecialFields(String index, Class<T> c)
        throws Exception {
    return elasticsearchClient.search(_0 -> _0
                    .index(index)
                    .source(_1 -> _1
                            .fetch(false))
                    .fields(_1 -> _1
                            .field("@timestamp"))
                    .query(_1 -> _1
                            .matchAll(_2 -> _2
                                    .queryName("all")))
                    .sort(_1 -> _1
                            .field(_2 -> _2
                                    .field("@timestamp")
                                    .order(SortOrder.Desc)))
            , c);
}

Search a date range

  • Original request
GET logs-my_app-default/_search
{
  "query": {
    "range": {
      "@timestamp": {
        "gte": "2099-05-05",
        "lt": "2099-05-08"
      }
    }
  },
  "fields": [
    "@timestamp"
  ],
  "_source": false,
  "sort": [
    {
      "@timestamp": "desc"
    }
  ]
}

GET logs-my_app-default/_search
{
  "query": {
    "range": {
      "@timestamp": {
        "gte": "now-1d/d",
        "lt": "now/d"
      }
    }
  },
  "fields": [
    "@timestamp"
  ],
  "_source": false,
  "sort": [
    {
      "@timestamp": "desc"
    }
  ]
}
  • Equivalent Java code
private <T> SearchResponse<T> searchDateRange(
        String index, String start, String end, Class<T> c
) throws Exception {
    return elasticsearchClient.search(_0 -> _0
                    .index(index)
                    .source(_1 -> _1
                            .fetch(false))
                    .fields(_1 -> _1
                            .field("@timestamp"))
                    .query(_1 -> _1
                            .range(_2 -> _2
                                    .field("@timestamp")
                                    .gte(JsonData.of(start))
                                    .lt(JsonData.of(end))))
                    .sort(_1 -> _1
                            .field(_2 -> _2
                                    .field("@timestamp")
                                    .order(SortOrder.Desc)))
            , c);
}
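
Because `start` and `end` are plain strings, the date-math bounds from the second request can be passed unchanged, for example `searchDateRange(index, "now-1d/d", "now/d", c)`. The sketch below only illustrates what those two expressions are expected to resolve to, assuming no `time_zone` is set so that rounding happens in UTC. `DateMathSketch` and its methods are hypothetical helpers, not part of the client.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

final class DateMathSketch {
    private DateMathSketch() {}

    /** Start of the current UTC day: the exclusive upper bound that "lt": "now/d" stands for. */
    static Instant startOfTodayUtc(Instant now) {
        return now.truncatedTo(ChronoUnit.DAYS);
    }

    /** Start of the previous UTC day: the inclusive lower bound that "gte": "now-1d/d" stands for. */
    static Instant startOfYesterdayUtc(Instant now) {
        return startOfTodayUtc(now).minus(1, ChronoUnit.DAYS);
    }

    public static void main(String[] args) {
        // A fixed "now" keeps the example deterministic.
        Instant now = Instant.parse("2099-05-08T16:25:42Z");
        System.out.println("gte " + startOfYesterdayUtc(now));
        System.out.println("lt  " + startOfTodayUtc(now));
    }
}
```

In other words, the second request covers the whole of yesterday and nothing from today.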

Extract fields from unstructured data

  • Original request
GET logs-my_app-default/_search
{
  "runtime_mappings": {
    "source.ip": {
      "type": "ip",
      "script": """
        String sourceip=grok('%{IPORHOST:sourceip} .*').extract(doc[ "event.original" ].value)?.sourceip;
        if (sourceip != null) emit(sourceip);
      """
    }
  },
  "query": {
    "range": {
      "@timestamp": {
        "gte": "2099-05-05",
        "lt": "2099-05-08"
      }
    }
  },
  "fields": [
    "@timestamp",
    "source.ip"
  ],
  "_source": false,
  "sort": [
    {
      "@timestamp": "desc"
    }
  ]
}
  • Equivalent Java code
private <T> SearchResponse<T> searchExtract(
        String index, String script, String start, String end, Class<T> c
) throws Exception {
    return elasticsearchClient.search(_0 -> _0
                    .index(index)
                    .source(_1 -> _1
                            .fetch(false))
                    .fields(_1 -> _1
                            .field("@timestamp"))
                    .fields(_1 -> _1
                            .field("source.ip"))
                    .runtimeMappings("source.ip", _1 -> _1
                            .type(RuntimeFieldType.Ip)
                            .script(_2 -> _2
                                    .inline(_3 -> _3
                                            .source(script))))
                    .query(_1 -> _1
                            .range(_2 -> _2
                                    .field("@timestamp")
                                    .gte(JsonData.of(start))
                                    .lt(JsonData.of(end))))
                    .sort(_1 -> _1
                            .field(_2 -> _2
                                    .field("@timestamp")
                                    .order(SortOrder.Desc)))
            , c);
}
String script = "String sourceip=grok('%{IPORHOST:sourceip} .*')" +
                ".extract(doc[ \"event.original\" ].value)?.sourceip;\n" +
                "        if (sourceip != null) emit(sourceip);";
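
The escaped quotes and the manual `\n` in the concatenated string are easy to get wrong. On Java 15 or later (an assumption about the build target), a text block keeps the Painless source looking the same as in the request body. `PainlessScripts` and `SOURCE_IP` are illustrative names only.

```java
final class PainlessScripts {
    private PainlessScripts() {}

    // Painless source for the source.ip runtime field, copied from the request body.
    // The text block strips the common leading indentation and keeps the line break.
    static final String SOURCE_IP = """
            String sourceip=grok('%{IPORHOST:sourceip} .*').extract(doc[ "event.original" ].value)?.sourceip;
            if (sourceip != null) emit(sourceip);
            """;

    public static void main(String[] args) {
        System.out.print(SOURCE_IP);
    }
}
```

The constant can be passed wherever the methods here take a `script` argument.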

Combine queries

  • Original request
GET logs-my_app-default/_search
{
  "runtime_mappings": {
    "source.ip": {
      "type": "ip",
      "script": """
        String sourceip=grok('%{IPORHOST:sourceip} .*').extract(doc[ "event.original" ].value)?.sourceip;
        if (sourceip != null) emit(sourceip);
      """
    }
  },
  "query": {
    "bool": {
      "filter": [
        {
          "range": {
            "@timestamp": {
              "gte": "2099-05-05",
              "lt": "2099-05-08"
            }
          }
        },
        {
          "range": {
            "source.ip": {
              "gte": "192.0.2.0",
              "lte": "192.0.2.240"
            }
          }
        }
      ]
    }
  },
  "fields": [
    "@timestamp",
    "source.ip"
  ],
  "_source": false,
  "sort": [
    {
      "@timestamp": "desc"
    }
  ]
}
  • Equivalent Java code
private <T> SearchResponse<T> searchComb(
        String index, String script, String start, String end,
        String startIp, String endIp, Class<T> c
) throws Exception {
    return elasticsearchClient.search(_0 -> _0
                    .index(index)
                    .source(_1 -> _1
                            .fetch(false))
                    .fields(_1 -> _1
                            .field("@timestamp"))
                    .fields(_1 -> _1
                            .field("source.ip"))
                    .runtimeMappings("source.ip", _1 -> _1
                            .type(RuntimeFieldType.Ip)
                            .script(_2 -> _2
                                    .inline(_3 -> _3
                                            .source(script))))
                    .query(_1 -> _1
                            .bool(_2 -> _2
                                    .filter(_3 -> _3
                                            .range(_4 -> _4
                                                    .field("@timestamp")
                                                    .gte(JsonData.of(start))
                                                    .lt(JsonData.of(end))))
                                    .filter(_3 -> _3
                                            .range(_4 -> _4
                                                    .field("source.ip")
                                                    .gte(JsonData.of(startIp))
                                                    .lte(JsonData.of(endIp))))))
                    .sort(_1 -> _1
                            .field(_2 -> _2
                                    .field("@timestamp")
                                    .order(SortOrder.Desc)))
            , c);
}
String script = "String sourceip=grok('%{IPORHOST:sourceip} .*')" +
                ".extract(doc[ \"event.original\" ].value)?.sourceip;\n" +
                "        if (sourceip != null) emit(sourceip);";
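
The second filter in the request uses an inclusive upper bound (`lte`), so `192.0.2.240` itself would match while `192.0.2.242` would not. The sketch below mimics that closed-range comparison locally for IPv4 literals, purely to show which sample addresses the filter is expected to keep. `Ipv4RangeSketch` is a hypothetical helper and says nothing about how Elasticsearch compares `ip` values internally.

```java
final class Ipv4RangeSketch {
    private Ipv4RangeSketch() {}

    /** Converts a dotted IPv4 literal such as "192.0.2.42" to its numeric value. */
    static long toLong(String ipv4) {
        String[] parts = ipv4.split("\\.");
        if (parts.length != 4) {
            throw new IllegalArgumentException("Not an IPv4 literal: " + ipv4);
        }
        long value = 0;
        for (String part : parts) {
            int octet = Integer.parseInt(part);
            if (octet < 0 || octet > 255) {
                throw new IllegalArgumentException("Octet out of range in: " + ipv4);
            }
            value = (value << 8) | octet;
        }
        return value;
    }

    /** Closed range check: from <= ip <= to, matching gte/lte in the request. */
    static boolean inClosedRange(String ip, String from, String to) {
        long v = toLong(ip);
        return v >= toLong(from) && v <= toLong(to);
    }

    public static void main(String[] args) {
        // Client addresses from the three sample log lines.
        for (String ip : new String[] {"192.0.2.42", "192.0.2.242", "192.0.2.255"}) {
            System.out.println(ip + " in range: " + inClosedRange(ip, "192.0.2.0", "192.0.2.240"));
        }
    }
}
```

Of the three sample documents, only the one from `192.0.2.42` falls inside the address range.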

Aggregate data

  • Original request
GET logs-my_app-default/_search
{
  "runtime_mappings": {
    "http.response.body.bytes": {
      "type": "long",
      "script": """
        String bytes=grok('%{COMMONAPACHELOG}').extract(doc[ "event.original" ].value)?.bytes;
        if (bytes != null) emit(Integer.parseInt(bytes));
      """
    }
  },
  "aggs": {
    "average_response_size":{
      "avg": {
        "field": "http.response.body.bytes"
      }
    }
  },
  "query": {
    "bool": {
      "filter": [
        {
          "range": {
            "@timestamp": {
              "gte": "2099-05-05",
              "lt": "2099-05-08"
            }
          }
        }
      ]
    }
  },
  "fields": [
    "@timestamp",
    "http.response.body.bytes"
  ],
  "_source": false,
  "sort": [
    {
      "@timestamp": "desc"
    }
  ]
}
  • Equivalent Java code
private <T> SearchResponse<T> aggs(
        String index, String script, String start, String end, Class<T> c
) throws Exception {
    return elasticsearchClient.search(_0 -> _0
                    .index(index)
                    .source(_1 -> _1
                            .fetch(false))
                    .fields(_1 -> _1
                            .field("@timestamp"))
                    .fields(_1 -> _1
                            .field("http.response.body.bytes"))
                    .runtimeMappings("http.response.body.bytes",
                            _1 -> _1
                                    .type(RuntimeFieldType.Long)
                                    .script(_2 -> _2
                                            .inline(_3 -> _3
                                                    .source(script))))
                    .query(_1 -> _1
                            .bool(_2 -> _2
                                    .filter(_3 -> _3
                                            .range(_4 -> _4
                                                    .field("@timestamp")
                                                    .gte(JsonData.of(start))
                                                    .lt(JsonData.of(end))))))
                    .aggregations("average_response_size", _1 -> _1
                            .avg(_2 -> _2
                                    .field("http.response.body.bytes")))
                    .sort(_1 -> _1
                            .field(_2 -> _2
                                    .field("@timestamp")
                                    .order(SortOrder.Desc))),
            c);
}
String script = "String bytes=grok('%{COMMONAPACHELOG}')" +
                ".extract(doc[ \"event.original\" ].value)?.bytes;\n" +
                "        if (bytes != null) emit(Integer.parseInt(bytes));";
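
To sanity-check the aggregation result, the same calculation can be approximated locally. This sketch assumes a simplified regular expression in place of `%{COMMONAPACHELOG}` (it only looks for a three-digit status code followed by the byte count at the end of the line) and uses the two sample documents that fall inside the date range. `ResponseSizeSketch` is a hypothetical name.

```java
import java.util.List;
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class ResponseSizeSketch {
    // Simplified stand-in for the grok pattern: "<status> <bytes>" at the end of the line.
    private static final Pattern STATUS_AND_BYTES = Pattern.compile("\\s(\\d{3})\\s(\\d+)$");

    private ResponseSizeSketch() {}

    /** Extracts the response size, or returns empty when the line has no numeric byte count. */
    static OptionalLong responseBytes(String logLine) {
        Matcher m = STATUS_AND_BYTES.matcher(logLine);
        return m.find() ? OptionalLong.of(Long.parseLong(m.group(2))) : OptionalLong.empty();
    }

    /** Averages the sizes of all lines that yielded a value, like the avg aggregation over the runtime field. */
    static double averageResponseBytes(List<String> logLines) {
        return logLines.stream()
                .map(ResponseSizeSketch::responseBytes)
                .filter(OptionalLong::isPresent)
                .mapToLong(OptionalLong::getAsLong)
                .average()
                .orElse(Double.NaN);
    }

    public static void main(String[] args) {
        // The two sample lines with timestamps between 2099-05-05 (inclusive) and 2099-05-08 (exclusive).
        List<String> inRange = List.of(
                "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg.jpg HTTP/1.0\" 200 24736",
                "192.0.2.242 - - [07/May/2020:16:24:32 -0500] \"GET /images/hm_nbg.jpg HTTP/1.0\" 304 0");
        System.out.println(averageResponseBytes(inRange));
    }
}
```

For those two lines the byte counts are 24736 and 0, so the expected average is 12368.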

Clean up

  • Original request
DELETE _data_stream/logs-my_app-default
  • Equivalent Java code
private DeleteDataStreamResponse deleteDataStreamIndex(String index)
        throws Exception {
    return elasticsearchClient.indices()
            .deleteDataStream(_0 -> _0
                    .name(index));
}