Document APIs - Using Bulk Processor

优质
小牛编辑
143浏览
2023-12-01

使用 Bulk Processor

BulkProcessor 提供了一个简单的接口,在给定的大小数量上定时批量自动请求

创建BulkProcessor实例

首先创建BulkProcessor实例

  1. import org.elasticsearch.action.bulk.BackoffPolicy;
  2. import org.elasticsearch.action.bulk.BulkProcessor;
  3. import org.elasticsearch.common.unit.ByteSizeUnit;
  4. import org.elasticsearch.common.unit.ByteSizeValue;
  5. import org.elasticsearch.common.unit.TimeValue;
  1. BulkProcessor bulkProcessor = BulkProcessor.builder(
  2. client, //增加elasticsearch客户端
  3. new BulkProcessor.Listener() {
  4. @Override
  5. public void beforeBulk(long executionId,
  6. BulkRequest request) { ... } //调用bulk之前执行 ,例如你可以通过request.numberOfActions()方法知道numberOfActions
  7. @Override
  8. public void afterBulk(long executionId,
  9. BulkRequest request,
  10. BulkResponse response) { ... } //调用bulk之后执行 ,例如你可以通过request.hasFailures()方法知道是否执行失败
  11. @Override
  12. public void afterBulk(long executionId,
  13. BulkRequest request,
  14. Throwable failure) { ... } //调用失败抛 Throwable
  15. })
  16. .setBulkActions(10000) //每次10000请求
  17. .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) //拆成5mb一块
  18. .setFlushInterval(TimeValue.timeValueSeconds(5)) //无论请求数量多少,每5秒钟请求一次。
  19. .setConcurrentRequests(1) //设置并发请求的数量。值为0意味着只允许执行一个请求。值为1意味着允许1并发请求。
  20. .setBackoffPolicy(
  21. BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))//设置自定义重复请求机制,最开始等待100毫秒,之后成倍更加,重试3次,当一次或多次重复请求失败后因为计算资源不够抛出 EsRejectedExecutionException 异常,可以通过BackoffPolicy.noBackoff()方法关闭重试机制
  22. .build();

BulkProcessor 默认设置

  • bulkActions 1000
  • bulkSize 5mb
  • 不设置flushInterval
  • concurrentRequests 为 1 ,异步执行
  • backoffPolicy 重试 8次,等待50毫秒

增加requests

然后增加requestsBulkProcessor

  1. bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
  2. bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));

关闭 Bulk Processor

当所有文档都处理完成,使用awaitCloseclose 方法关闭BulkProcessor:

  1. bulkProcessor.awaitClose(10, TimeUnit.MINUTES);

  1. bulkProcessor.close();

在测试中使用Bulk Processor

如果你在测试种使用Bulk Processor可以执行同步方法

  1. BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ })
  2. .setBulkActions(10000)
  3. .setConcurrentRequests(0)
  4. .build();
  5. // Add your requests
  6. bulkProcessor.add(/* Your requests */);
  7. // Flush any remaining requests
  8. bulkProcessor.flush();
  9. // Or close the bulkProcessor if you don't need it anymore
  10. bulkProcessor.close();
  11. // Refresh your indices
  12. client.admin().indices().prepareRefresh().get();
  13. // Now you can start searching!
  14. client.prepareSearch().get();