当前位置: 首页 > 知识库问答 >
问题:

没有工作线程时fork-connect有工作要做?

曾光誉
2023-03-14

我有一个错误,现在在生产中出现了两次,其中一个分叉/连接池停止工作,尽管它有工作要做并且正在添加更多工作。

这是我到目前为止得出的结论,以解释为什么要执行的任务队列被填满并且任务结果流停止。我有线程转储,其中我的任务生产者线程正在等待fork/连接提交完成,但是没有ForkJoinpool工作线程对此做任何事情。

"calc-scheduling-pool-4-thread-2" #65 prio=5 os_prio=0  tid=0x00000000102e39f0 nid=0x794a in Object.wait() [0x00002ad900a06000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.util.concurrent.ForkJoinTask.externalAwaitDone(ForkJoinTask.java:334)
    - locked <0x000000061ad08708> (a com.....Engine$Calculation)
    at java.util.concurrent.ForkJoinTask.doJoin(ForkJoinTask.java:391)
    at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:719)
    at java.util.concurrent.ForkJoinPool.invoke(ForkJoinPool.java:2613)
    at com...Engine.calculateSinceLastBatch(Engine.java:141)

不管我在做什么,这都不应该发生,对吗?线程转储来自检测到初始条件后的许多小时。我还有另外两个运行时中的ForkJoinPool,它们都正常运行,存在许多工作线程。

这个池的并行度是1(我知道这很愚蠢,但不应该破坏fork/连接池的正确性)。在我的任务队列填满并且线程转储显示没有工作器之前,没有检测到其他错误或异常。

有人看到这个了吗?要么我错过了什么,要么是fork/连接中有一个从未(重新)为我启动工作线程的错误。

运行时是java 8

用代码更新

这是对我们在生产中如何使用fork/join的合理简化。我们有三个引擎,其中只有一个配置了并行度1。

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.*;

public class Engine {

    BlockingQueue<Calculation> externalQueue = new LinkedBlockingQueue<>(100000);
    ScheduledExecutorService scheduling = Executors.newScheduledThreadPool(3);
    static ForkJoinPool forkJoin = new ForkJoinPool(1);

    public static void main(String[] args) {
        new Engine().start();
    }

    void start() {
        final AtomicInteger batch = new AtomicInteger(0);
        // data comes in from external systems
        scheduling.scheduleWithFixedDelay(
                () -> produceData(batch.getAndIncrement()),
                500,
                500,
                TimeUnit.MILLISECONDS);
        // internal scheduling processes data with a fixed delay
        scheduling.scheduleWithFixedDelay(
                this::calculate,
                1000,
                1000,
                TimeUnit.MILLISECONDS);
    }

    void produceData(final int batch) {
        System.out.println(Thread.currentThread().getName() + " => submitting data for batch " + batch);
        Stream<Integer> data = IntStream.range(0, 10).boxed();
        data.map((i) -> new Calculation(batch, i)).forEach(externalQueue::offer);
    }

    void calculate() {
        int available = externalQueue.size();
        List<Calculation> tasks = new ArrayList<>(available);
        externalQueue.drainTo(tasks);
        // invoke will block for the results to be calculated before continuing
        forkJoin.invoke(new CalculationTask(tasks, 0, tasks.size()));
        System.out.println("done with calculations at " + new Date());
    }

    static class CalculationTask extends RecursiveAction {

        static int MIN_CALCULATION_THRESHOLD = 3;

        List<Calculation> tasks;
        int start;
        int end;

        CalculationTask(List<Calculation> tasks, int start, int end) {
            this.tasks = tasks;
            this.start = start;
            this.end = end;
        }

        // if below a threshold, calculate here, else fork to new CalculationTasks
        @Override
        protected void compute() {
            int work = end - start;
            if (work <= threshold()) {
                for (int i = start; i < end; i++) {
                    Calculation calc = tasks.get(i);
                    calc.calculate();
                }
                return;
            }

            invokeNewActions();
        }

        int threshold() {
            return Math.max(tasks.size() / forkJoin.getParallelism() / 2, MIN_CALCULATION_THRESHOLD);
        }

        void invokeNewActions() {
            invokeAll(
                    new CalculationTask(tasks, start, middle()),
                    new CalculationTask(tasks, middle(), end));
        }

        int middle() {
            return (start + end) / 2;
        }
    }

    static class Calculation {

        int batch;
        int data;

        Calculation(int batch, int data) {
            this.batch = batch;
            this.data = data;
        }

        void calculate() {
            // does some work and pushes results to a listener
            System.out.println(Thread.currentThread().getName() + " => calculation complete on batch " + batch
                            + " for " + data);
        }
    }

}

共有1个答案

鲜于念
2023-03-14

等待java.util.concurrent.ForkJoinTask.externalAwaitDone(ForkJoinTask.java:334)

这告诉我,F/J 可能正在使用您的提交线程作为工作线程。按照调用All中的代码进行操作。在任务提交执行后,代码需要 Future,它最终得到 ((ForkJoinTask)futures.get(i)).quietlyJoin();悄悄地加入去做Join。

在那里,如果(Thread.currentThread())instanceof ForkJoinWorkerThread)不是真的,如果池使用您的提交线程作为工作线程,它最终会出现在exteralAwaitDone()中。

问题可能是您的提交线程永远不会醒来,因为它不是真正的工作线程。使用提交线程作为工作线程存在许多问题,这可能是另一个问题。

正如@John-Vint所说,如果没有测试,这个答案只是一个猜测。为什么不设置并行度

 类似资料:
  • 主线程通常被用于运行主循环,而主循环负责的都是 UI 相关的工作,所以也可以说主线程是 UI 线程。为了不影响 UI 线程的工作效率,我们会需要创建额外的线程来负责各种各样的工作,而这些线程就是工作线程。 在主循环的章节中,我们已经了解到主循环执行频率影响界面的流畅度,它的每一次循环都会按顺序执行处理定时器、处理事件队列、更新组件、渲染组件等任务,其中最容易影响到主循环的执行频率的任务是处理事件队

  • 我已经在Ubuntu 16.04(VM实例)中安装并配置了kibana。当我运行bin文件时,kibana没有启动。日志文件有如下错误消息。"type":"log","@time戳":"2017-09-08T11:21:53Z","tags":["status","plugin:kibana@5.2.2","info"],"pid": 7300,"state":"green","消息":"stat

  • 问题内容: 在http://marcio.io/2015/07/handling-1-million-requests-per-minute-with- golang/ 提供的示例中,很多地方都引用了该示例。 分派服务完许多工作后,工人池(chan chan工作)会不会耗尽?因为从信道和信道工作拉出第一类型后没有被补充被调用的第一次?还是我想念/误读了什么?如何为WorkerPool补充可用的工作

  • 基于文档https://netty . io/4.1/API/io/netty/channel/channel pipeline . html:< br > //告诉管道在与I/O线程不同的线程中运行MyBusinessLogicHandler的事件处理程序方法 //,这样I/O线程就不会被 //阻塞,这是一项耗时的任务。 //如果您的业务逻辑是完全异步的或很快就完成了,您不需要指定一个组。 pi

  • 这是我的设备 这是我的消息正文作者 我的问题是: 为什么结果似乎不正确?我被放在@JsonRootName(“Facility”)上,并且还启用了包装根特性 我错过了什么