我想了解在fork-连接池中处理任务Java顺序。
到目前为止,我在文档中找到的唯一相关信息是关于一个名为“asyncMode”的参数,该参数“如果此池对从未加入的分叉任务使用本地先进先出调度模式,则为真”。
我对这句话的解释是,每个工人都有自己的任务队列;工人从他们自己队列的前面接受任务,或者如果他们自己的队列是空的,从其他工人队列的后面偷走任务;如果asyncMode为真(分别为假),工作人员将新分叉的任务添加到自己队列的后面(分别为前面)。
如果我的解释有误,请指正!
现在,这引发了几个问题:
1) 加入的分叉任务的排序是什么?
我的猜测是,当一个任务被分叉时,它被添加到工作者的队列中,正如我在上面的解释中所描述的。现在,假设任务被加入...
>
如果在调用join时,任务尚未启动,则调用join的工作人员将从队列中拉出该任务,并立即开始处理该任务。
如果在调用join时,任务已经被另一个工作者窃取,那么调用join的工作者将同时处理其他任务(按照我在上面的解释中描述的获取任务的顺序),直到它正在加入的任务已经被窃取它的工作者完成。
这个猜测是基于使用print语句编写简单的测试代码,并观察更改连接调用的顺序如何影响任务的处理顺序。有人能告诉我我的猜测是否正确吗?
2)外部提交的任务的顺序是什么?
根据这个问题的答案,fork-join池不使用外部队列。(顺便说一下,我使用的是Java 8。)
那么我是否可以理解当一个任务被外部提交时,该任务被添加到一个随机选择的工作队列中?
如果是,外部提交的任务是添加到队列的后面还是前面?
最后,这是否取决于任务是通过调用 pool.execute(task) 还是通过调用 pool.invoke(task) 提交?这是否取决于调用 pool.execute(task) 或 pool.invoke(task) 的线程是外部线程还是此分叉连接池中的线程?
* Joining Tasks
* =============
*
* Any of several actions may be taken when one worker is waiting
* to join a task stolen (or always held) by another. Because we
* are multiplexing many tasks on to a pool of workers, we can't
* just let them block (as in Thread.join). We also cannot just
* reassign the joiner's run-time stack with another and replace
* it later, which would be a form of "continuation", that even if
* possible is not necessarily a good idea since we may need both
* an unblocked task and its continuation to progress. Instead we
* combine two tactics:
*
* Helping: Arranging for the joiner to execute some task that it
* would be running if the steal had not occurred.
*
* Compensating: Unless there are already enough live threads,
* method tryCompensate() may create or re-activate a spare
* thread to compensate for blocked joiners until they unblock.
2.ForkJoinPool.invoke和ForkJoinPool.join在任务提交方式上完全相同。你可以在代码中看到
public <T> T invoke(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
return task.join();
}
public void execute(ForkJoinTask<?> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
}
在exteralPush中,您可以看到该任务使用ThreadLocalRandom添加到随机选择的工作队列中。此外,它使用推送方法进入队列的头部。
final void externalPush(ForkJoinTask<?> task) {
WorkQueue[] ws; WorkQueue q; int m;
int r = ThreadLocalRandom.getProbe();
int rs = runState;
if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
(q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
U.compareAndSwapInt(q, QLOCK, 0, 1)) {
ForkJoinTask<?>[] a; int am, n, s;
if ((a = q.array) != null &&
(am = a.length - 1) > (n = (s = q.top) - q.base)) {
int j = ((am & s) << ASHIFT) + ABASE;
U.putOrderedObject(a, j, task);
U.putOrderedInt(q, QTOP, s + 1);
U.putIntVolatile(q, QLOCK, 0);
if (n <= 1)
signalWork(ws, q);
return;
}
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
externalSubmit(task);
}
我不知道你这么说是什么意思:
这是否取决于线程调用池。执行(任务)或汇集。invoke(task)是外部线程还是此fork-join池中的线程?
顺序队列(Sequential Queue) 1. 顺序队列的概念 1.1 顺序队列的定义 顺序队列是基于一维数组的存储表示实现的队列。 1.2 顺序队列中各元素的逻辑及存储关系 顺序队列可以采用顺序表作为其存储表示,因此,可以在顺序队列的声明中用顺序表定义它的存储空间。 顺序队列可以使用一维数组作为队列的存储空间,存放队列元素的数组的头指针为*elements,该数组的最大允许存放元素个数为ma
所以我使用executorservice创建了一个线程池。 我试图访问线程池队列中的任务数。我看到没有办法得到它。我知道有一些方法来获取线程池执行器中的队列大小,但是我如何使用执行器服务对象来实现这一点。 就像我说的,如果我创建了一个像这样的线程池执行器,我可以得到队列信息 我知道我可以使用tpExecutor。队列size()获取线程池队列中的任务数。但目前我已经使用Executor服务声明了我
问题内容: 我编写了一个迷宫求解程序,该程序应该支持DFS,BFS,A *,Dijkstra和贪婪算法。无论如何,我选择了PriorityQueue作为我的边界数据结构,因为我认为优先级的行为就像队列,堆栈或优先级队列一样,取决于比较器的实现。 这是我实现比较器以将优先级队列转换为队列的方式: / 由于优先级队列的“自然排序”元素最少,并且常规比较器在第一个小于第二个时返回-1,因此被黑的比较器始
编辑:Redis+Sidekiq完成该工作。在这里,Redis作为一个消息队列工作,Sidekiq在后台处理这些消息。我很想知道,选择一个显式代理(如RabbitMQ、SQS、Redis PubSub)而不是Redis+SideKiQ有什么用例和好处?
问题内容: 我正在学习 Java Concurrency in Practice, 并陷入了 8.3.1线程创建和拆除的 主题。以下脚注警告要保持为零。 有时,开发人员倾向于将核心大小设置为零,以使工作线程最终被拆除,因此不会阻止JVM退出,但这会在不使用a的线程池中引起一些奇怪的现象。他们的工作队列使用SynchronousQueue(就像newCachedThreadPool一样)。 如果池已
主要内容:顺序队列简单实现,顺序队列另一种实现方法顺序队列 ,即采用顺序表模拟实现的队列结构。 我们知道,队列具有以下两个特点: 数据从队列的一端进,另一端出; 数据的入队和出队遵循"先进先出"的原则; 因此,只要使用顺序表按以上两个要求操作数据,即可实现顺序队列。首先来学习一种最简单的实现方法。 顺序队列简单实现 由于顺序队列的底层使用的是数组,因此需预先申请一块足够大的内存空间初始化顺序队列。除此之外,为了满足顺序队列中数据从队尾进,队头出且