最近项目中用到了对es文档的批量更新操作,根据id单个单个进行文档更新时 比较影响性能,故而使用es的script脚本对query查询出来的文档进行更新操作。
{
“script”: {
“source”: “ctx._source[‘要修改的字段名’]=‘要修改为的参数’”
},
“query”: {
“term”: {
“条件字段名”: “条件字段参数”
}
}
}
1、会使用es
2、需要了解script
3、了解painless语法
在kibana上使用如下命令,能将query查询到的文档中 url 字段上的值 赋值给url字段。
POST 索引名/_update_by_query
{
"query": {
"bool": {
"must": [
{
"exists": {
"field": "url"
}
},
],
"must_not": [
{
"exists": {
"field": "newUrl"
}
}
]
}
},
"script":{
"inline" : "ctx._source.newUrl= ctx._source.url;",
"lang" : "painless"
}
}
在kibana上使用如下命令,能将query查询到所有文档中 newUrl 字段上的值设置为 www.baidu.com(即params 中的 url 的值)。
POST 索引名/_update_by_query
{
"query": {
"match_all": {}
},
"script":{
"source" : "ctx._source.newUrl= params.url",
"params": {
"url": "www.baidu.com"
}
}
}
在kibana上使用如下命令,能将query查询到所有存在 linked_persons 字段的文档中,如果存在 linked_persons 字段 并且 linked_persons 字段中不包含需要添加的 数据,则向linked_persons字段中插入该字段。
GET sourcedoc_prod/_update_by_query
{
"query": {
"exists": {
"field": "linked_persons"
}
},
"script": {
"source": " if (ctx._source.containsKey('linked_persons')) {
if(!ctx._source.linked_persons.contains(params.linked_persons)){
ctx._source.linked_persons.add(params.linked_persons);
}
}else {
ctx._source.linked_persons= [params.linked_persons];
}",
"params": {
"linked_persons": "111"
}
}
}
@Test
public void testUpdateByQuery() throws IOException {
Set<String> docIds = new HashSet<>();
docIds.add("xxxxxxxxxxx");
HashMap<String, Object> params = new HashMap<>(2);
params.put("linked_persons","张三");
Script script = new Script(ScriptType.INLINE, "painless",
"if(ctx._source.containsKey('linked_persons')){ if (!ctx._source.linked_persons.contains(params.linked_persons)){ ctx._source.linked_persons.add(params.linked_persons); }} else { ctx._source.linked_persons = [params.linked_persons];}",
params);
// 构造查询条件,查询需要进行修改的文档
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
queryBuilder.must(QueryBuilders.termsQuery("id",docIds));
updateByQuery(queryBuilder , script , Index_a, Index_b);
}
/**
* 通过脚本 批量修改 文档es
* @param queryBuilder 查询条件
* @param script 脚本
* @param indices 需要修改的 索引
* @throws IOException
*/
public void updateByQuery(QueryBuilder queryBuilder, Script script, String... indices) throws IOException {
UpdateByQueryRequest request = new UpdateByQueryRequest(indices);
request.setConflicts("proceed");
request.setQuery(queryBuilder);
request.setScript(script);
restHighLevelClient.updateByQuery(request, RequestOptions.DEFAULT);
}