Scrolling through a lot of documents in an Elasticsearch index might take some time, but fortunately there are a few things we can do about it. In this article I'll go through two small but effective things we can do to increase scroll performance.
Scroll Batch Size
The first thing we should do is to find the optimal batch size for our environment. I’ll define the optimal batch size as the size where we get the most throughput for our query.
A small batch size might cause network communication overhead, while a big batch size will cause memory allocation overhead if our documents are big. The sweet spot is probably somewhere in the middle and will differ between environments, as it depends on network latency, what the data looks like, index size and more. Therefore we need to iterate through different batch sizes in order to find that sweet spot.
I've quite often assumed that a higher batch size is better, but that need not be the case, and a batch size of 10 documents might get you a higher throughput than 100, 1000 or even 10000 documents per batch. So start off by testing 10 documents per batch and then increase from there until the throughput degrades.
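As a rough sketch, finding the sweet spot can be as simple as timing a full scroll for a handful of candidate batch sizes. The loop below assumes the EsIterator wrapper introduced later in this article, along with the index and query variables used throughout:

// Benchmark sketch: measure throughput for a few candidate batch sizes.
for (int batchSize : new int[] {10, 100, 1_000, 10_000}) {
    long documents = 0;
    long start = System.nanoTime();
    Iterator<Collection<Foo>> iterator = new EsIterator<>(index, query, batchSize, Foo.class);
    while (iterator.hasNext()) {
        documents += iterator.next().size();
    }
    double seconds = (System.nanoTime() - start) / 1_000_000_000.0;
    System.out.printf("batch size %d: %.0f docs/s%n", batchSize, documents / seconds);
}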
If we've found the optimal batch size but aren't happy with the throughput, we should take a look at our cluster and its resources. Is the CPU (and/or memory) operating at 100%, or do we still have resources that we can use? If we do have plenty of resources left, or have a cluster with more than one node, then we have at least one performance booster left that we can start playing around with: slicing.
Sliced Scroll
If we want to scroll through our documents with any kind of parallelism, we need to perform a so-called sliced scroll. If the cluster has several nodes we will definitely benefit from slicing, as we may spread the workload over more CPUs.
It came to my attention that this wasn't that straightforward to do in a Java application. Well, the slice query itself was straightforward, but fully utilizing it wasn't.
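For reference, the straightforward part could look something like this with the high-level REST client; a sketch assuming a 7.x RestHighLevelClient named client, with SliceBuilder coming from org.elasticsearch.search.slice:

// Slice 0 of 2: the query and batch size are set as usual, the slice is just
// one extra call on the search source.
SearchSourceBuilder source = new SearchSourceBuilder()
        .query(query)
        .size(batchSize)
        .slice(new SliceBuilder(0, 2));
SearchRequest request = new SearchRequest(index)
        .scroll(TimeValue.timeValueMinutes(1))
        .source(source);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);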
In order to do regular scrolls (without slicing) we've created an iterator that initially makes a scroll request and then keeps on scrolling for each iteration. The usage in our Java code pretty much turns into:
Iterator<Collection<Foo>> fooIterator = new EsIterator<>(index, query, batchSize, Foo.class);
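The EsIterator implementation itself is not part of this article, but a minimal sketch of the idea could look like the class below. The explicit client parameter, the one-minute scroll keep-alive and the Jackson-based mapping are assumptions made for the sketch, not the actual implementation:

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;

class EsIterator<T> implements Iterator<Collection<T>> {

    private static final ObjectMapper MAPPER = new ObjectMapper(); // Jackson, assumed

    private final RestHighLevelClient client;
    private final Class<T> documentClass;
    private String scrollId;
    private SearchHit[] currentHits;
    private long totalHits;

    EsIterator(RestHighLevelClient client, String index, QueryBuilder query,
               int batchSize, Class<T> documentClass) {
        this.client = client;
        this.documentClass = documentClass;
        try {
            // The initial search request opens the scroll context.
            SearchRequest request = new SearchRequest(index)
                    .scroll(TimeValue.timeValueMinutes(1))
                    .source(new SearchSourceBuilder().query(query).size(batchSize));
            SearchResponse response = client.search(request, RequestOptions.DEFAULT);
            scrollId = response.getScrollId();
            totalHits = response.getHits().getTotalHits().value;
            currentHits = response.getHits().getHits();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public boolean hasNext() {
        return currentHits.length > 0;
    }

    @Override
    public Collection<T> next() {
        List<T> batch = Arrays.stream(currentHits)
                .map(this::fromHit)
                .collect(Collectors.toList());
        try {
            // Each subsequent request continues the scroll where the last one ended.
            SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId)
                    .scroll(TimeValue.timeValueMinutes(1));
            SearchResponse response = client.scroll(scrollRequest, RequestOptions.DEFAULT);
            scrollId = response.getScrollId();
            currentHits = response.getHits().getHits();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return batch;
    }

    long getTotalHits() {
        return totalHits;
    }

    private T fromHit(SearchHit hit) {
        try {
            return MAPPER.readValue(hit.getSourceAsString(), documentClass);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

The sliced variant used below would additionally take the slice id and the maximum number of slices, and add .slice(new SliceBuilder(sliceId, maxSlices)) to the search source.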
This iterator can't handle slicing out of the box, since each scroll request needs to be initialized with the slice id and the maximum number of slices: "This is slice 1 of 2 for query x". That means that one iterator will only ever be able to handle a single slice of a sliced scroll.
To benefit from slicing we can create new iterators with the needed slicing parameters. If we want our scroll split into two slices, we create two iterators:
// Slice id 0, max slices 2
Iterator<Collection<Foo>> fooIterator1 = new EsIterator<>(index, query, batchSize, Foo.class, 0, 2);
// Slice id 1, max slices 2
Iterator<Collection<Foo>> fooIterator2 = new EsIterator<>(index, query, batchSize, Foo.class, 1, 2);
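Running the two iterators in parallel could then look something like the following sketch, assuming a handle method that processes a batch of Foo documents:

ExecutorService pool = Executors.newFixedThreadPool(2);
for (Iterator<Collection<Foo>> iterator : List.of(fooIterator1, fooIterator2)) {
    // One thread per slice, each draining its own iterator.
    pool.submit(() -> {
        while (iterator.hasNext()) {
            handle(iterator.next());
        }
    });
}
pool.shutdown();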
That gives us parallelism, but it is not very dynamic. A cleaner approach would be if we could write something like:
Spliterator<Collection<Foo>> fooSpliterator = EsSpliterator.from(index, query, batchSize, Foo.class, 2);
StreamSupport.stream(fooSpliterator, true)
.forEach(fooBatch -> handle(fooBatch));
Above we have created a Spliterator that we supply to a stream with parallelism set to true. The spliterator takes the number of slices as its last parameter and is then responsible for creating the needed iterators.
Below is the spliterator class; it takes a quite simple and naive approach. First we construct the spliterator with a list of all iterators; in our example we would have a list of two iterators. Then, if parallelism is asked for, we try to split our spliterator in half by creating a new spliterator with half of the iterators in our list.
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.IntStream;

import org.elasticsearch.index.query.QueryBuilder;

import static java.util.stream.Collectors.toList;

class EsSpliterator<T> implements Spliterator<Collection<T>> {

    private final List<EsIterator<T>> iterators = new ArrayList<>();

    // constructor omitted

    static <T> EsSpliterator<T> from(String index, QueryBuilder query, int batchSize,
                                     Class<T> documentClass, int maxSlices) {
        // One iterator per slice, each initialized with its slice id.
        List<EsIterator<T>> iterators = IntStream.range(0, maxSlices)
                .mapToObj(i -> new EsIterator<>(index, query, batchSize, documentClass, i, maxSlices))
                .collect(toList());
        return new EsSpliterator<>(iterators);
    }

    @Override
    public boolean tryAdvance(Consumer<? super Collection<T>> action) {
        // The Spliterator contract allows at most one element per call, so we
        // advance the first iterator that still has elements.
        for (EsIterator<T> iterator : iterators) {
            if (iterator.hasNext()) {
                action.accept(iterator.next());
                return true;
            }
        }
        return false;
    }

    @Override
    public Spliterator<Collection<T>> trySplit() {
        if (iterators.size() >= 2) {
            // Hand off half of our iterators to a new spliterator.
            List<EsIterator<T>> sublist = iterators.stream()
                    .limit(iterators.size() / 2)
                    .collect(toList());
            iterators.removeAll(sublist);
            return new EsSpliterator<>(sublist);
        }
        return null;
    }

    @Override
    public long estimateSize() {
        // The scroll responses tell us the total number of hits per slice.
        return iterators.stream().mapToLong(EsIterator::getTotalHits).sum();
    }

    @Override
    public int characteristics() {
        return SIZED;
    }
}
tryAdvance is the equivalent of our iterator's hasNext and next methods. There we simply look for an iterator in our list that has more elements and, if we find one, accept the given action with its next batch of elements.
The scroll response returns the total number of hits for our request, and therefore we can estimate a size for each iterator, hence the SIZED characteristic. (Strictly speaking the stream's elements are batches rather than documents, so the sum of total hits is an overestimate, but it serves well enough as an estimate.)
Conclusion
We now have a more dynamic way of handling sliced scrolling and may start experimenting with the number of slices and the batch size for each slice to achieve maximal throughput. The process is more or less the same as when finding the batch size for single-threaded scrolling.
I would recommend starting with the same number of slices as the number of shards for your index, if your shards are evenly spread over your cluster. Then you will use resources from all nodes and thereby probably increase your scroll speed.
A note from the docs about the number of slices
If the number of slices is bigger than the number of shards, the slice filter is very slow on the first calls; it has a complexity of O(N) and a memory cost equal to N bits per slice, where N is the total number of documents in the shard. After a few calls the filter should be cached and subsequent calls should be faster, but you should limit the number of sliced queries you perform in parallel to avoid the memory explosion.
A note about thread pools
By default the common fork-join pool is used when a stream runs in parallel, so it could be useful to supply your own thread pool for this task. It can be done by wrapping the whole task in a submission to your own pool:
// threadPool needs to be a ForkJoinPool for the parallel stream to run in it.
threadPool.submit(() -> {
    Spliterator<Collection<T>> spliterator = EsSpliterator.from(index, query, batchSize, documentClass, slices);
    StreamSupport.stream(spliterator, true)
            .forEach(batch -> handle(batch));
}).get();
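Note that this relies on an undocumented quirk of parallel streams rather than an official API: a parallel stream started from within a ForkJoinPool worker thread executes in that same pool instead of the common pool, so the trick only works if threadPool is a ForkJoinPool.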
Happy scrolling, Markus Dybeck.