当前位置: 首页 > 知识库问答 >
问题:

什么是并行流队列行为?

秦安怡
2023-03-14

我正在使用parallelStream并行上传一些文件,有些是大文件,有些是小文件。我注意到并不是所有的工人都被使用。

一开始一切都运行良好,所有线程都被使用(我将并行选项设置为16)。然后在某一点上(一旦它到达更大的文件),它只使用一个线程

简化代码:

files.parallelStream().forEach((file) -> {
    try (FileInputStream fileInputStream = new FileInputStream(file)) {
                IDocumentStorageAdaptor uploader = null;

                try {
                    logger.debug("Adaptors before taking: " + uploaderPool.size());
                    uploader = uploaderPool.take();
                    logger.debug("Took an adaptor!");
                    logger.debug("Adaptors after taking: " + uploaderPool.size());
                    uploader.addNewFile(file);
                } finally {
                    if (uploader != null) {
                        logger.debug("Adding one back!");
                        uploaderPool.put(uploader);
                        logger.debug("Adaptors after putting: " + uploaderPool.size());
                    }
                }
            } catch (InterruptedException | IOException e) {
                throw new UploadException(e);
            }
});

uploaderPool是一个ArrayBlockingQueue。日志:

[ForkJoinPool.commonPool-worker-8] - Adaptors before taking: 0
[ForkJoinPool.commonPool-worker-15] - Adding one back!
[ForkJoinPool.commonPool-worker-8] - Took an adaptor!
[ForkJoinPool.commonPool-worker-15] - Adaptors after putting: 0
...
...
...
[ForkJoinPool.commonPool-worker-10] - Adding one back!
[ForkJoinPool.commonPool-worker-10] - Adaptors after putting: 16
[ForkJoinPool.commonPool-worker-10] - Adaptors before taking: 16
[ForkJoinPool.commonPool-worker-10] - Took an adaptor!
[ForkJoinPool.commonPool-worker-10] - Adaptors after taking: 15
[ForkJoinPool.commonPool-worker-10] - Adding one back!
[ForkJoinPool.commonPool-worker-10] - Adaptors after putting: 16
[ForkJoinPool.commonPool-worker-10] - Adaptors before taking: 16
[ForkJoinPool.commonPool-worker-10] - Took an adaptor!
[ForkJoinPool.commonPool-worker-10] - Adaptors after taking: 15

似乎所有的工作(列表中的项目)都分布在16个线程中,委托给一个线程的事情只会等待线程自由工作,而不是使用可用的线程。有没有办法改变parallelStream在排队时的工作方式?我读过forkjoinpool文档,其中提到了工作窃取,但只针对衍生的子任务。

我的另一个计划可能是随机排列我正在使用parallelStream的列表,这可能会平衡一些事情。

谢谢!

共有1个答案

蓝苗宣
2023-03-14

并行流的分割与计算启发式是针对数据并行操作进行调整的,而不是针对IO并行操作。(换句话说,它们是为了让CPU保持忙碌而调整的,但不会生成比你拥有的CPU多得多的任务。)因此,它们偏向于计算而不是分叉。目前没有覆盖这些选择的选项。

 类似资料:
  • 队列,和 栈一样,也是一种对数据的"存"和"取"有严格要求的 线性存储结构。 与栈结构不同的是, 队列的两端都"开口",要求数据只能从一端进,从另一端出,如图 1 所示: 图 1 队列存储结构 通常,称进数据的一端为 "队尾",出数据的一端为 "队头",数据元素进队列的过程称为 "入队",出队列的过程称为 "出队"。 不仅如此, 队列中数据的进出要遵循 "先进先出" 的原则,即最先进队列的数据元素

  • 问题内容: 我在玩无限的流,并制作了该程序进行基准测试。基本上,您提供的数字越大,完成的速度就越快。但是,令我惊讶的是,与顺序流相比,使用并行流导致的性能成倍下降。凭直觉,人们期望在多线程环境中生成和评估无限快的随机数流,但是事实并非如此。为什么是这样? 问题答案: 我完全同意其他评论和答案,但实际上,如果目标很低,您的测试会表现得很奇怪。在我的普通笔记本电脑上,当给出非常低的目标时,并行版本平均

  • 队列是项的有序结合,其中添加新项的一端称为队尾,移除项的一端称为队首。当一个元素从队尾进入队列时,一直向队首移动,直到它成为下一个需要移除的元素为止。 最近添加的元素必须在队尾等待。集合中存活时间最长的元素在队首,这种排序成为 FIFO,先进先出,也被成为先到先得。 队列的最简单的例子是我们平时不时会参与的列。排队等待电影,在杂货店的收营台等待,在自助餐厅排队等待(这样我们可以弹出托盘栈)。行为良

  • 为了简单起见,我将column称为col。为什么矩阵是[行,列]而不是[列,行]?这给我带来了很多头痛和困惑。 我的思路是这样的:1.一个正则数组, 就像一个矩阵,有一行和多列。它的符号是这样的:啊,如果我们有另一个维度, 现在有行了。因此,让我们在'n',arr[n,rows]之后记下这些行,但现实告诉我们,情况并非如此。 对不起,如果我混淆了你,对不起我的无知。

  • 考虑到我有2个CPU核心的事实,并行版本不是应该更快吗?有人能给我一个提示为什么并行版本比较慢吗?

  • 我们正在构建一个具有Node.js服务器和Express的架构体系。 在服务器中,发生的情况如下所示: 服务器接受来自客户端的传入HTTP请求 服务器生成两个文件(此操作可能“相对较长”,也就是0.1秒左右) 服务器将生成的文件(每个文件约20-200 KB)上载到外部CDN 服务器响应客户机,这包括CDN上文件的URI 目前,服务器正在为每个请求按顺序执行此操作,并且效果非常好(Node/Exp