我想问一下关于Elasticsearch批量API的问题
这是我使用批量API的代码
public void bulkInsert(String index, ArrayList<String> jsonList) throws IOException {
BulkRequest request = new BulkRequest();
for(String json: jsonList){
if(json != null&& !json.isEmpty()){
request.add(new IndexRequest(index)
.source(json, XContentType.JSON));
}
}
BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
for (BulkItemResponse bulkItemResponse : bulkResponse) {
DocWriteResponse itemResponse = bulkItemResponse.getResponse();
switch (bulkItemResponse.getOpType()) {
case INDEX:
case CREATE:
IndexResponse indexResponse = (IndexResponse) itemResponse;
break;
case UPDATE:
UpdateResponse updateResponse = (UpdateResponse) itemResponse;
break;
case DELETE:
DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
}
}
if (bulkResponse.hasFailures()) {
for (BulkItemResponse bulkItemResponse : bulkResponse) {
if (bulkItemResponse.isFailed()) {
BulkItemResponse.Failure failure =
bulkItemResponse.getFailure();
System.out.println("failed: " + failure.getId());
}
}
}
}
我遇到了超时异常,因为我的记录有800K。java.net.SocketTimeoutException:连接超时30,000毫秒http-outgoing-16[活动]
我试图分解传入的jsonList,但有时会出现相同的错误。
我目前使用的是Elasticsearch 7.6.2版本。
异常跟踪
Java.net.SocketTimeoutException:连接http-outgoing-16[ACTIVE],位于org.elasticsearch.client.RestClient.ExtractandWrapcause(RestClient.Java:808)处org.elasticsearch.client.RestClient.PerformRequest(RestClient.Java:248)处org.elasticsearch.client.RestClient.PerformRequest(RestClient.Java:235)处org.elasticsearch.client.RestHighLevelClient.InternalPerformRequesta.lang.Reflect.Method.Invoke(未知源)在org.eclipse.jdt.internal.jarinjarloader.jarrsrcloader.main(jarrsrcloader.java:58)处,原因是:java.net.sockettimeoutexception:30,000毫秒超时连接http-outgoing-16[ACTIVE]在org.apache.http.nio.protocol.httpasyncrequestexecutor.timeout(httpasyncrequestexecutor.387)在nio.reactor.baseioreactor.execute(baseioreactor.java:104)在org.apache.http.impl.nio.reactor.abstractmultiworkerioreactor$worker.run(abstractmultiworkerioreactor.java:591)在java.lang.thread.run(未知源)
由于您正在使用批量API并向Elasticsearch发送大量数据,并且连接的默认超时为30秒
,而Elasticsearch无法在30秒
内完成这个巨大的批量操作,因此会出现此异常。
这种超时对于大容量API是正常的,在您的情况下(特定于索引),可以执行以下操作:
扩展集群即添加更多的CPU、内存、更好的磁盘、禁用refresh_interval(默认值为1秒)以加快批量索引。
如正式ES文件所述
request.timeout(TimeValue.timeValueMinutes(2)); --> 2 min timeout
request.timeout("2m"); --> string format of 2 sec.
编辑:如评论中所问,您可以使用批量API的同步执行,如果您想立即检查您的批量API的响应,下面引用自同一文档:
检索操作的响应(成功与否),可以是IndexResponse、UpdateResponse或DeleteResponse,都可以看作是DocWriteResponse实例
问题内容: 似乎我有一个相似但不相同的查询,因此最好像@Val建议的那样,让其他人从中受益。 因此,类似于上述内容,我需要在索引中插入大量数据(我的初始测试大约是10000个文档,但这只是针对POC,还有更多)。我想插入的数据在.json文档中,看起来像这样(片段): 我自己是ElasticSearch的新手,但是,从阅读文档开始,我的假设是我可以获取.json文件并根据其中的数据创建索引。从那以
背景资料 我正在开发一个API,允许用户传递关于成员的详细信息列表(姓名,电子邮件地址,...)我想使用这些信息与我的Elasticsearch数据库中的帐户记录进行匹配,并返回潜在匹配的列表。 我以为这将是简单的做一个bool查询领域我想要的,但是我似乎没有得到任何点击。 我对Elasticsearch比较陌生,我当前的搜索请求如下所示。 查询示例 POST/member/account/\u搜
是否可以在ElasticSearch中进行批量原子更新? 我知道定期批量更新不是原子的,正如这里所指出的:https://www.elastic.co/guide/en/elasticsearch/guide/current/bulk.html#bulk 还有其他方法可以自动更新多个文档吗?即。要么所有更新都发生,要么没有更新。
问题内容: 我找不到任何有关Elastic Bulk API在一项或多项操作失败时会发生什么情况的文档。例如,对于以下请求,假设已经有一个ID为“ 3”的文档,那么“创建”应该 失败-这会使所有其他操作失败 吗? 我正在使用nodejs弹性模块。 问题答案: 任何一项行动的任何失败都不会影响其他行动。 从elasticsearch bulk api 的文档中: 对批量操作的响应是一个大型JSON结
问题内容: 我正在尝试将JSON文件批量索引到新的Elasticsearch索引中,但无法这样做。我在JSON中有以下示例数据 我在用 当我尝试使用Elasticsearch的标准批量索引API时,出现此错误 任何人都可以帮助索引这种类型的JSON吗? 问题答案: 您需要做的是读取该JSON文件,然后使用端点期望的格式构建一个批量请求,即,一行用于命令,一行用于文档,并用换行符分隔…冲洗并重复以下
我在Azure上有3个主节点+3个数据节点elasticsearch集群。我试图执行批量操作,但我得到了关于节点本身的失败错误,以下是我如何设置客户端: 下面是批量操作: 它开始对1000个记录批次做出罚款响应,如下所示: 将执行由{1001}个操作组成的新大容量 然后我开始得到以下错误: 传输:383-[Stanley Stewart]未能获取{#传输#-1}{10.0.0.10}{10.0.0