当前位置: 首页 > 工具软件 > delete > 使用案例 >

06.delete_by_query操作

翟越
2023-12-01

1. Delete By Query API简介

根据查询API进行删除
最简单的用法是使用_delete_by_query对每个查询匹配的文档执行删除。这是API:

POST twitter/_delete_by_query
{
  "query": { //①
    "match": {
      "message": "some message"
    }
  }
} 

该查询必须以与Search API相同的方式作为query键的值传递。您也可以以与search api相同的方式使用q参数。

它将返回类似如下的一些东西:

{
  "took" : 147,
  "timed_out": false,
  "deleted": 119,
  "batches": 1,
  "version_conflicts": 0,
  "noops": 0,
  "retries": {
    "bulk": 0,
    "search": 0
  },
  "throttled_millis": 0,
  "requests_per_second": -1.0,
  "throttled_until_millis": 0,
  "total": 119,
  "failures" : [ ]
} 

_delete_by_query在开始处理时时获取索引的快照,并使用内部版本控制删除它所查找到的内容。这意味着如果文档在query和处理删除之间发生变化,会报冲突错误。当版本匹配时文档被删除。

** 注意 **

由于内部版本控制不支持值0作为有效的版本号,因此无法使用_delete_by_query删除版本等于零的文档,并且将请求失败。

在_delete_by_query执行期间,依次执行多个搜索请求,以便找到要删除的所有匹配文档。每次发现一批文档时,执行相应的批量请求以删除所有这些文档。如果搜索或批量请求被拒绝,_delete_by_query依赖于默认策略来重试拒绝的请求(最多10次,以指数返回)。达到最大重试次数限制会导致_delete_by_query中止,并在响应失败中返回所有故障。已经执行的删除仍然保持。换句话说,进程没有回滚,只会中止。当第一个故障导致中止时,失败批量请求返回的所有故障都会返回到故障元素中;因此,有可能会有不少失败的实体。
也就是说这不是一个事务操作,不会回滚。

如果您只想统计版本冲突,而不是导致它们中止,那么在URL上设置conflicts=proceed或在请求体重中设置"conflicts": “proceed”。

返回到API格式,您可以将_delete_by_query限制为单一类型。下面示例将只会从Twitter的索引中删除tweet类型的文档:

POST twitter/_delete_by_query?conflicts=proceed
{
  "query": {
    "match_all": {}
  }
} 

也可以一次删除多个索引文件和多个类型,就像搜索API:

POST twitter,blog/_delete_by_query
{
  "query": {
    "match_all": {}
  }
} 

如果您提供routing,则将路由复制到滚动查询,将过程限制为与该路由值匹配的分片:

POST twitter/_delete_by_query?routing=1
{
  "query": {
    "range" : {
        "age" : {
           "gte" : 10
        }
    }
  }
} 

默认情况下_delete_by_query使用滚动批量处理数量为1000。您可以使用URL的scroll_size参数更改批量大小:

POST twitter/_delete_by_query?scroll_size=5000
{
  "query": {
    "term": {
      "user": "kimchy"
    }
  }
} 

2. URL参数

除了标准参数像pretty之外,“Delete By Query API”还支持refresh、wait_for_completion、wait_for_active_shards、timeout以及requests_per_second。

发送refresh将在一旦根据查询删除完成之后, 刷新所有涉及到的分片。这与delete API的refresh参数不同,delete API只会refresh收到delete请求的shard。

如果请求包含wait_for_completion=false,那么Elasticsearch将执行一些预检检查、启动请求、然后返回一个任务,可以与Tasks API一起使用来取消或获取任务的状态。Elasticsearch还将以.tasks/task/${taskId}作为文档创建此任务的记录。这是你可以根据是否合适来保留或删除它。当你完成它时,删除它可以让Elasticsearch回收它使用的空间。

wait_for_active_shards控制在继续请求之前必须有多少个分片必须处于活动状态,详见这里。timeout控制每个写入请求等待不可用分片变成可用的时间。两者都能正确地在Bulk API中工作。

requests_per_second可以设置为任何正数(1.4,6,1000等),来作为“delete-by-query”每秒请求数的节流阀数字,或者将其设置为-1以禁用限制。节流是在批量批次之间等待,以便它可以操纵滚动超时。等待时间是批次完成的时间与request_per_second * requests_in_the_batch的时间之间的差异。由于分批处理没有被分解成多个批量请求,所以会导致Elasticsearch创建许多请求,然后等待一段时间再开始下一组。这是“突发”而不是“平滑”。默认值为-1。在集群本省的任务比较重的情况下建议开启限流,以减少对集群资源的使用。

throttle的计算方式是,每秒处理requests_per_second 个request,假如我们设置requests_per_second=500,写1000个需要0.5s,则等待时间是

target_time = 1000 / 500 per second = 2 seconds
wait_time = target_time - write_time = 2 seconds - .5 seconds = 1.5 seconds

默认没有限流

3. 响应体

JSON响应类似如下:

{
  "took" : 147,
  "timed_out": false,
  "total": 119,
  "deleted": 119,
  "batches": 1,
  "version_conflicts": 0,
  "noops": 0,
  "retries": {
    "bulk": 0,
    "search": 0
  },
  "throttled_millis": 0,
  "requests_per_second": -1.0,
  "throttled_until_millis": 0,
  "failures" : [ ]
}

took: 从整个操作的开始到结束的毫秒数。
timed_out: delete by query的请求处理过程中是否超时
total: 成功处理的doc数量
deleted: 成功删除的文档数。
batches: 通过查询删除的scroll响应数量。
version_conflicts: 根据查询删除时,版本冲突的数量。
noops: 这个在这里没有用,只是为了让delete by query, update by query, and reindex APIs 返回相同结构的 responses
retries: search 操作和bulk操作的重试次数
throttled_millis: 因为requests_per_second而产生的睡眠时间
requests_per_second: 每秒处理的请求数
throttled_until_millis: 这个也是总是0,没有用,为了保持结构一致性
failures: 失败的情况,会终止操作的失败

失败的索引数组。如果这是非空的,那么请求因为这些失败而中止。请参阅 conflicts 来如何防止版本冲突中止操作。

4. 配合Task API使用

您可以使用Task API获取任何正在运行的根据查询删除请求的状态:

GET _tasks?detailed=true&actions=*/delete/byquery 

响应会类似如下:

{
  "nodes" : {
    "r1A2WoRbTwKZ516z6NEs5A" : {
      "name" : "r1A2WoR",
      "transport_address" : "127.0.0.1:9300",
      "host" : "127.0.0.1",
      "ip" : "127.0.0.1:9300",
      "attributes" : {
        "testattr" : "test",
        "portsfile" : "true"
      },
      "tasks" : {
        "r1A2WoRbTwKZ516z6NEs5A:36619" : {
          "node" : "r1A2WoRbTwKZ516z6NEs5A",
          "id" : 36619,
          "type" : "transport",
          "action" : "indices:data/write/delete/byquery",
          "status" : {    //①
            "total" : 6154,
            "updated" : 0,
            "created" : 0,
            "deleted" : 3500,
            "batches" : 36,
            "version_conflicts" : 0,
            "noops" : 0,
            "retries": 0,
            "throttled_millis": 0
          },
          "description" : ""
        }
      }
    }
  }
} 

① 此对象包含实际状态。它就像是响应json,重要的添加total字段。 total是重建索引希望执行的操作总数。您可以通过添加的updated、created和deleted的字段来估计进度。当它们的总和等于total字段时,请求将完成。

使用任务id可以直接查找任务:

GET /_tasks/r1A2WoRbTwKZ516z6NEs5A:36619


这个API的优点是它与wait_for_completion=false集成,以透明地返回已完成任务的状态。如果任务完成并且wait_for_completion=false被设置,那么它将返回results或error字段。此功能的成本是wait_for_completion=false在.tasks索引创建taskId的文档,需要你自己删除该doc。

1. 配合取消任务API使用

所有根据查询删除都能使用Task Cancel API取消:

POST _tasks/task_id:1/_cancel 

可以使用上面的任务API找到task_id。 取消应尽快发生,但可能需要几秒钟。上面的任务状态API将继续列出任务,直到它被唤醒取消自身。

重置节流阀
request_per_second的值可以在通过查询删除时使用_rethrottle API更改:

POST _delete_by_query/r1A2WoRbTwKZ516z6NEs5A:36619/_rethrottle?requests_per_second=-1

可以使用上面的任务API找到task_id。

就像在_delete_by_query API中设置它一样,request_per_second可以是-1来禁用限制,或者任何十进制数字,如1.7或12,以节制到该级别。加速查询的会立即生效,但是在完成当前批处理之后,减慢查询的才会生效。这样可以防止滚动超时。

5. 切片并行

1. 手动切片并行

Delete by query支持滚动切片,您可以相对轻松地手动并行化处理:

POST twitter/_delete_by_query
{
  "slice": {
    "id": 0,
    "max": 2
  },
  "query": {
    "range": {
      "likes": {
        "lt": 10
      }
    }
  }
}

POST twitter/_delete_by_query
{
  "slice": {
    "id": 1,
    "max": 2
  },
  "query": {
    "range": {
      "likes": {
        "lt": 10
      }
    }
  }
} 

上面两个请求的max要保持一致,代表了总共有2个并行的任务,一个id为0,一个id为1。

您可以通过以下方式验证:

GET _refresh
POST twitter/_search?size=0&filter_path=hits.total
{
  "query": {
    "range": {
      "likes": {
        "lt": 10
      }
    }
  }
} 

其结果一个合理的total像这样:

{
  "hits": {
    "total" : {
        "value": 0,
        "relation": "eq"
    }
  }
}

2. 自动切片

你还可以让根据查询删除使用切片的_uid来自动并行的滚动切片。

POST twitter/_delete_by_query?refresh&slices=5
{
  "query": {
    "range": {
      "likes": {
        "lt": 10
      }
    }
  }
} 

上面的会自动分成5个任务来执行

您可以通过以下方式验证:

POST twitter/_search?size=0&filter_path=hits.total
{
  "query": {
    "range": {
      "likes": {
        "lt": 10
      }
    }
  }
} 

其结果一个合理的total像这样:

{
  "hits": {
    "total" : {
        "value": 0,
        "relation": "eq"
    }
  }
}


将切片设置为自动将使Elasticsearch选择要使用的切片数。此设置将每个分片使用一个slice,上限为一定限制。如果有多个源index,它将根据分片数量最少的索引选择slice。

在_delete_by_query使用slice只会自动执行上一节中使用的手动过程,从而创建子请求,这意味着它具有一些怪癖:

  1. 您可以在Task API中看到这些请求。这些子请求是具有slices请求任务的“子”任务。
  2. 获取slices请求任务的状态只包含已完成切片的状态。
  3. 这些子请求可以单独寻址,例如取消和重置节流阀。
  4. slices的重置节流阀请求将按相应的重新计算未完成的子请求。
  5. slices的取消请求将取消每个子请求。
  6. 由于slices的性质,每个子请求将不会获得完全均匀的文档部分。所有文件都将被处理,但有些片可能比其他片大。预期更大的切片可以有更均匀的分布。
  7. 带有slices请求的request_per_second和size的参数相应的分配给每个子请求。结合上述关于分布的不均匀性,您应该得出结论,使用size 参数在slice中有可能删除的并不是完全准确的想要删除的数据量。
  8. 每个子请求都会获得源索引的略有不同的快照,尽管这些都是大致相同的时间。

3. 挑选slice的数量

在这一点上,我们围绕要使用的slices数量提供了一些建议(比如手动并行化时,切片API中的max参数):

不要使用大的数字,500就能造成相当大的CPU抖动。
在源索引中使用完全相同的分片是从查询性能的角度来看效率最高的。
索引性能应在可用资源之间以slices数量线性扩展。
索引或查询性能是否支配该流程取决于许多因素,如正在重建索引的文档和进行reindexing的集群。

 类似资料: