当前位置: 首页 > 知识库问答 >
问题:

ElasticSearch使用Java RestHighLevelClient使用BulkRequest API索引100K文档

黄英韶
2023-03-14

我正在使用scroll API从索引documents_qa读取100k和文件路径。实际文件将在我的本地d:\drive中提供。通过使用文件路径,am读取实际文件,并将另一个索引document\u attachment\u qa中的base64内容转换为base64和am reindex。

我当前的实现是,我正在读取文件路径,将文件转换为base64,并将文档与文件内容一起逐个索引。因此,它需要更多的时间,例如:-索引4000个文档需要6个多小时,而且由于IO异常,连接正在终止。

所以现在我想使用BulkRequest API为文档编制索引,但我使用的是RestHighLevelClient,不确定如何使用BulkRequestAPI和RestHighLevelClient

请找到我当前的实现,它正在一个接一个地索引文档。

jsonMap = new HashMap<String, Object>();
            jsonMap.put("id", doc.getId());
            jsonMap.put("app_language", doc.getApp_language());
            jsonMap.put("fileContent", result);

            String id=Long.toString(doc.getId());

IndexRequest request = new IndexRequest(ATTACHMENT, "doc", id ) // ATTACHMENT is the index name
                    .source(jsonMap) // Its my single document.
                    .setPipeline(ATTACHMENT);

IndexResponse response = SearchEngineClient.getInstance3().index(request); // increased timeout 

我找到了下面的留档BulkRequest.

https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-bulk.html

但是我不确定如何实现Bulkask estBuilder bulkRequest=client.prepareBulk();client.prepareBulk()方法时和使用RestHighLevelClient

更新1

我试图一次索引所有100K文档。所以我创建了一个JSONArray,并将所有JSONObject一个接一个地放入数组中。最后,我尝试构建BulkRequest,并将我的所有文档(JSONArray)作为源添加到BulkRequest中,并尝试对它们进行索引。

这里我不确定,如何转换我的JSONArray字符串列表。

private final static String ATTACHMENT = "document_attachment_qa";
private final static String TYPE = "doc";
JSONArray reqJSONArray=new JSONArray();

while (searchHits != null && searchHits.length > 0) { 
...
...
    jsonMap = new HashMap<String, Object>();
    jsonMap.put("id", doc.getId());
    jsonMap.put("app_language", doc.getApp_language());
    jsonMap.put("fileContent", result);

    reqJSONArray.put(jsonMap)
}

String actionMetaData = String.format("{ \"index\" : { \"_index\" : \"%s\", \"_type\" : \"%s\" } }%n", ATTACHMENT, TYPE);
List<String> bulkData =   // not sure how to convert a list of my documents in JSON strings    
StringBuilder bulkRequestBody = new StringBuilder();
for (String bulkItem : bulkData) {
    bulkRequestBody.append(actionMetaData);
    bulkRequestBody.append(bulkItem);
    bulkRequestBody.append("\n");
}

HttpEntity entity = new NStringEntity(bulkRequestBody.toString(), ContentType.APPLICATION_JSON);
try {
    Response response = SearchEngineClient.getRestClientInstance().performRequest("POST", "/ATTACHMENT/TYPE/_bulk", Collections.emptyMap(), entity);
    return response.getStatusLine().getStatusCode() == HttpStatus.SC_OK;
} catch (Exception e) {
    // do something
}

共有2个答案

邓威
2023-03-14

除了@chengpohi答案。我想补充以下几点:

BulkRequest可用于使用单个请求执行多个索引、更新和/或删除操作。

它要求至少向批量请求添加一个操作:

BulkRequest request = new BulkRequest(); 
request.add(new IndexRequest("posts", "doc", "1")  
        .source(XContentType.JSON,"field", "foo"));
request.add(new IndexRequest("posts", "doc", "2")  
        .source(XContentType.JSON,"field", "bar"));
request.add(new IndexRequest("posts", "doc", "3")  
        .source(XContentType.JSON,"field", "baz"));

注意:大容量API仅支持以JSON或SMILE编码的文档。以任何其他格式提供文档将导致错误。

同步操作:

BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);

客户端将是高级Rest客户端,执行将是同步的。

异步操作(推荐方法):

client.bulkAsync(request, RequestOptions.DEFAULT, listener);

批量请求的异步执行需要将BulkRequest实例和ActionListener实例传递给异步方法

Listener Example:

ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
    @Override
    public void onResponse(BulkResponse bulkResponse) {

    }

    @Override
    public void onFailure(Exception e) {

    }
};

返回的BulkSolutions包含有关执行操作的信息,并允许按以下方式迭代每个结果:

for (BulkItemResponse bulkItemResponse : bulkResponse) { 
    DocWriteResponse itemResponse = bulkItemResponse.getResponse(); 

    if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
            || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) { 
        IndexResponse indexResponse = (IndexResponse) itemResponse;

    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) { 
        UpdateResponse updateResponse = (UpdateResponse) itemResponse;

    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) { 
        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
    }
}

可以选择提供以下参数:

request.timeout(TimeValue.timeValueMinutes(2)); 
request.timeout("2m");

我希望这有帮助。

田易安
2023-03-14

您只需new BulkRequest()即可添加请求,而无需使用BulkRequestBuilder,如:

BulkRequest request = new BulkRequest();
request.add(new IndexRequest("foo", "bar", "1")
        .source(XContentType.JSON,"field", "foobar"));
request.add(new IndexRequest("foo", "bar", "2")
        .source(XContentType.JSON,"field", "foobar"));
...
BulkResponse bulkResponse = myHighLevelClient.bulk(request, RequestOptions.DEFAULT);
 类似资料:
  • 如何使用/用JSON文件中的多个文档填充ES中的现有索引? 编辑 我知道ES BulkAPI和Spring的方法,但是这样我必须自己处理JSON解析(我想避免这种方式)。但是如果我们已经知道索引和映射,我想知道是否有更简单的方法只需要传递文件?

  • 输入将是大小约为5MB的文件类型。 我浏览了ES Java API和SpringData的示例,它们确实有插入JSON文档的教程。 但是关于使用File作为输入来创建文档/索引的任何帮助都是不可用的。

  • 问题内容: 我有一个简单的python脚本,用于索引包含一百万行的CSV文件: 这种方法效果很好,但是当我们进入成千上万的时候,它们的速度都呈指数下降。 我猜测如果我在较小的块中进行索引,ES的性能会更好。 有更有效的方法吗?sleep()会延迟帮助吗?还是有一种简单的方法可以通过编程将csv分成较小的块? 谢谢。 问题答案: 每运行N次 这里的例子

  • 问题内容: 我是ElasticSearch的新手。我正在尝试为索引重新索引以便对其重命名。我正在使用NEST API v5.4。我看到了这个例子: 资料来源 :http : //thomasardal.com/elasticsearch-migrations-with-c-and- nest/ 但是,我无法使用NEST 5.4重现此内容。我认为这是2.4版。我检查ElasticSearch的重大更

  • 问题内容: 我正在努力完成索引创建这一简单任务,目标是使用分析器和字段映射创建索引。当我使用分析器创建索引时,我可以通过分析api调用与分析器通信,但是当我添加映射信息时,创建索引调用失败,并显示“字段[$ field]]找不到Analyzer [analyzer1]”,我创建了一个脚本来显示问题: 问题答案: 我相信您的问题是这些设置需要嵌套在JSON的一个节点内,而不是您所拥有的嵌套在一个节点

  • 问题内容: 我正在使用Solr进行索引和搜索。现在,我的新的数据被索引上elasticsearch。如何使用来自Elasticsearch的Solr索引进行组合搜索? 由于Solr和elasticsearch都是基于Apache Lucene 构建的,因此必须有一种方法/插件来使用Elasticsearch的Solr索引,对吗? 我的尝试: 我为此找到了一条河,但是elasticsearch从1.