elasticsearch实战

庞瀚
2023-12-01

前言

最近项目里需要对某块功能的查询支持全文检索,于是使用了专门的搜索引擎 elasticsearch 来实现。通过2周左右的官网资料查询,成功在项目中引入并使用,这里记录部分使用内容。es 官网的刷新速度很慢,我是把官网的页面下载到本地后,需要时再挨个打开浏览。学习 es 主要还是先学习其基础的语法,在 kibana 里先熟悉命令行的方式,以及操作的返回数据格式,再去熟悉 Java 客户端,就再容易不过了。

  • 此次字段类型主要用到了es的 text类型,keyword类型,number类型,date类型;
  • 查询涉及到 模糊查询,分词查询,精确查询。未涉及到 聚合统计相关查询操作;
  • 修改也只是涉及到索引文档的属性值新增,删除等;
  • elasticsearch 版本 7.17.4
  • Java client:elasticsearch-java 版本也是7.17.4,注意7版本的原来的elasticsearch-rest-high-level-client客户端 官方已经弃用。

引入maven 依赖

关于在 maven 中引入 es Java 客户端,需要注意父工程 spring boot 的版本:spring boot 父工程里会带有 es 相关的一些依赖。要正确引入的话,最好是在 <dependencyManagement> 里预先声明好自己需要的依赖和版本号,然后在 <dependencies> 中引用,以免 spring boot 顶级父工程的依赖版本覆盖当前声明引入的 es 相关依赖。参考官网 installation

 <!-- Elasticsearch Java API client; the version is taken from the elasticsearch.version Maven property (not defined in this snippet) -->
 <dependency>
            <groupId>co.elastic.clients</groupId>
            <artifactId>elasticsearch-java</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
        <!-- Jakarta JSON Processing API, pinned to 2.0.1; presumably required by the client's JSON layer (confirm against the client's installation notes) -->
        <dependency>
            <groupId>jakarta.json</groupId>
            <artifactId>jakarta.json-api</artifactId>
            <version>2.0.1</version>
        </dependency>

yaml文件

## Elasticsearch connection settings (read with the "elastic" prefix)
elastic:
  # Credentials for HTTP basic authentication
  # NOTE(review): the password is kept in plain text here; consider externalizing it
  username: dem01
  password: 369cWLF7y*uabc
  # List of ip/port entries; more nodes can be added as further list items
  hosts:
    - ip: 192.168.125.100
      port: 9200

propertiesconfiguration 配置类

/**
 * Holder for the Elasticsearch connection settings bound from the
 * {@code elastic.*} configuration properties.
 *
 * @author lvzb
 * @date 2022/12/11  23:00
 **/
@Data
@Component
@ConfigurationProperties(prefix = "elastic")
public class ElasticProperties {
    // Bound from elastic.username
    private String username;
    // Bound from elastic.password
    private String password;
    // Bound from the elastic.hosts list
    private List<HostPort> hosts;

    // One ip/port pair of the elastic.hosts list
    @Data
    public static class HostPort {
        private String ip;
        private int port;
    }
}
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.aexpec.dem.backend.config.properties.ElasticProperties;
import lombok.SneakyThrows;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.stream.Collectors;

/**
 * Spring configuration that exposes the Elasticsearch Java API client as a bean.
 *
 * @author lvzb
 * @date 2022/12/11  23:02
 **/
@Configuration
public class ElasticConfig {

    /**
     * Builds an {@link ElasticsearchClient} on top of the low-level {@link RestClient},
     * using HTTP basic authentication and every host listed in the configuration.
     *
     * @param properties bound {@code elastic.*} settings (username, password, hosts)
     * @return a client whose transport uses a Jackson-based JSON mapper
     * @throws IllegalStateException if no host is configured
     */
    @Bean
    public ElasticsearchClient elasticsearchClient(ElasticProperties properties) {
        // Fail early with a readable message instead of a NullPointerException further down
        if (properties.getHosts() == null || properties.getHosts().isEmpty()) {
            throw new IllegalStateException("elastic.hosts must contain at least one ip/port entry");
        }

        // One HttpHost per configured ip/port pair
        HttpHost[] nodes = properties.getHosts()
                .stream()
                .map(h -> new HttpHost(h.getIp(), h.getPort()))
                .toArray(HttpHost[]::new);

        // The same credentials are offered for any auth scope
        BasicCredentialsProvider authProvider = new BasicCredentialsProvider();
        authProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(properties.getUsername(), properties.getPassword()));

        RestClient restClient = RestClient.builder(nodes)
                .setHttpClientConfigCallback(httpAsyncClientBuilder ->
                        httpAsyncClientBuilder.setDefaultCredentialsProvider(authProvider))
                .build();

        return new ElasticsearchClient(new RestClientTransport(restClient, new JacksonJsonpMapper()));
    }
}

es 索引结构设计

设计一个索引,该索引的文档结构是 表+服务标签+列字段的组合体,可以通过列字段查询、或服务标签查询、或者表相关的字段查询,捞出具体的表的数据

索引model类

/**
 * Document model for the Elasticsearch index {@code dem_meta_search}.
 * <p>
 * Besides the document properties, the class exposes the index name and the
 * field paths used when building queries against the index.
 *
 * @author lvzb
 * @date 2022/12/12  15:03
 **/
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class EsTableModel implements Serializable {
    private static final long serialVersionUID = 8999939790690200163L;

    /** Name of the index the documents are stored in. */
    public static final String INDEX_NAME = "dem_meta_search";

    // Top-level field names
    public static final String KEY_GUID = "guid";
    public static final String KEY_SUBJECT_DOMAIN_ID = "subjectDomainId";
    public static final String KEY_SUBJECT_DOMAIN = "subjectDomain";
    public static final String KEY_TABLE_NAME = "tableName";
    public static final String KEY_DB_NAME = "databaseName";

    // Service-tag field paths. The fields live inside the serviceTags object, so the
    // full dotted path is required for queries, in the same way as the columns.* paths.
    public static final String KEY_SERVICE_TAG_ID = "serviceTags.id";
    public static final String KEY_SERVICE_TAG_NAME = "serviceTags.labelName";

    // Column field paths (fields inside the columns object)
    public static final String KEY_COLUMN_NAME = "columns.name";
    public static final String KEY_COLUMN_GUID = "columns.guid";
    public static final String KEY_COLUMN_TERM_GUID = "columns.termGuid";
    public static final String KEY_COLUMN_ENGLISH_FULL_NAME = "columns.english_full_name";
    public static final String KEY_COLUMN__ENGLISH_ABBR = "columns.english_abbr";
    public static final String KEY_COLUMN_BUSINESS_TERM = "columns.business_term";
    public static final String KEY_COLUMN_MESSAGE_DOMAIN_LABEL = "columns.messagedomain_label";
    public static final String KEY_COLUMN_VARIABLE_PROPERTIES = "columns.variable_properties";
    public static final String KEY_COLUMN_MEANING_USAGE = "columns.meaning_usage";
    public static final String KEY_COLUMN_REMARK = "columns.remark";

    private String guid;
    private String tableName;
    private String tableDistinction;
    private String databaseName;
    private String databaseType;
    private String tableState;
    private String tableSummary;
    private String tableAbbreviation;
    private String dataOwner;
    private String dataOwnerName;
    // NOTE(review): this property is named serviceTag, whereas the service-tag field paths
    // above use serviceTags; confirm which name the indexed documents actually carry.
    private List<ServiceTag> serviceTag;
    private String sourceSystem;
    private String sourceFileOrTableName;
    private String subjectDomain;
    private String subjectDomainId;
    private String represent;
    private String timestamp;
    private Long visits;
    private List<Column> columns;

    /**
     * A service tag attached to a table: numeric id plus label name.
     */
    @Data
    @Builder
    @AllArgsConstructor
    @NoArgsConstructor
    public static class ServiceTag implements Serializable {
        private static final long serialVersionUID = 184739039448219304L;

        private Long id;
        private String labelName;
    }

    /**
     * A column of the table, including the business-term attributes linked to it.
     */
    @Data
    @Builder
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Column implements Serializable {
        // Explicit version id, consistent with the other serializable classes of this model
        private static final long serialVersionUID = 1L;

        private String guid;
        private String name;

        private String termGuid;
        private String tableGuid;
        /**
         * English full name
         */
        private String english_full_name;
        /**
         * English abbreviation
         */
        private String english_abbr;
        /**
         * Business term
         */
        private String business_term;

        /**
         * Message domain or label
         */
        private String messagedomain_label;
        /**
         * Variable properties
         */
        private String variable_properties;
        /**
         * Meaning and usage
         */
        private String meaning_usage;
        /**
         * Remark
         */
        private String remark;
    }

    /**
     * Document operations: create, delete, remove field, add field, update field.
     * The constant spellings (REMOVE_FILED, ADD_FILED) are kept as they are because
     * callers reference them by name.
     */
    public enum Operator {
        CREATE, DELETE, REMOVE_FILED, ADD_FILED, UPDATE_FIELD;
    }

}

索引结构Java客户端创建

/**
 * @author lvzb
 * @date 2022/12/09  10:04
 **/
@Slf4j
public class ElasticsearchTest {

    private static ElasticsearchClient client;
    private static final String indexName = "dem_meta_search";

    @BeforeEach
    void beforeEach() {
        // Low-level REST client talking to localhost:9200
        HttpHost localNode = new HttpHost("localhost", 9200);
        RestClient lowLevelClient = RestClient.builder(localNode).build();

        // Typed API client on top of a transport that uses a Jackson mapper
        client = new ElasticsearchClient(
                new RestClientTransport(lowLevelClient, new JacksonJsonpMapper()));
    }

    /**
     * Creates the index named by {@code indexName} with explicit settings and mappings,
     * but only when the index does not exist yet; an existing index is left untouched.
     * <p>
     * Settings: the index-level "default" analyzer is a custom analyzer made of the
     * "ik_max_word" tokenizer and the "lowercase" filter ("ik_max_word" is presumably
     * provided by the IK analysis plugin; confirm it is installed on the cluster).
     * <p>
     * Mappings: identifier-like fields are mapped as keyword, descriptive fields as text,
     * "tableName" additionally names the "ik_max_word" analyzer explicitly, "timestamp" is a
     * date with format "yyyy-MM-dd HH:mm:ss", "visits" is an unsigned long, and
     * "serviceTags" / "columns" are objects with their own properties
     * ("serviceTags.labelName" is text with a "keyword" sub-field).
     *
     * @throws IOException propagated from the client calls
     */
    @Test
    void createIndexByMapping() throws IOException {
        final String index = indexName;
        // Ask the cluster whether the index is already there
        BooleanResponse response = client.indices().exists(e -> e.index(index));
        if (!response.value()) {
            // Settings, index name and mappings are sent in a single create request
            CreateIndexResponse createIndexResponse = client.indices().create(i ->
                    i.settings(s -> s
                            .analysis(a -> a
                                    .analyzer("default", d -> d.custom(c -> c.tokenizer("ik_max_word").filter("lowercase")))))
                            .index(index)
                            .mappings(m ->
                                    m.properties("guid", p -> p.keyword(k -> k.index(true)))
                                            .properties("content", p -> p.text(t -> t.index(true)))
                                            .properties("tableName", p -> p.text(t -> t.index(true).analyzer("ik_max_word")))
                                            .properties("tableDistinction", p -> p.text(t -> t.index(true)))
                                            .properties("databaseName", p -> p.text(t -> t.index(true)))
                                            .properties("databaseType", p -> p.text(t -> t.index(true)))
                                            .properties("tableState", p -> p.text(t -> t.index(true)))
                                            .properties("tableSummary", p -> p.text(t -> t.index(true)))
                                            .properties("tableAbbreviation", p -> p.text(t -> t.index(true)))
                                            .properties("dataOwner", p -> p.text(t -> t.index(true)))
                                            .properties("dataOwnerName", p -> p.text(t -> t.index(true)))
                                            .properties("sourceSystem", p -> p.text(t -> t.index(true)))
                                            .properties("subjectDomainId", p -> p.keyword(k -> k.index(true)))
                                            .properties("subjectDomain", p -> p.text(t -> t.index(true)))
                                            .properties("timestamp", p -> p.date(d -> d.index(true).format("yyyy-MM-dd HH:mm:ss")))
                                            .properties("visits", p -> p.unsignedLong(l -> l.index(true)))

                                            .properties("serviceTags",
                                                    Property.of(p -> p.object(v ->
                                                            v.properties("id", pk -> pk.keyword(k -> k.index(true)))
                                                                    .properties("labelName", pt -> pt.text(tf -> tf.fields("keyword", f -> f.keyword(k -> k.index(true))))))))
                                            .properties("columns",
                                                    p -> p.object(o -> o.properties("guid", op -> op.keyword(k -> k.index(true)))
                                                            .properties("name", pp -> pp.text(t -> t.index(true)))
                                                            .properties("termGuid", pp -> pp.keyword(k -> k.index(true)))
                                                            .properties("tableGuid", pp -> pp.keyword(k -> k.index(true)))
                                                            .properties("english_full_name", pp -> pp.text(t -> t.index(true)))
                                                            .properties("english_abbr", pp -> pp.text(t -> t.index(true)))
                                                            .properties("business_term", pp -> pp.text(t -> t.index(true)))
                                                            .properties("messagedomain_label", pp -> pp.text(t -> t.index(true)))
                                                            .properties("variable_properties", pp -> pp.text(t -> t.index(true)))
                                                            .properties("meaning_usage", pp -> pp.text(t -> t.index(true)))
                                                            .properties("remark", pp -> pp.text(t -> t.index(true)))

                                                    ))));
            log.warn("see response:: {}", createIndexResponse);
        }
    }

    /**
     * Requests the mappings without naming an index and logs the response.
     *
     * @throws IOException propagated from the client call
     */
    @Test
    void getMappings() throws IOException {
        log.info("{}", client.indices().getMapping());
    }
}    

kibana 的结构如下

{
  "dem_meta_search" : {
    "aliases" : { },
    "mappings" : {
      "properties" : {
        "columns" : {
          "properties" : {
            "business_term" : {
              "type" : "text"
            },
            "email" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            },
            "english_abbr" : {
              "type" : "text"
            },
            "english_full_name" : {
              "type" : "text"
            },
            "guid" : {
              "type" : "keyword"
            },
            "meaning_usage" : {
              "type" : "text"
            },
            "messagedomain_label" : {
              "type" : "text"
            },
            "name" : {
              "type" : "text"
            },
            "remark" : {
              "type" : "text"
            },
            "tableGuid" : {
              "type" : "keyword"
            },
            "termGuid" : {
              "type" : "keyword"
            },
            "variable_properties" : {
              "type" : "text"
            }
          }
        },
        "content" : {
          "type" : "text"
        },
        "dataOwner" : {
          "type" : "text"
        },
        "dataOwnerName" : {
          "type" : "text"
        },
        "databaseName" : {
          "type" : "text"
        },
        "databaseType" : {
          "type" : "text"
        },
        "guid" : {
          "type" : "keyword"
        },
        "serviceTags" : {
          "properties" : {
            "id" : {
              "type" : "keyword"
            },
            "labelName" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword"
                }
              }
            }
          }
        },
        "sourceSystem" : {
          "type" : "text"
        },
        "subjectDomain" : {
          "type" : "text"
        },
        "subjectDomainId" : {
          "type" : "keyword"
        },
        "tableAbbreviation" : {
          "type" : "text"
        },
        "tableDistinction" : {
          "type" : "text"
        },
        "tableName" : {
          "type" : "text",
          "analyzer" : "ik_max_word"
        },
        "tableState" : {
          "type" : "text"
        },
        "tableSummary" : {
          "type" : "text"
        },
        "timestamp" : {
          "type" : "date",
          "format" : "yyyy-MM-dd HH:mm:ss"
        },
        "visits" : {
          "type" : "unsigned_long"
        }
      }
    },
    "settings" : {
      "index" : {
        "routing" : {
          "allocation" : {
            "include" : {
              "_tier_preference" : "data_content"
            }
          }
        },
        "number_of_shards" : "1",
        "provided_name" : "dem_meta_search",
        "creation_date" : "1671085868187",
        "analysis" : {
          "analyzer" : {
            "default" : {
              "filter" : [
                "lowercase"
              ],
              "type" : "custom",
              "tokenizer" : "ik_max_word"
            }
          }
        },
        "number_of_replicas" : "1",
        "uuid" : "P431r3g_TLi4QOcWwhB47w",
        "version" : {
          "created" : "7170499"
        }
      }
    }
  }
}

文档查询(模糊查询,多条件查询)

模糊查询主要有2种方式,一种是 term 级别的 WildcardQuery,还有一种是 fuzzy。fuzzy 功能强大一些,有一定的容错性;WildcardQuery 类似 SQL 中的 like。参数 caseInsensitive = true 代表大小写不敏感。

/**
 * @author lvzb
 * @date 2022/12/09  10:04
 **/
@Slf4j
public class ElasticsearchTest {

    private static ElasticsearchClient client;
    private static final String indexName = "dem_meta_search";

    @BeforeEach
    void beforeEach() {
        // Transport: low-level REST client for localhost:9200 plus a Jackson mapper
        RestClient lowLevelClient = RestClient.builder(new HttpHost("localhost", 9200)).build();
        JacksonJsonpMapper jsonMapper = new JacksonJsonpMapper();
        ElasticsearchTransport restTransport = new RestClientTransport(lowLevelClient, jsonMapper);

        // API client shared by the tests of this class
        client = new ElasticsearchClient(restTransport);
    }

    /**
     * Runs a multi-clause "should" search for one keyword: wildcard matches on table,
     * database and column names, a term match on the service-tag name, and an analyzed
     * multi-match over the descriptive text fields. Hits are printed to standard output.
     *
     * @throws IOException propagated from the search call
     */
    @Test
    void fullTextQuery() throws IOException {
        String queryVal = "信息";
        // Wildcard form of the keyword: match it anywhere inside the field value
        String pattern = "*" + queryVal + "*";

        Query byTableName = WildcardQuery.of(wi -> wi.field(KEY_TABLE_NAME).value(pattern).caseInsensitive(true).boost(5f))._toQuery();
        Query byDbName = WildcardQuery.of(wi -> wi.field(KEY_DB_NAME).value(pattern).caseInsensitive(true).boost(5f))._toQuery();
        Query byColumnName = WildcardQuery.of(wi -> wi.field(KEY_COLUMN_NAME).value(pattern).caseInsensitive(true).boost(5f))._toQuery();
        // The service-tag clause carries the highest boost of all clauses
        Query byServiceTag = TermQuery.of(te -> te.field(KEY_SERVICE_TAG_NAME).value(queryVal).caseInsensitive(true).boost(10f))._toQuery();
        // Every field is passed as its own argument; a single comma-joined string would be
        // treated as one (non-existent) field name and those fields would never be searched.
        Query byMulti = MultiMatchQuery.of(mu -> mu
                .query(queryVal)
                .analyzer("ik_max_word")
                .fields("tableAbbreviation", "tableSummary", "sourceSystem",
                        "columns.business_term", "columns.messagedomain_label", "columns.variable_properties",
                        "columns.meaning_usage", "columns.remark",
                        "columns.english_full_name", "columns.english_abbr")
                .boost(1.5f))._toQuery();

        SearchResponse<EsTableModel> response = client.search(se -> se
                .index(indexName)
                .query(qu -> qu
                        .bool(bo -> bo
                                .should(List.of(byTableName, byDbName, byColumnName, byServiceTag, byMulti)))), EsTableModel.class);

        System.out.println(response.hits().hits());
        for (Hit<EsTableModel> hit : response.hits().hits()) {
            System.out.println(hit.source());
        }
    }

}    

索引文档修改

主要是使用 es 的查询修改(update by query)api,使用脚本的形式。脚本语言是 es 内置的 Painless(代码中 lang 指定为 "painless"),语法与 Java/Groovy 相近。查询修改时主要注意字段的 key 可能不存在的情况,使用前需要先做判空处理。

/**
 * @author lvzb
 * @date 2022/12/14  10:17
 **/
@Slf4j
public class DemMetaSearchServiceTest {
    ///    @Autowired
//    private ElasticsearchClient client;
    private ElasticsearchClient client;

    private static final String indexName = "dem_meta_search";


    @BeforeEach
    void beforeEach() {
        // REST client for localhost:9200, wrapped in a Jackson-backed transport
        HttpHost localNode = new HttpHost("localhost", 9200);
        ElasticsearchTransport restTransport = new RestClientTransport(
                RestClient.builder(localNode).build(), new JacksonJsonpMapper());

        // API client used by the tests below
        client = new ElasticsearchClient(restTransport);
    }
    /**
     * Update-by-query sample for the "save" case: for the document whose guid matches,
     * each listed field is overwritten when the model carries a value and removed otherwise.
     */
    @SneakyThrows
    @Test
    public void updateDocForSave() {
        // subjectDomainId and subjectDomain are not set in this sample
        EsTableModel payload = builder()
                .guid("fb85596e-4d7d-4ffa-97e3-b8a35e308520")
                .tableAbbreviation("表abbr")
                .tableSummary("表summary")
                .sourceSystem("source System 11")
                .build();

        // One Painless statement per field: copy when non-null in params.model, else drop the field
        String painless = String.join("",
                "if(null != params.model.get('tableAbbreviation')){ctx._source.tableAbbreviation = params.model.tableAbbreviation}else{ctx._source.remove('tableAbbreviation')}",
                "if(null != params.model.get('tableSummary')){ctx._source.tableSummary = params.model.tableSummary}else{ctx._source.remove('tableSummary')}",
                "if(null != params.model.get('sourceSystem')){ctx._source.sourceSystem = params.model.sourceSystem}else{ctx._source.remove('sourceSystem')}",
                "if(null != params.model.get('subjectDomainId')){ctx._source.subjectDomainId = params.model.subjectDomainId}else{ctx._source.remove('subjectDomainId')}",
                "if(null != params.model.get('subjectDomain')){ctx._source.subjectDomain = params.model.subjectDomain}else{ctx._source.remove('subjectDomain')}");

        // Select the document by guid and hand the whole model to the script as params.model
        client.updateByQuery(req -> req
                .index(INDEX_NAME)
                .query(q -> q.term(t -> t.field(KEY_GUID).value(payload.getGuid())))
                .script(sc -> sc.inline(in -> in
                        .lang("painless")
                        .source(painless)
                        .params("model", JsonData.of(payload)))));
    }

    /**
     * Update-by-query sample for the "callback" case: for the document whose guid matches,
     * most fields are overwritten when present and removed otherwise; tableState and
     * columns are only ever written, never removed.
     */
    @SneakyThrows
    @Test
    public void updateDocForCallBack() {
        // tableSummary and the service tags are not set in this sample
        EsTableModel payload = builder()
                .guid("fb85596e-4d7d-4ffa-97e3-b8a35e308520")
                .tableAbbreviation("表1abbr1")
                .sourceSystem("11source System 11")
                .dataOwner("jjin8")
                .dataOwnerName("Jay jin")
                .tableState("open")
                .build();

        String painless = String.join("",
                // copy when non-null, otherwise remove
                "if(null != params.model.get('tableAbbreviation')){ctx._source.tableAbbreviation = params.model.tableAbbreviation}else{ctx._source.remove('tableAbbreviation')}",
                "if(null != params.model.get('tableSummary')){ctx._source.tableSummary = params.model.tableSummary}else{ctx._source.remove('tableSummary')}",
                "if(null != params.model.get('sourceSystem')){ctx._source.sourceSystem = params.model.sourceSystem}else{ctx._source.remove('sourceSystem')}",
                "if(null != params.model.get('dataOwner')){ctx._source.dataOwner = params.model.dataOwner}else{ctx._source.remove('dataOwner')}",
                "if(null != params.model.get('dataOwnerName')){ctx._source.dataOwnerName = params.model.dataOwnerName}else{ctx._source.remove('dataOwnerName')}",
                // no else branch: an absent tableState leaves the stored value as it is
                "if(null != params.model.get('tableState')){ctx._source.tableState = params.model.tableState}",
                "if(null != params.model.get('serviceTags')){ctx._source.serviceTags = params.model.serviceTags}else{ctx._source.remove('serviceTags')}",
                // columns are replaced only by a non-empty list
                "if(null != params.model.get('columns') && params.model.columns.size()>0){ctx._source.columns = params.model.columns;}");

        // Select the document by guid and hand the whole model to the script as params.model
        client.updateByQuery(req -> req
                .index(INDEX_NAME)
                .query(q -> q.term(t -> t.field(KEY_GUID).value(payload.getGuid())))
                .script(sc -> sc.inline(in -> in
                        .lang("painless")
                        .source(painless)
                        .params("model", JsonData.of(payload)))));
    }

    /**
     * Increments the visits counter of a single document, addressed by using its guid
     * as the document id, and logs the update response.
     */
    @SneakyThrows
    @Test
    void visitPlus() {
        EsTableModel target = builder().guid("25f9b835-9e60-4def-a041-68345dfa0f77").build();
        // Missing counter: start from 0 + params.count; otherwise add params.count to it
        String increment = "if(null==ctx._source.visits) {ctx._source.visits = 0 + params.count} else{ctx._source.visits += params.count}";

        UpdateResponse<EsTableModel> result = client.update(req -> req
                        .index(EsTableModel.INDEX_NAME)
                        .id(target.getGuid())
                        .script(s -> s.inline(i -> i
                                .lang("painless")
                                .source(increment)
                                .params("count", JsonData.of(1)))),
                EsTableModel.class);
        log.info("{}", result);
    }

    /**
     * Update-by-query sample for business-term changes: in every document that contains
     * a column with the given termGuid, the matching column entries either lose their
     * term attributes (REMOVE_FILED) or have them overwritten from the model (UPDATE_FIELD).
     * Any other operator results in no request being sent.
     */
    @SneakyThrows
    @Test
    void multiUpdateTerm() {
        BusinessMetaModel model = BusinessMetaModel.builder()
                .termGuid("28e2dc44-43a1-4c92-a0d9-0337f09a2dd1")
                .english_full_name("1111update111")
                .build();

        Operator operator = Operator.REMOVE_FILED;

        // Both scripts walk the columns and act only on entries whose termGuid equals params.model.termGuid
        String loopHead = "if(ctx._source.columns.length > 0) {" +
                "for(int i=0; i<ctx._source.columns.length; i++) {" +
                "if(null != ctx._source.columns[i].get('termGuid') && ctx._source.columns[i].get('termGuid').equals(params.model.termGuid)) {";
        // Closes the inner if, the for loop and the outer if, in that order
        String loopTail = "}" + "}" + "}";

        final String shell;
        if (Operator.REMOVE_FILED.equals(operator)) {
            // Detach the term: drop termGuid and every term attribute from the column
            shell = loopHead +
                    "ctx._source.columns[i].remove('termGuid');" +
                    "ctx._source.columns[i].remove('english_full_name');" +
                    "ctx._source.columns[i].remove('english_abbr');" +
                    "ctx._source.columns[i].remove('business_term');" +
                    "ctx._source.columns[i].remove('messagedomain_label');" +
                    "ctx._source.columns[i].remove('variable_properties');" +
                    "ctx._source.columns[i].remove('meaning_usage');" +
                    "ctx._source.columns[i].remove('remark');" +
                    loopTail;
        } else if (Operator.UPDATE_FIELD.equals(operator)) {
            // Per attribute: copy from params.model when non-null, otherwise remove it from the column
            shell = loopHead +
                    "if(null != params.model.get('english_full_name')){ctx._source.columns[i].english_full_name = params.model.get('english_full_name')}else{ctx._source.columns[i].remove('english_full_name')}" +
                    "if(null != params.model.get('english_abbr')){ctx._source.columns[i].english_abbr = params.model.get('english_abbr')}else{ctx._source.columns[i].remove('english_abbr')}" +
                    "if(null != params.model.get('business_term')){ctx._source.columns[i].business_term = params.model.get('business_term')}else{ctx._source.columns[i].remove('business_term')}" +
                    "if(null != params.model.get('messagedomain_label')){ctx._source.columns[i].messagedomain_label = params.model.get('messagedomain_label')}else{ctx._source.columns[i].remove('messagedomain_label')}" +
                    "if(null != params.model.get('variable_properties')){ctx._source.columns[i].variable_properties = params.model.get('variable_properties')}else{ctx._source.columns[i].remove('variable_properties')}" +
                    "if(null != params.model.get('meaning_usage')){ctx._source.columns[i].meaning_usage = params.model.get('meaning_usage')}else{ctx._source.columns[i].remove('meaning_usage')}" +
                    "if(null != params.model.get('remark')){ctx._source.columns[i].remark = params.model.get('remark')}else{ctx._source.columns[i].remove('remark')}" +
                    loopTail;
        } else {
            // Other operators are not handled by this sample
            return;
        }

        // Single request for both operators: select documents by the column termGuid
        UpdateByQueryResponse response = client.updateByQuery(u -> u.
                index(INDEX_NAME)
                .script(sc -> sc
                        .inline(in -> in
                                .source(shell)
                                .lang("painless")
                                .params("model", JsonData.of(model))))
                .query(q -> q
                        .term(t -> t
                                .field(KEY_COLUMN_TERM_GUID)
                                .value(model.getTermGuid()))));
        log.warn(">>>>> UpdateByQueryResponse:: {}", response);
    }

    /**
     * Update-by-query sample for subject-domain changes: in every document whose
     * subjectDomainId matches, the subject-domain fields are either removed (REMOVE_FILED)
     * or the subjectDomain name is overwritten (UPDATE_FIELD). Other operators send nothing.
     */
    @SneakyThrows
    @Test
    void multiUpdateSubjectDomain() {
        long id = 5;
        String name = "主题研讨论会";
        Operator operator = Operator.UPDATE_FIELD;
        log.warn(">>>>> es 開始批量操作主题数据 ...");
        // Number vs. string comparison: bring both sides to the same type first (id is sent as a string)
        Map<String, ? extends Serializable> model = Map.of("subjectDomainId", String.valueOf(id), "subjectDomain", name);

        // Shared guard: act only when the stored subjectDomainId equals the one in params.model
        String guard = "if(null !=ctx._source.get('subjectDomainId') && ctx._source.subjectDomainId == params.model.subjectDomainId) {";
        final String script;
        if (Operator.REMOVE_FILED.equals(operator)) {
            script = guard +
                    "ctx._source.remove('subjectDomainId');" +
                    "ctx._source.remove('subjectDomain');" +
                    "}";
        } else if (Operator.UPDATE_FIELD.equals(operator)) {
            script = guard +
                    "ctx._source.subjectDomain = params.model.subjectDomain;" +
                    "}";
        } else {
            script = null;
        }

        if (script != null) {
            // Same request for both operators: select documents by subjectDomainId
            client.updateByQuery(req -> req
                    .index(INDEX_NAME)
                    .query(q -> q.term(t -> t.field(KEY_SUBJECT_DOMAIN_ID).value(String.valueOf(id))))
                    .script(sc -> sc.inline(in -> in
                            .lang("painless")
                            .source(script)
                            .params("model", JsonData.of(model)))));
        }
        log.warn(">>>>> es 结束批量操作主题域数据 ...");
    }
}

kibana 命令

涉及分词分析、演示查询(模糊、fuzzy、前缀匹配、分词查询等);以及查询修改的语句

############################################  演示    ########################################################
## 高频词汇记录
## 库.表 搜索
## 表签的搜索权重 设置高权重
## 使用wildcard
## 分词高亮使用 es 分词的结果进行高亮
## 中文搜索启用停用词
GET dem_meta_search
GET dem_meta_search/_mapping

DELETE dem_meta_search

## 查询所有 ods_arc, default
GET dem_meta_search/_search
{
  "query": {
    "match_all": {}
  },
  "size": 1000
}

GET dem_meta_search/_search
{
  "query": {
    "bool": {
      "must_not": [
        {"term": {"databaseName":"ods_arc"}},
        {"term": {"databaseName":"ods_arc"}}
      ]
    }
  },
  "_source": ["databaseName"], 
  "size": 1000
}

## 单个 match 全文检索分词查询
GET dem_meta_search/_search
{
  "query": {
    "match": {
      "content": "测试卡BIN信息"
    }
  },
  "size": 50
}
##  term 不分词查询
GET dem_meta_search/_search
{
  "query": {
    "term": {
      "tableName": {
        "value": "merchant",
        "case_insensitive": true
      }
    }
  }
}
## 模糊查询 大小写不敏感
GET dem_meta_search/_search
{
  "query": {
    "wildcard": {
      "tableName": {
        "value": "*merchant*",
        "case_insensitive": true
      }
    }
    },
    "_source": ["tableName"], 
    "size": 50
  }
## 前缀匹配 大小写不敏感
GET dem_meta_search/_search
{
  "query": {
    "prefix": {
      "columns.name":{
        "value": "billdate",
        "case_insensitive": true
        
      }
    }
  },
  "fields": [
    "guid","tableName","databaseName","columns.name"
  ],
  "_source": false,
  "size": 50
}
## fuzzy查询
## Fuzzy (edit-distance) match on tableName.
## Elasticsearch only accepts a fuzziness of 0, 1, 2 or AUTO; a value of 3 is
## rejected. "mchant" -> "merchant" needs two insertions, so 2 is sufficient.
GET dem_meta_search/_search
{
  "query": {
    "fuzzy": {
      "tableName": {
        "value": "mchant",
        "fuzziness": 2,
        "transpositions": true
      }
    }
  },
  "_source": ["tableName"],
  "size": 1000
}

## 分词测试
POST dem_meta_search/_analyze
{
  "analyzer": "default",
  "text": "测试表简述  ods_arc  ECSxx pendingAdmission systemTable arc_ecs_bin hive_db  测试表简称 1"
}
POST dem_meta_search/_analyze
{
  "analyzer": "default",
  "text": "arc_ext_55haitao_merchantinfo"
}
GET _analyze
{ 
  "analyzer": "standard",
  "text": ["测试卡BIN信息"]
}
GET _analyze
{ 
  "analyzer": "standard",
  "text": ["ods_arc"]
}

GET _analyze
{
  "analyzer": "standard",
  "text": ["arc_ext_55haitao_merchantinfo"]
}

## 多条件查询
## Weighted OR search across several fields:
##   - wildcard (case-insensitive) on tableName / databaseName / columns.name, boost 5
##   - exact term (case-insensitive) on serviceTags.labelName, boost 9
##   - analyzed multi_match (ik_max_word) over the descriptive text fields, boost 1.5
## Every entry in "fields" must be its own string; a single comma-joined string
## is treated as one (non-existent) field name and silently matches nothing.
GET dem_meta_search/_search
{
  "query": {
    "bool": {
      "should": [
        {
          "wildcard": {
            "tableName": {
              "value": "*UDS_ACCESS*",
              "case_insensitive": true,
              "boost": 5
            }
          }
        },
        {
          "wildcard": {
            "databaseName": {
              "value": "*UDS_ACCESS*",
              "case_insensitive": true,
              "boost": 5
            }
          }
        },
        {
          "wildcard": {
            "columns.name": {
              "value": "*UDS_ACCESS*",
              "case_insensitive": true,
              "boost": 5
            }
          }
        },
        {
          "term": {
            "serviceTags.labelName": {
              "value": "uds_ACCESS",
              "case_insensitive": true,
              "boost": 9
            }
          }
        },
        {
          "multi_match": {
            "query": "UDS_ACCESS",
            "analyzer": "ik_max_word",
            "fields": [
              "tableAbbreviation", "tableSummary", "sourceSystem",
              "columns.business_term", "columns.messagedomain_label",
              "columns.variable_properties", "columns.meaning_usage",
              "columns.remark", "columns.english_full_name", "columns.english_abbr"
            ],
            "boost": 1.5
          }
        }
      ]
    }
  },
  "size": 50
}

## 数据更新
## Partial document update. Uses the typeless endpoint <index>/_update/<id>;
## the typed form <index>/_doc/<id>/_update is deprecated in 7.x.
POST dem_meta_search/_update/bb081997-de99-45bb-bada-9f99700fba0a
{
  "doc": {
    "tableSummary": "测试卡BIN信息"
  }
}

GET dem_meta_search/_doc/bb081997-de99-45bb-bada-9f99700fba0a

#################### 分词 ################################

GET _analyze
{
  "analyzer": "ik_max_word",
  "text": ["Pay-Test9F27 Code, CARD Holder Data - Name's"]
}
GET _analyze
{ 
  "analyzer": "ik_max_word",
  "text": ["9f27"]
}

GET _analyze
{ 
  "analyzer": "ik_max_word",
  "text": ["测试卡BIN信息"]
}

GET _analyze
{ 
  "analyzer": "ik_max_word",
  "text": ["卡BIN"]
}

GET _analyze
{ 
  "analyzer": "standard",
  "text": ["域126 W8用法(05)-手续费支付方式  "]
}

GET _analyze
{ 
  "analyzer": "standard",
  "text": ["Pay-Test9F27 Code, CARD Holder Data - Name's"]
}
GET _analyze
{ 
  "analyzer": "standard",
  "text": ["9f27"]
}

GET _analyze
{ 
  "analyzer": "standard",
  "text": ["测试卡BIN信息"]
}
GET _analyze
{ 
  "analyzer": "standard",
  "text": ["卡BIN"]
}
GET _analyze
{ 
  "analyzer": "standard",
  "text": ["耗子尾汁"]
}
GET _analyze
{ 
  "analyzer": "ik_max_word",
  "text": ["耗子尾汁"]
}
GET _analyze
{ 
  "analyzer": "ik_smart",
  "text": ["耗子尾汁"]
}

PUT my-index-0000010
{
  "settings": {
    "analysis": {
      "analyzer": {
        "my_custom_analyzer": {
          "type": "custom", 
          "tokenizer": "ik_max_word"
        }
      }
    }
  }
}

## Run the index-level custom analyzer against a sample string.
## The _analyze body takes the input under the "text" key; any other key is rejected.
POST my-index-0000010/_analyze
{
  "analyzer": "my_custom_analyzer",
  "text": "测试卡BIN信息"
}

## Recreate the test index with a richer custom analyzer (html_strip char filter,
## lowercase + asciifolding token filters on top of the ik_max_word tokenizer).
## The index was already created earlier in this script, and PUT <index> fails with
## resource_already_exists_exception on an existing index, so drop it first.
DELETE my-index-0000010

PUT my-index-0000010
{
  "settings": {
    "analysis": {
      "analyzer": {
        "my_custom_analyzer": {
          "type": "custom",
          "tokenizer": "ik_max_word",
          "char_filter": [
            "html_strip"
          ],
          "filter": [
            "lowercase",
            "asciifolding"
          ]
        }
      }
    }
  }
}
GET _analyze
{ 
  "analyzer": "standard",
  "text": ["arc_mic_tyc_data"]
}
GET dem_meta_search
GET _search
{
  "query": {
    "match_all": {}
  }
}

######################## 字段移除测试 ###############################

GET dem_meta_search/_search
{
  "query": {
    "match_all": {}
  }
}

GET dem_meta_search/_search
{
  "query": {
    "bool": {
      "must_not": [
        {"term": {
          "columns.termGuid": {
            "value": ""
          }
        }}
      ]
    }
  }
}

POST dem_meta_search/_update_by_query
{
  "script": {
    "source": "if(ctx._source.columns.length > 0) {for(int i=0; i<ctx._source.columns.length; i++) {if(null != ctx._source.columns[i].get('termGuid') && ctx._source.columns[i].get('termGuid').equals(params.model.termGuid)) {if(null != params.model.get('english_full_name')){ctx._source.columns[i].english_full_name = params.model.get('english_full_name')}else{ctx._source.columns[i].remove('english_full_name')}if(null != params.model.get('english_abbr')){ctx._source.columns[i].english_abbr = params.model.get('english_abbr')}else{ctx._source.columns[i].remove('english_abbr')}if(null != params.model.get('business_term')){ctx._source.columns[i].business_term = params.model.get('business_term')}else{ctx._source.columns[i].remove('business_term')}if(null != params.model.get('messagedomain_label')){ctx._source.columns[i].messagedomain_label = params.model.get('messagedomain_label')}else{ctx._source.columns[i].remove('messagedomain_label')}if(null != params.model.get('variable_properties')){ctx._source.columns[i].variable_properties = params.model.get('variable_properties')}else{ctx._source.columns[i].remove('variable_properties')}if(null != params.model.get('meaning_usage')){ctx._source.columns[i].meaning_usage = params.model.get('meaning_usage')}else{ctx._source.columns[i].remove('meaning_usage')}if(null != params.model.get('remark')){ctx._source.columns[i].remark = params.model.get('remark')}else{ctx._source.columns[i].remove('remark')}}}}",
    "lang": "painless",
    "params": {
      "model":{
        "termGuid":"28e2dc44-43a1-4c92-a0d9-0337f09a2dd1",
        "english_full_name":"1111update111"
      }
    }
  }
  ,
  "query": {
    "term": {
      "columns.termGuid": {
        "value": "28e2dc44-43a1-4c92-a0d9-0337f09a2dd1"
      }
    }
  }
}

## For every document that has a column with the given termGuid, strip the
## business-term attributes from that column entry.
## Painless requires ';' between consecutive statements (only the last statement
## in a block may omit it), so each remove(...) call is terminated explicitly.
POST dem_meta_search/_update_by_query
{
  "script": {
    "source": """
      if (ctx._source.columns.length > 0) {
        for (int i = 0; i < ctx._source.columns.length; i++) {
          if (null != ctx._source.columns[i].get('termGuid') && ctx._source.columns[i].get('termGuid').equals(params.model.termGuid)) {
            ctx._source.columns[i].remove('termGuid');
            ctx._source.columns[i].remove('english_full_name');
            ctx._source.columns[i].remove('english_abbr');
            ctx._source.columns[i].remove('business_term');
            ctx._source.columns[i].remove('messagedomain_label');
            ctx._source.columns[i].remove('variable_properties');
            ctx._source.columns[i].remove('meaning_usage');
            ctx._source.columns[i].remove('remark');
          }
        }
      }
    """,
    "lang": "painless",
    "params": {
      "model": {
        "termGuid": "28e2dc44-43a1-4c92-a0d9-0337f09a2dd1",
        "english_full_name": "1111update111"
      }
    }
  },
  "query": {
    "term": {
      "columns.termGuid": {
        "value": "28e2dc44-43a1-4c92-a0d9-0337f09a2dd1"
      }
    }
  }
}


GET dem_meta_search/_search
{
  "query": {
    "bool": {
      "must_not": [
        {"term": {
          "subjectDomain": {
            "value": ""
          }
        }}
      ]
    }
  }
}
## 含有业务元数据的对象 table guid fb85596e-4d7d-4ffa-97e3-b8a35e308520
GET dem_meta_search/_doc/fb85596e-4d7d-4ffa-97e3-b8a35e308520

GET dem_meta_search/_mapping
GET dem_meta_search/_doc/d2b1b74e-e080-4a92-b1da-60865d6cf4fc

POST dem_meta_search/_update/d2b1b74e-e080-4a92-b1da-60865d6cf4fc
{
  "script": "ctx._source.columns[0].remove('tableGuid')"
}

## script 循环脚本遍历
POST dem_meta_search/_update/d2b1b74e-e080-4a92-b1da-60865d6cf4fc
{
  "script": {
    "source": "if(ctx._source.columns.length>0){for(int i=0;i<ctx._source.columns.length;i++){if(params.columnName.equals(ctx._source.columns[i].get('name'))){ctx._source.columns[i].tableGuid=params.tableGuid;ctx._source.columns[i].remove('email');ctx._source.subjectDomainId = params.subjectDomainId ;ctx._source.subjectDomain =  params.subjectDomain}}}",
    "lang": "painless",
    "params": {
      "columnName":"billdate",
      "tableGuid":"123456700",
      "subjectDomain":"主题域",
      "subjectDomainId":"5"
    }
  }
}
## 类 groovy 语法(painless 脚本),可以直接用这种判空的方式
POST dem_meta_search/_update/d2b1b74e-e080-4a92-b1da-60865d6cf4fc
{
  "script": {
    "source": """
    if(null != ctx._source.columns[0].get('name')){
       ctx._source.columns[0].email = '163@.com'
    }
    """
  }
}

## 查询再更新 update by query
POST dem_meta_search/_update_by_query
{
 "script": {
    "source": "if(ctx._source.columns.length>0){for(int i=0;i<ctx._source.columns.length;i++){if(params.columnName.equals(ctx._source.columns[i].get('name'))){ctx._source.columns[i].tableGuid=params.tableGuid;ctx._source.columns[i].remove('email')}}}",
    "lang": "painless",
    "params": {
      "columnName":"billdate",
      "tableGuid":"123456700"
    }
  },
  "query": {
    "term": {
      "guid": {
        "value": "d2b1b74e-e080-4a92-b1da-60865d6cf4fc"
      }
    }
  }
}

参考资料

查询相关内容重点查看 Query DSL;Index Modules 是索引相关内容;索引的 Mapping 结构和字段类型查阅 Mapping;REST API 中包含所有的 API 操作,包括新增、更新、删除索引和文档等;Text analysis 是和分词相关的内容;Search your data 介绍如何对查询后的结果进行排序、字段过滤、选取等。

 类似资料: