当前位置: 首页 > 知识库问答 >
问题:

使用ThreadPoolExecutor的同步任务生产者/消费者

翟俊名
2023-03-14

我试图找到一种在以下场景中使用ThreadPoolExecutor的方法:

  • 我有一个单独的线程在线程池中生成和提交任务

为了提供更多的上下文,我目前只需一次提交所有任务,并取消ExecutorService返回的所有未来。在最长生成时间到期后提交。我忽略所有产生的取消异常,因为它们是预期的。问题是未来的行为。cancel(false)很奇怪,不适合我的用例:

  • 它可以防止任何未启动的任务运行(良好)

我研究了Java必须提供的不同阻塞队列,发现了这个:https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/SynchronousQueue.html.起初这似乎很理想,但后来看看https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html,它似乎并没有按照我想要的方式使用ThreadPoolExecator

直接切换。对于工作队列来说,一个很好的默认选择是SynchronousQueue,它将任务移交给线程,而无需持有它们。在这里,如果没有线程可以立即运行任务,则尝试将任务排队将失败,因此将构造一个新线程。此策略避免在处理可能具有内部依赖关系的请求集时发生锁定。直接切换通常需要无限的最大工具大小,以避免拒绝新提交的任务。这反过来又允许了当命令继续以平均快于其处理速度的速度到达时出现无限线程增长的可能性。

理想的情况是消费者(=池)阻塞同步队列(SynchronousQueue)。轮询,生产者(=任务生产者线程)在同步队列上阻塞。放置。

知道如何在不编写任何复杂调度逻辑的情况下实现我所描述的场景吗(应该为我附上什么线程池执行器)?

共有2个答案

冯通
2023-03-14

我发现了另一个选项,而不是@Carlitos Way提出的选项。它包括使用BlockingQueue.offer直接在队列中添加任务。我一开始没能让它工作并且我不得不发布这个问题的唯一原因是我不知道ThreadPoolExecator的默认行为是在没有任何线程的情况下启动。线程将使用线程工厂延迟创建,并且可能会根据池的核心和最大大小以及同时提交的任务数量来删除/重新填充。

由于线程创建很慢,我尝试阻止调用提供失败,因为SynchronousQueue。如果没有人等待从队列中获取元素,则offer将立即退出。相反,同步队列(SynchronousQueue)。在有人请求从队列中提取项目之前,放置(put)会一直阻塞,如果线程池为空,则永远不会发生这种情况。

因此,解决方法是强制线程池急切地使用ThreadPoolExecutor.prestartAllCoreThread创建核心线程。我的问题变得相当微不足道。我制作了一个真实用例的简化版本:

import java.util.Random;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

public class SimplifiedBuildScheduler {
    private static final int MAX_POOL_SIZE = 10;

    private static final Random random = new Random();
    private static final AtomicLong nextTaskId = new AtomicLong(0);

    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<Runnable> queue = new SynchronousQueue<>();

        // this is a soft requirement in my system, not a real-time guarantee. See the complete semantics in my question.
        long maxBuildTimeInMillis = 50;
        // this timeout must be small compared to maxBuildTimeInMillis in order to accurately match the maximum build time
        long taskSubmissionTimeoutInMillis = 1;

        ThreadPoolExecutor pool = new ThreadPoolExecutor(MAX_POOL_SIZE, MAX_POOL_SIZE, 0, SECONDS, queue);
        pool.prestartAllCoreThreads();

        Runnable nextTask = makeTask(maxBuildTimeInMillis);

        long millisAtStart = System.currentTimeMillis();
        while (maxBuildTimeInMillis > System.currentTimeMillis() - millisAtStart) {
            boolean submitted = queue.offer(nextTask, taskSubmissionTimeoutInMillis, MILLISECONDS);
            if (submitted) {
                nextTask = makeTask(maxBuildTimeInMillis);
            } else {
                System.out.println("Task " + nextTaskId.get() + " was not submitted. " + "It will be rescheduled unless " +
                        "the max build time has expired");
            }
        }

        System.out.println("Max build time has expired. Stop submitting new tasks and running existing tasks to completion");

        pool.shutdown();
        pool.awaitTermination(9999999, SECONDS);
    }

    private static Runnable makeTask(long maxBuildTimeInMillis) {
        long sleepTimeInMillis = randomSleepTime(maxBuildTimeInMillis);
        long taskId = nextTaskId.getAndIncrement();
        return () -> {
            try {
                System.out.println("Task " + taskId + " sleeping for " + sleepTimeInMillis + " ms");
                Thread.sleep(sleepTimeInMillis);
                System.out.println("Task " + taskId + " completed !");
            } catch (InterruptedException ex) {
                throw new RuntimeException(ex);
            }
        };
    }

    private static int randomSleepTime(long maxBuildTimeInMillis) {
        // voluntarily make it possible that a task finishes after the max build time is expired
        return 1 + random.nextInt(2 * Math.toIntExact(maxBuildTimeInMillis));
    }
}

输出示例如下:

Task 1 was not submitted. It will be rescheduled unless the max build time has expired
Task 0 sleeping for 23 ms
Task 1 sleeping for 26 ms
Task 2 sleeping for 6 ms
Task 3 sleeping for 9 ms
Task 4 sleeping for 75 ms
Task 5 sleeping for 35 ms
Task 6 sleeping for 81 ms
Task 8 was not submitted. It will be rescheduled unless the max build time has expired
Task 8 was not submitted. It will be rescheduled unless the max build time has expired
Task 7 sleeping for 86 ms
Task 8 sleeping for 47 ms
Task 9 sleeping for 40 ms
Task 11 was not submitted. It will be rescheduled unless the max build time has expired
Task 2 completed !
Task 10 sleeping for 76 ms
Task 12 was not submitted. It will be rescheduled unless the max build time has expired
Task 3 completed !
Task 11 sleeping for 31 ms
Task 13 was not submitted. It will be rescheduled unless the max build time has expired
Task 13 was not submitted. It will be rescheduled unless the max build time has expired
Task 13 was not submitted. It will be rescheduled unless the max build time has expired
Task 13 was not submitted. It will be rescheduled unless the max build time has expired
Task 13 was not submitted. It will be rescheduled unless the max build time has expired
Task 13 was not submitted. It will be rescheduled unless the max build time has expired
Task 0 completed !
Task 12 sleeping for 7 ms
Task 14 was not submitted. It will be rescheduled unless the max build time has expired
Task 14 was not submitted. It will be rescheduled unless the max build time has expired
Task 1 completed !
Task 13 sleeping for 40 ms
Task 15 was not submitted. It will be rescheduled unless the max build time has expired
Task 12 completed !
Task 14 sleeping for 93 ms
Task 16 was not submitted. It will be rescheduled unless the max build time has expired
Task 16 was not submitted. It will be rescheduled unless the max build time has expired
Task 16 was not submitted. It will be rescheduled unless the max build time has expired
Task 5 completed !
Task 15 sleeping for 20 ms
Task 17 was not submitted. It will be rescheduled unless the max build time has expired
Task 17 was not submitted. It will be rescheduled unless the max build time has expired
Task 11 completed !
Task 16 sleeping for 27 ms
Task 18 was not submitted. It will be rescheduled unless the max build time has expired
Task 18 was not submitted. It will be rescheduled unless the max build time has expired
Task 9 completed !
Task 17 sleeping for 95 ms
Task 19 was not submitted. It will be rescheduled unless the max build time has expired
Max build time has expired. Stop submitting new tasks and running existing tasks to completion
Task 8 completed !
Task 15 completed !
Task 13 completed !
Task 16 completed !
Task 4 completed !
Task 6 completed !
Task 10 completed !
Task 7 completed !
Task 14 completed !
Task 17 completed !

例如,您会注意到,任务19没有被重新调度,因为在调度程序可以尝试第二次将其提供给队列之前,最大生成时间已经过期。您还可以看到,在最大生成时间到期之前启动的所有正在进行的任务都会运行到完成。

注意:正如我在代码中的注释所述,最大构建时间是一个软性要求,这意味着可能无法完全满足它,而且我的解决方案确实允许在最大构建时间到期后提交任务。如果对offer的调用刚好在最大构建时间到期之前开始,并在之后完成,则可能会发生这种情况。为了降低发生这种情况的可能性,重要的是调用offer时使用的超时要远远小于最大构建时间。在实际系统中,线程池通常很忙,没有空闲线程,因此发生这种争用情况的概率非常小,并且在发生这种情况时对系统没有不良后果,因为最大构建时间是为了满足总体运行时间的最大努力,而不是严格的限制。

宗政英才
2023-03-14

我相信你在正确的道路上。。。您所要做的就是使用以下构造函数将同步队列与拒绝执行处理程序(RejectedExecutionHandler)结合使用。。。通过这种方式,您可以定义一个固定的最大大小线程池(限制您的资源使用),并定义一个回退机制来重新安排那些无法处理的任务(因为池已满)。。。例子:

public class Experiment {

    public static final long HANDLER_SLEEP_TIME = 4000;
    public static final int MAX_POOL_SIZE = 1;

    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<Runnable> queue;
        RejectedExecutionHandler handler;
        ThreadPoolExecutor pool;
        Runnable runA, runB;

        queue   = new SynchronousQueue<>();
        handler = new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                try {
                    System.out.println("Handler invoked! Thread: " + Thread.currentThread().getName());
                    Thread.sleep(HANDLER_SLEEP_TIME); // this let runnableA finish
                    executor.submit(r);    // re schedule

                } catch (InterruptedException ex) {
                    throw new RuntimeException("Handler Exception!", ex);
                }
            }
        };

        pool = new ThreadPoolExecutor(1, MAX_POOL_SIZE, 10, TimeUnit.SECONDS, queue, handler);
        runA = new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(3000);
                    System.out.println("hello, I'm runnable A");

                } catch (Exception ex) {
                    throw new RuntimeException("RunnableA", ex);
                }
            }
        };
        runB = new Runnable() {
            @Override
            public void run() {
                System.out.println("hello, I'm runnable B");
            }
        };

        pool.submit(runA);
        pool.submit(runB);
        pool.shutdown();
    }
}

注意:Reject tedExecutionHandler的实现取决于您!我只是建议将睡眠作为阻塞机制,但您可以执行更复杂的逻辑,例如询问线程池是否有空闲线程。如果不是,则Hibernate;如果是,则再次提交任务......

 类似资料:
  • 我有一个消费者作为生产者消费者模式的一部分: 简化: 如果我移除 通过将线程设置为睡眠,CPU使用率攀升到极高的水平(13%),而不是0%。 此外,如果我实例化该类的多个实例,则每个实例的CPU使用率都会以13%的增量攀升。 大约每分钟(可能每30秒)都会向BlockingCollection添加一个新的LogItem,并将适用的消息写入文件。 有没有可能线程以某种方式阻止了其他线程的运行,而系统

  • 我有两个线程的问题,似乎没有正确同步。我基本上有一个布尔值名为“已占用”。当没有线程启动时,它被设置为false。但是当一个线程启动时,线程集被占用是真的,我有一个类,它有线程(run),它们调用下面的函数。 这是一个模拟银行的示例,它接收一个金额(初始余额),然后随机执行取款和存款。我的教授提到了一些关于从取款线程到存款线程的信号?这是怎么回事?在提取线程中,它应该运行到余额为2低,并等待存款线

  • 我有一个生产者/消费者场景,我不希望一个生产者交付产品,多个消费者消费这些产品。然而,常见的情况是,交付的产品只被一个消费者消费,而其他消费者从未看到过这个特定的产品。我不想实现的是,一个产品被每个消费者消费一次,而没有任何形式的阻碍。 我的第一个想法是使用多个BlockingQueue,每个消费者使用一个,并使生产者将每个产品按顺序放入所有可用的BlockingQueues中。但是,如果其中一个

  • 本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要

  • 生产者线程与消费者线程使用信号量同步 生产者线程与消费者线程使用信号量同步 源码/* * Copyright (c) 2006-2018, RT-Thread Development Team * * SPDX-License-Identifier: Apache-2.0 * * Change Logs: * Date Author Notes * 2018-08-24 yangjie the f

  • 所谓的生产者消费者模型就是 某个模块(函数)负责生产数据,这些数据由另一个模块来负责处理 一般生产者消费者模型包含三个部分 生产者、缓冲区、消费者 为什么生产者消费者模型要含三个部分?直接生产和消费不行么? 一个案例说明一切 生产者好比现实生活中的某个人 缓冲区好比现实生活中的邮箱 消费者好比现实生活中的邮递员 如果只有生产者和消费者, 那么相当于只有写信的人和邮递员,那么如果将来过去的邮递员离职