我正在尝试使用.Net API
in 进行批量插入Elasticsearch
,这是执行操作时遇到的错误;
Error {Type: es_rejected_execution_exception Reason: "rejected execution of org.elasticsearch.transport.TransportService$6@604b47a4 on EsThreadPoolExecutor[bulk, queue capacity = 50, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@51f4f734[Running, pool size = 4, active threads = 4, queued tasks = 50, completed tasks = 164]]" CausedBy: ""} Nest.BulkError
是由于系统空间不足还是批量插入功能本身不起作用?我的NEST
版本是5.0
,Elasticsearch
版本也是5.0
。
批量插入逻辑代码;
public void bulkInsert(List<BaseData> recordList, List<String> listOfIndexName) {
BulkDescriptor descriptor = new BulkDescriptor();
foreach (var j in Enumerable.Range(0, recordList.Count)) {
descriptor.Index<BaseData>(op => op.Document(recordList[j])
.Index(listOfIndexName[j]));
}
var result = clientConnection.Bulk(descriptor);
}
正如Val在评论中所说,您一次发送的数据可能会超出集群的处理能力。看来您可能正在尝试在 一个* 批量请求中发送 所有
文档,但对于许多文档或大型文档而言,这可能无法正常工作。
*
使用时_bulk
,除了可以同时发送到集群的批量请求数量外,您还需要将数据以多个批量请求的形式发送到集群,并找到每个批量请求中可以发送的 最佳
文档数。
此处没有确定最佳大小的硬性规定,因为它取决于文档的复杂性,分析方式,集群硬件,集群设置,索引设置等而有所不同。
最好的做法是从一个合理的数字开始,例如在一个请求中说500个文档(或在您的上下文中有意义的某个数字),然后从那里开始。计算每个批量请求的总大小(以字节为单位)也是一种不错的方法。如果性能和吞吐量不足,请增加文档数量,请求字节大小或并发请求,直到您开始看到es_rejected_execution_exception
。
NEST 5.x附带了一个方便的助手,使用IObservable<T>
和可观察的设计模式使批量请求变得更加容易
void Main()
{
var client = new ElasticClient();
// can cancel the operation by calling .Cancel() on this
var cancellationTokenSource = new CancellationTokenSource();
// set up the bulk all observable
var bulkAllObservable = client.BulkAll(GetDocuments(), ba => ba
// number of concurrent requests
.MaxDegreeOfParallelism(8)
// in case of 429 response, how long we should wait before retrying
.BackOffTime(TimeSpan.FromSeconds(5))
// in case of 429 response, how many times to retry before failing
.BackOffRetries(2)
// number of documents to send in each request
.Size(500)
.Index("index-name")
.RefreshOnCompleted(),
cancellationTokenSource.Token
);
var waitHandle = new ManualResetEvent(false);
Exception ex = null;
// what to do on each call, when an exception is thrown, and
// when the bulk all completes
var bulkAllObserver = new BulkAllObserver(
onNext: bulkAllResponse =>
{
// do something after each bulk request
},
onError: exception =>
{
// do something with exception thrown
ex = exception;
waitHandle.Set();
},
onCompleted: () =>
{
// do something when all bulk operations complete
waitHandle.Set();
});
bulkAllObservable.Subscribe(bulkAllObserver);
// wait for handle to be set.
waitHandle.WaitOne();
if (ex != null)
{
throw ex;
}
}
// Getting documents should be lazily enumerated collection ideally
public static IEnumerable<Document> GetDocuments()
{
return Enumerable.Range(1, 10000).Select(x =>
new Document
{
Id = x,
Name = $"Document {x}"
}
);
}
public class Document
{
public int Id { get; set; }
public string Name { get; set; }
}
问题内容: 我正在尝试使用Nest将多个记录插入数据库。使用IndexMany类插入确实可以,但是我还需要通过json字符串插入对象。 我确实在github上进行了查找,并找到了一些如何使用RAWclient的示例。在代码示例下面,我插入json。 一些其他信息: jsondata: var twitter: 我从数据库收到的结果: 有人知道这个问题可能是什么吗?还是我在json /代码中丢失了什
问题内容: 我使用以下查询在弹性中创建了一个索引: 我想使用.net NEST库将文档插入此索引。我的问题是.net更新方法的签名对我没有任何意义。 Java库对我来说意义更大: 在NEST中,和类来自哪里?我制作的这些C#类代表我的索引吗? 问题答案: 并且是为POCO类型的泛型类型参数 在Elasticsearch()中表示文档,然后 执行部分更新时,在Elasticsearch()中表示文档
我有一个C#代码,使用Elastic搜索类型,它将匹配两个字段。我用的是NEST软件包。 问题是,无论我传入什么文本,它都会返回所有结果。我错过了什么?
问题内容: 我不确定是否在批量索引编制中正确使用了该操作。 我的要求是: 网址是: 我想我错过了文档中的某些内容,但仍然找不到如何进行此操作的方法。 我想要 在索引中创建以上文档,或者如果存在则对其进行更新。 问题答案: 如果您通过批量API将索引中的记录添加为 那么如果该ID已经存在于索引中,您将获得一个异常。如果要添加或 替换 文档(取决于文档是否存在),则应按以下方式进行请求 如果已经存在具
本文向大家介绍Mybatis批量插入返回成功的数目实例,包括了Mybatis批量插入返回成功的数目实例的使用技巧和注意事项,需要的朋友参考一下 Mybatis批量插入返回影响的行数 环境: postgresql 9.6.5 spring 4.1 mybatis3 junit4 log4j ThesisMapper.xml: Mapper.java 借口: 服务类: ThesisService: 测
问题内容: 我正在尝试根据出现的消息从elasticsearch检索所有数据,我发现如果使用Scroll,我可以循环直到文档搜索结束,但是以下查询返回Documents = 0但Total = 1954: 如果我使用滚动条,则文档为空,如果我删除并获得前1000条消息,则可以获取数据,我使用滚动条的方式有什么问题吗? 问题答案: 如果指定,则第一个响应不包含任何文档。它将为您提供属性中的全部文档,