当前位置: 首页 > 知识库问答 >
问题:

Java ForkJoinPool - 队列中任务的顺序

康鹏云
2023-03-14

我想了解在fork-连接池中处理任务Java顺序。

到目前为止,我在文档中找到的唯一相关信息是关于一个名为“asyncMode”的参数,该参数“如果此池对从未加入的分叉任务使用本地先进先出调度模式,则为真”。

我对这句话的解释是,每个工人都有自己的任务队列;工人从他们自己队列的前面接受任务,或者如果他们自己的队列是空的,从其他工人队列的后面偷走任务;如果asyncMode为真(分别为假),工作人员将新分叉的任务添加到自己队列的后面(分别为前面)。

如果我的解释有误,请指正!

现在,这引发了几个问题:

1) 加入的分叉任务的排序是什么?

我的猜测是,当一个任务被分叉时,它被添加到工作者的队列中,正如我在上面的解释中所描述的。现在,假设任务被加入...

>

  • 如果在调用join时,任务尚未启动,则调用join的工作人员将从队列中拉出该任务,并立即开始处理该任务。

    如果在调用join时,任务已经被另一个工作者窃取,那么调用join的工作者将同时处理其他任务(按照我在上面的解释中描述的获取任务的顺序),直到它正在加入的任务已经被窃取它的工作者完成。

    这个猜测是基于使用print语句编写简单的测试代码,并观察更改连接调用的顺序如何影响任务的处理顺序。有人能告诉我我的猜测是否正确吗?

    2)外部提交的任务的顺序是什么?

    根据这个问题的答案,fork-join池不使用外部队列。(顺便说一下,我使用的是Java 8。)

    那么我是否可以理解当一个任务被外部提交时,该任务被添加到一个随机选择的工作队列中?

    如果是,外部提交的任务是添加到队列的后面还是前面?

    最后,这是否取决于任务是通过调用 pool.execute(task) 还是通过调用 pool.invoke(task) 提交?这是否取决于调用 pool.execute(task) 或 pool.invoke(task) 的线程是外部线程还是此分叉连接池中的线程?

  • 共有1个答案

    蓝恩
    2023-03-14
    1. 你的猜测是正确的,你完全正确。正如您在“实施概述”中所看到的
     * Joining Tasks
     * =============
     *
     * Any of several actions may be taken when one worker is waiting
     * to join a task stolen (or always held) by another.  Because we
     * are multiplexing many tasks on to a pool of workers, we can't
     * just let them block (as in Thread.join).  We also cannot just
     * reassign the joiner's run-time stack with another and replace
     * it later, which would be a form of "continuation", that even if
     * possible is not necessarily a good idea since we may need both
     * an unblocked task and its continuation to progress.  Instead we
     * combine two tactics:
     *
     *   Helping: Arranging for the joiner to execute some task that it
     *      would be running if the steal had not occurred.
     *
     *   Compensating: Unless there are already enough live threads,
     *      method tryCompensate() may create or re-activate a spare
     *      thread to compensate for blocked joiners until they unblock.
    

    2.ForkJoinPool.invoke和ForkJoinPool.join在任务提交方式上完全相同。你可以在代码中看到

        public <T> T invoke(ForkJoinTask<T> task) {
            if (task == null)
                throw new NullPointerException();
            externalPush(task);
            return task.join();
        }
        public void execute(ForkJoinTask<?> task) {
            if (task == null)
                throw new NullPointerException();
            externalPush(task);
        }
    

    在exteralPush中,您可以看到该任务使用ThreadLocalRandom添加到随机选择的工作队列中。此外,它使用推送方法进入队列的头部。

        final void externalPush(ForkJoinTask<?> task) {
            WorkQueue[] ws; WorkQueue q; int m;
            int r = ThreadLocalRandom.getProbe();
            int rs = runState;
            if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
                (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
                U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                ForkJoinTask<?>[] a; int am, n, s;
                if ((a = q.array) != null &&
                    (am = a.length - 1) > (n = (s = q.top) - q.base)) {
                        int j = ((am & s) << ASHIFT) + ABASE;
                    U.putOrderedObject(a, j, task);
                    U.putOrderedInt(q, QTOP, s + 1);
                    U.putIntVolatile(q, QLOCK, 0);
                    if (n <= 1)
                        signalWork(ws, q);
                    return;
                }
                U.compareAndSwapInt(q, QLOCK, 1, 0);
            }
            externalSubmit(task);
        }
    

    我不知道你这么说是什么意思:

    这是否取决于线程调用池。执行(任务)或汇集。invoke(task)是外部线程还是此fork-join池中的线程?

     类似资料:
    • 顺序队列(Sequential Queue) 1. 顺序队列的概念 1.1 顺序队列的定义 顺序队列是基于一维数组的存储表示实现的队列。 1.2 顺序队列中各元素的逻辑及存储关系 顺序队列可以采用顺序表作为其存储表示,因此,可以在顺序队列的声明中用顺序表定义它的存储空间。 顺序队列可以使用一维数组作为队列的存储空间,存放队列元素的数组的头指针为*elements,该数组的最大允许存放元素个数为ma

    • 所以我使用executorservice创建了一个线程池。 我试图访问线程池队列中的任务数。我看到没有办法得到它。我知道有一些方法来获取线程池执行器中的队列大小,但是我如何使用执行器服务对象来实现这一点。 就像我说的,如果我创建了一个像这样的线程池执行器,我可以得到队列信息 我知道我可以使用tpExecutor。队列size()获取线程池队列中的任务数。但目前我已经使用Executor服务声明了我

    • 问题内容: 我编写了一个迷宫求解程序,该程序应该支持DFS,BFS,A *,Dijkstra和贪婪算法。无论如何,我选择了PriorityQueue作为我的边界数据结构,因为我认为优先级的行为就像队列,堆栈或优先级队列一样,取决于比较器的实现。 这是我实现比较器以将优先级队列转换为队列的方式: / 由于优先级队列的“自然排序”元素最少,并且常规比较器在第一个小于第二个时返回-1,因此被黑的比较器始

    • 编辑:Redis+Sidekiq完成该工作。在这里,Redis作为一个消息队列工作,Sidekiq在后台处理这些消息。我很想知道,选择一个显式代理(如RabbitMQ、SQS、Redis PubSub)而不是Redis+SideKiQ有什么用例和好处?

    • 问题内容: 我正在学习 Java Concurrency in Practice, 并陷入了 8.3.1线程创建和拆除的 主题。以下脚注警告要保持为零。 有时,开发人员倾向于将核心大小设置为零,以使工作线程最终被拆除,因此不会阻止JVM退出,但这会在不使用a的线程池中引起一些奇怪的现象。他们的工作队列使用SynchronousQueue(就像newCachedThreadPool一样)。 如果池已

    • 主要内容:顺序队列简单实现,顺序队列另一种实现方法顺序队列 ,即采用顺序表模拟实现的队列结构。 我们知道,队列具有以下两个特点: 数据从队列的一端进,另一端出; 数据的入队和出队遵循"先进先出"的原则; 因此,只要使用顺序表按以上两个要求操作数据,即可实现顺序队列。首先来学习一种最简单的实现方法。 顺序队列简单实现 由于顺序队列的底层使用的是数组,因此需预先申请一块足够大的内存空间初始化顺序队列。除此之外,为了满足顺序队列中数据从队尾进,队头出且