当前位置: 首页 > 知识库问答 >
问题:

你能重新平衡一个尺寸未知的不平衡的分裂器吗?

越俊艾
2023-03-14

我想使用stream来并行化处理一组远程存储的数量未知的JSON文件(文件的数量不是预先知道的)。这些文件的大小可以有很大的不同,从每个文件1条JSON记录到其他文件中的100,000条记录不等。在本例中,JSON记录意味着在文件中表示为一行的自包含JSON对象

我真的想要使用流来实现这一点,所以我实现了这个拆分器:

public abstract class JsonStreamSpliterator<METADATA, RECORD> extends AbstractSpliterator<RECORD> {

    abstract protected JsonStreamSupport<METADATA> openInputStream(String path);

    abstract protected RECORD parse(METADATA metadata, Map<String, Object> json);

    private static final int ADDITIONAL_CHARACTERISTICS = Spliterator.IMMUTABLE | Spliterator.DISTINCT | Spliterator.NONNULL;
    private static final int MAX_BUFFER = 100;
    private final Iterator<String> paths;
    private JsonStreamSupport<METADATA> reader = null;

    public JsonStreamSpliterator(Iterator<String> paths) {
        this(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths);
    }

    private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths) {
        super(est, additionalCharacteristics);
        this.paths = paths;
    }

    private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths, String nextPath) {
        this(est, additionalCharacteristics, paths);
        open(nextPath);
    }

    @Override
    public boolean tryAdvance(Consumer<? super RECORD> action) {
        if(reader == null) {
            String path = takeNextPath();
            if(path != null) {
                open(path);
            }
            else {
                return false;
            }
        }
        Map<String, Object> json = reader.readJsonLine();
        if(json != null) {
            RECORD item = parse(reader.getMetadata(), json);
            action.accept(item);
            return true;
        }
        else {
            reader.close();
            reader = null;
            return tryAdvance(action);
        }
    }

    private void open(String path) {
        reader = openInputStream(path);
    }

    private String takeNextPath() {
        synchronized(paths) {
            if(paths.hasNext()) {
                return paths.next();
            }
        }
        return null;
    }

    @Override
    public Spliterator<RECORD> trySplit() {
        String nextPath = takeNextPath();
        if(nextPath != null) {
            return new JsonStreamSpliterator<METADATA,RECORD>(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths, nextPath) {
                @Override
                protected JsonStreamSupport<METADATA> openInputStream(String path) {
                    return JsonStreamSpliterator.this.openInputStream(path);
                }
                @Override
                protected RECORD parse(METADATA metaData, Map<String,Object> json) {
                    return JsonStreamSpliterator.this.parse(metaData, json);
                }
            };              
        }
        else {
            List<RECORD> records = new ArrayList<RECORD>();
            while(tryAdvance(records::add) && records.size() < MAX_BUFFER) {
                // loop
            }
            if(records.size() != 0) {
                return records.spliterator();
            }
            else {
                return null;
            }
        }
    }
}

我遇到的问题是,虽然一开始流的并行化效果很好,但最终最大的文件仍留在一个线程中处理。我相信最近的原因是有据可查的:分裂器是“不平衡的”。

更具体地说,在stream.foreach生命周期的某个点之后,似乎不会调用TrySplit方法,因此很少执行在TrySplit末尾分发小批的额外逻辑。

注意从trySplit返回的所有spliterator是如何共享相同的paths迭代器的。我认为这是平衡所有分裂器工作的一个非常聪明的方法,但它还不足以实现完全并行。

我希望并行处理首先在文件之间进行,然后当很少的大文件仍然被拆分时,我希望在其余文件的块之间进行并行处理。这就是trysplit结尾的else块的意图。

有没有一个容易/简单/规范的方法来解决这个问题?

共有2个答案

宇文学博
2023-03-14

经过大量的实验,我仍然无法通过使用大小估计来获得任何额外的并行性。基本上,long.max_value以外的任何值都会导致拆分器过早终止(并且没有进行任何拆分),而另一方面,long.max_value估计会导致trysplit被无情地调用,直到它返回null

我发现的解决办法是在分裂者之间内部共享资源,让他们在自己之间重新平衡。

工作代码:

public class AwsS3LineSpliterator<LINE> extends AbstractSpliterator<AwsS3LineInput<LINE>> {

    public final static class AwsS3LineInput<LINE> {
        final public S3ObjectSummary s3ObjectSummary;
        final public LINE lineItem;
        public AwsS3LineInput(S3ObjectSummary s3ObjectSummary, LINE lineItem) {
            this.s3ObjectSummary = s3ObjectSummary;
            this.lineItem = lineItem;
        }
    }

    private final class InputStreamHandler {
        final S3ObjectSummary file;
        final InputStream inputStream;
        InputStreamHandler(S3ObjectSummary file, InputStream is) {
            this.file = file;
            this.inputStream = is;
        }
    }

    private final Iterator<S3ObjectSummary> incomingFiles;

    private final Function<S3ObjectSummary, InputStream> fileOpener;

    private final Function<InputStream, LINE> lineReader;

    private final Deque<S3ObjectSummary> unopenedFiles;

    private final Deque<InputStreamHandler> openedFiles;

    private final Deque<AwsS3LineInput<LINE>> sharedBuffer;

    private final int maxBuffer;

    private AwsS3LineSpliterator(Iterator<S3ObjectSummary> incomingFiles, Function<S3ObjectSummary, InputStream> fileOpener,
            Function<InputStream, LINE> lineReader,
            Deque<S3ObjectSummary> unopenedFiles, Deque<InputStreamHandler> openedFiles, Deque<AwsS3LineInput<LINE>> sharedBuffer,
            int maxBuffer) {
        super(Long.MAX_VALUE, 0);
        this.incomingFiles = incomingFiles;
        this.fileOpener = fileOpener;
        this.lineReader = lineReader;
        this.unopenedFiles = unopenedFiles;
        this.openedFiles = openedFiles;
        this.sharedBuffer = sharedBuffer;
        this.maxBuffer = maxBuffer;
    }

    public AwsS3LineSpliterator(Iterator<S3ObjectSummary> incomingFiles, Function<S3ObjectSummary, InputStream> fileOpener, Function<InputStream, LINE> lineReader, int maxBuffer) {
        this(incomingFiles, fileOpener, lineReader, new ConcurrentLinkedDeque<>(), new ConcurrentLinkedDeque<>(), new ArrayDeque<>(maxBuffer), maxBuffer);
    }

    @Override
    public boolean tryAdvance(Consumer<? super AwsS3LineInput<LINE>> action) {
        AwsS3LineInput<LINE> lineInput;
        synchronized(sharedBuffer) {
            lineInput=sharedBuffer.poll();
        }
        if(lineInput != null) {
            action.accept(lineInput);
            return true;
        }
        InputStreamHandler handle = openedFiles.poll();
        if(handle == null) {
            S3ObjectSummary unopenedFile = unopenedFiles.poll();
            if(unopenedFile == null) {
                return false;
            }
            handle = new InputStreamHandler(unopenedFile, fileOpener.apply(unopenedFile));
        }
        for(int i=0; i < maxBuffer; ++i) {
            LINE line = lineReader.apply(handle.inputStream);
            if(line != null) {
                synchronized(sharedBuffer) {
                    sharedBuffer.add(new AwsS3LineInput<LINE>(handle.file, line));
                }
            }
            else {
                return tryAdvance(action);
            }
        }
        openedFiles.addFirst(handle);
        return tryAdvance(action);
    }

    @Override
    public Spliterator<AwsS3LineInput<LINE>> trySplit() {
        synchronized(incomingFiles) {
            if (incomingFiles.hasNext()) {
                unopenedFiles.add(incomingFiles.next());
                return new AwsS3LineSpliterator<LINE>(incomingFiles, fileOpener, lineReader, unopenedFiles, openedFiles, sharedBuffer, maxBuffer);
            } else {
                return null;
            }
        }
    }
}
常永怡
2023-03-14

您的trysplit应该输出大小相等的拆分,而不考虑基础文件的大小。您应该将所有文件视为单个单元,并每次使用相同数量的JSON对象填充支持arraylist的拆分器。对象的数量应该使处理一个拆分需要1到10毫秒:低于1毫秒,您就开始接近将批处理移交给工作线程的成本,高于这个值,您就开始冒着由于任务太粗粒度而导致CPU负载不均衡的风险。

拆分器没有义务报告大小估计,并且您已经正确地执行了这一操作:您的估计是long.max_value,这是一个特殊的值,表示“无界”。但是,如果您有一个JSON对象的多个文件,导致大小为1的批处理,这将从两个方面损害性能:打开-读取-关闭文件的开销可能成为瓶颈,如果您设法避免这种情况,线程切换的成本可能与处理一个项目的成本相比很大,再次造成瓶颈。

五年前我在解决一个类似的问题,你可以看看我的解决方案。

 类似资料:
  • 问题:修改一个BST,使它变得尽可能平衡。不用说,你应该尽可能有效率地做这件事。 提示:面试官说这是一个合乎逻辑的问题,如果你换位思考就会得到答案。没有困难的编码涉及。 -->话虽如此,但我不认为他希望我指向AVL/RB树。 我的解决方案是:我提出,我将对树进行顺序遍历,将中间元素作为新树的根(我们称之为新根)。然后到中间元素的左边,取它的中间元素作为树的左子树的根,生成新根。同样地,正确的部分也

  • 我们有一个相对简单的分片MongoDB设置:4个分片,每个分片是一个副本集,至少有3个成员。每个集合都由从大量文件加载的数据组成;每个文件都被赋予一个单调递增的ID,并且根据ID的哈希完成分片。 我们的大部分产品都在按预期工作。然而,我有一个集合似乎没有正确地将块分布到各个碎片上。在创建索引之前,集合加载了大约30GB的数据,并且进行了分片,但是据我所知,这并不重要。以下是该集合的统计数据: 这个

  • 假设我有一个包含1,000个元素和10个执行器的RDD。现在我用10个分区并行化RDD,并由每个执行器处理100个元素(假设每个执行器1个任务)。 我的困难是,其中一些分区任务可能比其他任务花费的时间要长得多,所以说8个执行器将很快完成,而剩下的2个执行器则将被困在执行时间更长的任务中。因此,主进程将等待2完成后再继续,8将处于空闲状态。 有什么方法可以让无所事事的执行者从忙碌的执行者那里“拿走”

  • 相信维基百科文章:http://en.wikipedia.org/wiki/AVL_tree AVL树高度平衡,但一般不平衡重量,也不平衡μ;[4] 也就是说,同级节点的子节点数量可能相差很大。 但是,作为AVL树是: 自平衡二叉查找树[...]。在AVL树中,任何节点的两个子树的高度最多相差一个 我不明白AVL怎么会是重量不平衡的,因为——如果我很好地理解AVL树的定义——每个兄弟姐妹都会有大约

  • 在Kubernetes中创建负载平衡器类型的服务时,它是创建一个全新的外部负载平衡器,还是只为负载平衡器类型的第一个服务创建一个负载平衡器,并将该负载平衡器重新用于负载平衡器类型的所有后续服务? 这个问题特别重要,因为为每个服务构建一个单独的负载平衡器对我来说成本太高。 如果它特定于云提供商,我使用Azure,但我很想知道其他云提供商是否不同。

  • 我正在解决“破解编码面试”中的以下问题:实现一个函数来检查二叉树是否平衡。平衡树是这样一种树:任何节点的两个子树的高度相差不会超过一个。 这本书的示例解决方案(复制如下)假设从节点发出的树是平衡的,如果(a)节点的左子树和右子树是平衡的;和(b)节点本身是平衡的。我在试图理解为什么会这样?以上两个条件的满足如何证明从节点发出的整个树是平衡的? 谢啦