当前位置: 首页 > 知识库问答 >
问题:

如何创建通用分页拆分器?

欧阳晗日
2023-03-14

我希望能够处理从必须在页面中访问的源读取的java流。作为第一种方法,我实现了一个分页迭代器,它在当前页面用尽条目时简单地请求页面,然后使用< code > stream support . stream(iterator,false)获取迭代器上的流句柄。

因为我发现获取我的页面非常昂贵,所以我想通过并行流的方式访问页面。此时,我发现由于java直接从迭代器提供的spliterator实现,我的简单方法所提供的并行性是不存在的。由于我实际上非常了解我想要遍历的元素(在请求第一个页面之后,我知道总的结果计数,并且源代码支持偏移和限制),我认为应该可以实现我自己的spliterator来实现真正的并发性(在页面元素上完成的工作和页面查询中)。

我已经能够非常轻松地实现“在元素上完成的工作”并发,但是在我的初始实现中,页面的查询仅由最顶层的拆分器完成,因此不会从分叉连接实现提供的工作分工中受益。

如何编写一个同时实现这两个目标的拆分器?

作为参考,我将提供我到目前为止所做的工作(我知道它没有适当地划分查询)。

   public final class PagingSourceSpliterator<T> implements Spliterator<T> {

    public static final long DEFAULT_PAGE_SIZE = 100;

    private Page<T> result;
    private Iterator<T> results;
    private boolean needsReset = false;
    private final PageProducer<T> generator;
    private long offset = 0L;
    private long limit = DEFAULT_PAGE_SIZE;


    public PagingSourceSpliterator(PageProducer<T> generator) {
        this.generator = generator;
    }

    public PagingSourceSpliterator(long pageSize, PageProducer<T> generator) {
        this.generator = generator;
        this.limit = pageSize;
    }


    @Override
    public boolean tryAdvance(Consumer<? super T> action) {

        if (hasAnotherElement()) {
            if (!results.hasNext()) {
                loadPageAndPrepareNextPaging();
            }
            if (results.hasNext()) {
                action.accept(results.next());
                return true;
            }
        }

        return false;
    }

    @Override
    public Spliterator<T> trySplit() {
        // if we know there's another page, go ahead and hand off whatever
        // remains of this spliterator as a new spliterator for other
        // threads to work on, and then mark that next time something is
        // requested from this spliterator it needs to be reset to the head
        // of the next page
        if (hasAnotherPage()) {
            Spliterator<T> other = result.getPage().spliterator();
            needsReset = true;
            return other;
        } else {
            return null;
        }

    }

    @Override
    public long estimateSize() {
        if(limit == 0) {
            return 0;
        }

        ensureStateIsUpToDateEnoughToAnswerInquiries();
        return result.getTotalResults();
    }

    @Override
    public int characteristics() {
        return IMMUTABLE | ORDERED | DISTINCT | NONNULL | SIZED | SUBSIZED;
    }

    private boolean hasAnotherElement() {
        ensureStateIsUpToDateEnoughToAnswerInquiries();
        return isBound() && (results.hasNext() || hasAnotherPage());
    }

    private boolean hasAnotherPage() {
        ensureStateIsUpToDateEnoughToAnswerInquiries();
        return isBound() && (result.getTotalResults() > offset);
    }

    private boolean isBound() {
        return Objects.nonNull(results) && Objects.nonNull(result);
    }

    private void ensureStateIsUpToDateEnoughToAnswerInquiries() {
        ensureBound();
        ensureResetIfNecessary();
    }

    private void ensureBound() {
        if (!isBound()) {
            loadPageAndPrepareNextPaging();
        }
    }

    private void ensureResetIfNecessary() {
        if(needsReset) {
            loadPageAndPrepareNextPaging();
            needsReset = false;
        }
    }

    private void loadPageAndPrepareNextPaging() {
        // keep track of the overall result so that we can reference the original list and total size
        this.result = generator.apply(offset, limit);

        // make sure that the iterator we use to traverse a single page removes
        // results from the underlying list as we go so that we can simply pass
        // off the list spliterator for the trySplit rather than constructing a
        // new kind of spliterator for what remains.
        this.results = new DelegatingIterator<T>(result.getPage().listIterator()) {
            @Override
            public T next() {
                T next = super.next();
                this.remove();
                return next;
            }
        };

        // update the paging for the next request and inquiries prior to the next request
        // we use the page of the actual result set instead of the limit in case the limit
        // was not respected exactly.
        this.offset += result.getPage().size();
    }

    public static class DelegatingIterator<T> implements Iterator<T> {

        private final Iterator<T> iterator;

        public DelegatingIterator(Iterator<T> iterator) {
            this.iterator = iterator;
        }


        @Override
        public boolean hasNext() {
            return iterator.hasNext();
        }

        @Override
        public T next() {
            return iterator.next();
        }

        @Override
        public void remove() {
            iterator.remove();
        }

        @Override
        public void forEachRemaining(Consumer<? super T> action) {
            iterator.forEachRemaining(action);
        }
    }
}

以及我的页面来源:

public interface PageProducer<T> extends BiFunction<Long, Long, Page<T>> {

}

还有一页:

public final class Page<T> {

    private long totalResults;
    private final List<T> page = new ArrayList<>();

    public long getTotalResults() {
        return totalResults;
    }

    public List<T> getPage() {
        return page;
    }

    public Page setTotalResults(long totalResults) {
        this.totalResults = totalResults;
        return this;
    }

    public Page setPage(List<T> results) {
        this.page.clear();
        this.page.addAll(results);
        return this;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof Page)) {
            return false;
        }
        Page<?> page1 = (Page<?>) o;
        return totalResults == page1.totalResults && Objects.equals(page, page1.page);
    }

    @Override
    public int hashCode() {
        return Objects.hash(totalResults, page);
    }

}

以及获取具有“慢速”分页的流以进行测试的示例

private <T> Stream<T> asSlowPagedSource(long pageSize, List<T> things) {

    PageProducer<T> producer = (offset, limit) -> {

        try {
            Thread.sleep(5000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        int beginIndex = offset.intValue();
        int endIndex = Math.min(offset.intValue() + limit.intValue(), things.size());
        return new Page<T>().setTotalResults(things.size())
                .setPage(things.subList(beginIndex, endIndex));
    };

    return StreamSupport.stream(new PagingSourceSpliterator<>(pageSize, producer), true);
}

共有2个答案

卢光誉
2023-03-14

https://docs.oracle.com/javase/8/docs/api/java/util/Spliterator.html

根据我的理解,拆分的速度来自不可变性。源越不可变,处理速度越快,因为不可变性更好地提供parrallel处理或更确切地说是拆分。

这个想法似乎是在将源作为一个整体(最好)或部分(通常是这种情况,因此您和许多其他人的挑战)绑定到拆分器之前,尽可能最好地处理源的更改(如果有的话)。

在您的情况下,这可能意味着首先要确保页面大小,而不是:

//...以防没有严格遵守限制。this.offset=result.getPage(). size();

这也可能意味着需要准备流馈送,而不是用作直接来源。

文档末尾有一个示例“并行计算框架(如java.util.stream包)如何在并行计算中使用Spliterator”。

请注意,这是流使用拆分器的方式,而不是拆分器使用流作为源的方式。

在示例的末尾有一个有趣的“计算”方法。

PS如果你得到一个通用的高效PageSpliterator类,一定要让我们中的一些人知道。

干杯。

邹缪文
2023-03-14

您的拆分器无法让您更接近目标的主要原因是它试图拆分页面,而不是源元素空间。如果您知道元素的总数并且有一个允许通过偏移量和限制来获取页面的源,那么拆分器最自然的形式是在这些元素中封装一个范围,例如通过偏移量和限制或结束。然后,拆分意味着只是拆分该范围,将拆分器的偏移量调整到拆分位置,并创建一个代表前缀的新拆分器,从“旧偏移量”到拆分位置。

Before splitting:
      this spliterator: offset=x, end=y
After splitting:
      this spliterator: offset=z, end=y
  returned spliterator: offset=x, end=z

x <= z <= y

而在最好的情况下,z正好在xy之间,以产生平衡的拆分,但在我们的例子中,我们将稍微调整它以产生页面大小的倍数的工作集。

逻辑无需获取页面即可工作,因此,如果您将提取页面推迟到当前,则框架希望开始遍历,即在拆分后,提取操作可以并行运行。最大的障碍是您需要获取第一页才能了解元素的总数。下面的解决方案将此第一个获取与其他提取分开,从而简化了实现。当然,它必须向下传递第一个页面获取的结果,该结果将在第一次遍历时消耗(在顺序情况下)或作为第一个拆分前缀返回,此时接受一个不平衡的拆分,但之后不必再处理它。

public class PagingSpliterator<T> implements Spliterator<T> {
    public interface PageFetcher<T> {
        List<T> fetchPage(long offset, long limit, LongConsumer totalSizeSink);
    }
    public static final long DEFAULT_PAGE_SIZE = 100;

    public static <T> Stream<T> paged(PageFetcher<T> pageAccessor) {
        return paged(pageAccessor, DEFAULT_PAGE_SIZE, false);
    }
    public static <T> Stream<T> paged(PageFetcher<T> pageAccessor,
                                      long pageSize, boolean parallel) {
        if(pageSize<=0) throw new IllegalArgumentException();
        return StreamSupport.stream(() -> {
            PagingSpliterator<T> pgSp
                = new PagingSpliterator<>(pageAccessor, 0, 0, pageSize);
            pgSp.danglingFirstPage
                =spliterator(pageAccessor.fetchPage(0, pageSize, l -> pgSp.end=l));
            return pgSp;
        }, CHARACTERISTICS, parallel);
    }
    private static final int CHARACTERISTICS = IMMUTABLE|ORDERED|SIZED|SUBSIZED;

    private final PageFetcher<T> supplier;
    long start, end, pageSize;
    Spliterator<T> currentPage, danglingFirstPage;

    PagingSpliterator(PageFetcher<T> supplier,
            long start, long end, long pageSize) {
        this.supplier = supplier;
        this.start    = start;
        this.end      = end;
        this.pageSize = pageSize;
    }

    public boolean tryAdvance(Consumer<? super T> action) {
        for(;;) {
            if(ensurePage().tryAdvance(action)) return true;
            if(start>=end) return false;
            currentPage=null;
        }
    }
    public void forEachRemaining(Consumer<? super T> action) {
        do {
            ensurePage().forEachRemaining(action);
            currentPage=null;
        } while(start<end);
    }
    public Spliterator<T> trySplit() {
        if(danglingFirstPage!=null) {
            Spliterator<T> fp=danglingFirstPage;
            danglingFirstPage=null;
            start=fp.getExactSizeIfKnown();
            return fp;
        }
        if(currentPage!=null)
            return currentPage.trySplit();
        if(end-start>pageSize) {
            long mid=(start+end)>>>1;
            mid=mid/pageSize*pageSize;
            if(mid==start) mid+=pageSize;
            return new PagingSpliterator<>(supplier, start, start=mid, pageSize);
        }
        return ensurePage().trySplit();
    }
    /**
     * Fetch data immediately before traversing or sub-page splitting.
     */
    private Spliterator<T> ensurePage() {
        if(danglingFirstPage!=null) {
            Spliterator<T> fp=danglingFirstPage;
            danglingFirstPage=null;
            currentPage=fp;
            start=fp.getExactSizeIfKnown();
            return fp;
        }
        Spliterator<T> sp = currentPage;
        if(sp==null) {
            if(start>=end) return Spliterators.emptySpliterator();
            sp = spliterator(supplier.fetchPage(
                                 start, Math.min(end-start, pageSize), l->{}));
            start += sp.getExactSizeIfKnown();
            currentPage=sp;
        }
        return sp;
    }
    /**
     * Ensure that the sub-spliterator provided by the List is compatible with
     * ours, i.e. is {@code SIZED | SUBSIZED}. For standard List implementations,
     * the spliterators are, so the costs of dumping into an intermediate array
     * in the other case is irrelevant.
     */
    private static <E> Spliterator<E> spliterator(List<E> list) {
        Spliterator<E> sp = list.spliterator();
        if((sp.characteristics()&(SIZED|SUBSIZED))!=(SIZED|SUBSIZED))
            sp=Spliterators.spliterator(
                StreamSupport.stream(sp, false).toArray(), IMMUTABLE | ORDERED);
        return sp;
    }
    public long estimateSize() {
        if(currentPage!=null) return currentPage.estimateSize();
        return end-start;
    }
    public int characteristics() {
        return CHARACTERISTICS;
    }
}

它使用一个专门的PageFetcher函数接口,可以通过调用回调的 accept 方法和生成的总大小并返回项目列表来实现。分页拆分器将简单地委托给列表的拆分器进行遍历,如果并发性明显高于生成的页数,它甚至可能受益于拆分这些页面拆分器,这意味着随机访问列表(如 ArrayList)是此处的首选列表类型。

将示例代码调整为

private static <T> Stream<T> asSlowPagedSource(long pageSize, List<T> things) {
    return PagingSpliterator.paged( (offset, limit, totalSizeSink) -> {
        totalSizeSink.accept(things.size());
        if(offset>things.size()) return Collections.emptyList();
        int beginIndex = (int)offset;
        assert beginIndex==offset;
        int endIndex = Math.min(beginIndex+(int)limit, things.size());
        System.out.printf("Page %6d-%6d:\t%s%n",
                          beginIndex, endIndex, Thread.currentThread());
        // artificial slowdown
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
        return things.subList(beginIndex, endIndex);
    }, pageSize, true);
}

您可以像这样测试它

List<Integer> samples=IntStream.range(0, 555_000).boxed().collect(Collectors.toList());
List<Integer> result =asSlowPagedSource(10_000, samples) .collect(Collectors.toList());
if(!samples.equals(result))
    throw new AssertionError();

给定足够的可用 CPU 内核,它将演示如何同时获取页面,因此是无序的,而结果将正确按遭遇顺序排列。您还可以测试子页面并发性,该子页面并发性在页面较少时适用:

Set<Thread> threads=ConcurrentHashMap.newKeySet();
List<Integer> samples=IntStream.range(0, 1_000_000).boxed().collect(Collectors.toList());
List<Integer> result=asSlowPagedSource(500_000, samples)
    .peek(x -> threads.add(Thread.currentThread()))
    .collect(Collectors.toList());
if(!samples.equals(result))
    throw new AssertionError();
System.out.println("Concurrency: "+threads.size());
 类似资料:
  • 我读到DB2不支持限制和偏移量。我还读到您必须使用ROW_NUMBER()和子查询来获得所需的结果。如果这是SQL查询: 其中$offset是偏移量,$rowserPage是我希望在页面上显示的数据库行的数量,这可以等效为DB2查询。

  • 问题内容: 我需要分解一个始终如下所示的字符串: 东西-something_else。 我需要在另一个输入字段中输入“ something_else”。当前,此字符串示例正被即时添加到HTML表行中,如下所示: 我认为“拆分”是可行的方法,但是几乎找不到文档。 问题答案: 可以在MDN找到文档。请注意,是不是 一个jQuery方法,但本地字符串的方法。 如果在字符串上使用,则返回带有子字符串的数组

  • 我发现下面的分页是用Vue实现的。我想将分页栏水平居中。我尝试了不同的方法(将class=“text-center”和style=“text-align:center!importance;”添加到nav和v-pagination标记),但都不奏效。有什么办法可以解决吗? CodePen中的代码:

  • 我必须创建一个询问 我试图解决的任何一个项目在一个列表中,我有一些问题,这里是一个代码 `此代码仅在以下情况下提供预期输出: 如果len(list)可被page_number整除,如果page_number为0,或如果page_number大于并等于len(list)` 臭虫 假设len(list)是9,page_number是(2,4,5,6,7,8)之一,9%(2,4,5,6,7,8)的剩余部

  • 问题内容: 我正在尝试创建带有分页的UICollectionView,并且每个项目的最大宽度为250点,我已经设法创建了它,但是我遇到了两个问题:第一个项目开始时不正确,但是开始时有更多空间当我尝试滑动时,总会有一些东西无法让我平稳滑动。 看起来是这样的: 影片连结 这是我的代码: CenterCellCollectionFlowLayout.swift MainViewController.sw

  • 我需要创建一个带有分页的HTML表。数据来自两个不同的来源(可能是来自两个不同数据库的两个表,比如一个Oracle,另一个是MySQL),您不能使用JOIN select语句。为了使它更复杂,我需要以升序显示按时间戳排序的数据(其中一个属性是timestamp)。 例如,源A有45条记录,源B有55条记录。因此,该表将显示总共100条记录,但一次只显示15条记录。因此必须有7页(6页15条记录,1