当前位置: 首页 > 知识库问答 >
问题:

ExecutorService固定线程拒绝

乌靖
2023-03-14

我有一些线程的问题。

我的剧本

1-从文本文件将1000多万行加载到数组中

2-创建5个固定线程的执行池

3-然后它正在迭代该列表并将一些线程添加到队列中

executor.submit(new MyCustomThread(line,threadTimeout,"[THREAD "+Integer.toString(increment)+"]"));

现在活动线程永远不会绕过5个固定线程,这很好,但我发现我的处理器进入100%负载,我已经调试了一点,我看到正在调用MyCustomThread构造函数,女巫意味着无论我声明5个固定线程,ExecutorService仍将尝试创建10万个对象。

主要问题是:我如何防止这种情况?我只是想让线程在没有空间的情况下被拒绝,而不是创建1000万对象并一个接一个地运行它们。

第二个问题:我如何得到当前的活动线程?我尝试了threadGroup.activeCount(),但它总是给我5 5 5......

调用方类:

System.out.println("Starting threads ...");
final ThreadGroup threadGroup = new ThreadGroup("workers");
//ExecutorService executor = Executors.newFixedThreadPool(howManyThreads);

ExecutorService executor = Executors.newFixedThreadPool(5,new ThreadFactory() {
    public Thread newThread(Runnable r) {
        return new Thread(threadGroup, r);
    }
});

int increment = 0;              
for(String line : arrayOfLines)
{
    if(increment > 10000)
    {
        //System.out.println("TOO MANY!!");
        //System.exit(0);
    }

    System.out.println(line);
    System.out.println(threadGroup.activeCount());

    if(threadGroup.activeCount() >= 5)
    {
        for(int i = 0; i < 10; i++)
        {
            System.out.println(threadGroup.activeCount());
            System.out.println(threadGroup.activeGroupCount());
            Thread.sleep(1000);
        }
    }


    try
    {
        executor.submit(new MyCustomThread(line,threadTimeout,"[THREAD "+Integer.toString(increment)+"]"));
    }
    catch(Exception ex)
    {
        continue;
        //System.exit(0);
    }

    increment++;
}

executor.awaitTermination(10, TimeUnit.MILLISECONDS);
executor.shutdown();

Thread等级:

public class MyCustomThread extends Thread
{
    private String ip;
    private String threadName;
    private int threadTimeout = 10;

    public MyCustomThread(String ip)
    {
        this.ip = ip;
    }

    public MyCustomThread(String ip,int threadTimeout,String threadName)
    {

        this.ip = ip;
        this.threadTimeout = threadTimeout;
        this.threadName = threadName;

        System.out.prinln("MyCustomThread constructor has been called!");
    }

    @Override
    public void run()
    {
        // do some stuff that takes time ....
    }
}

谢谢你。

共有2个答案

令狐献
2023-03-14

我认为这里最大的问题是MyCustomThread应该实现Runnable,而不是扩展Thread。使用ExecutorService时,您可以让它处理线程管理(即不需要创建它们)

这是我认为你想要做的一个近似值。希望这有帮助。

public class FileProcessor
{

    public static void main(String[] args)
    {

        List<String> lines = readFile();
        System.out.println("Starting threads ...");
        ExecutorService executor = Executors.newFixedThreadPool(5);

        for(String line : lines)
        {
            try
            {
                executor.submit(new MyCustomThread(line));
            }
            catch(Exception ex)
            {
                ex.printStackTrace();
            }
        }

        try
        {
            executor.shutdown();
            executor.awaitTermination(10, TimeUnit.SECONDS);
        }
        catch (InterruptedException e)
        {
            System.out.println("A processor took longer than the await time to complete.");
        }
        executor.shutdownNow();

    }

    protected static List<String> readFile()
    {
        List<String> lines = new ArrayList<String>();
        try
        {
            String filename = "/temp/data.dat";
            FileReader fileReader = new FileReader(filename );
            BufferedReader bufferedReader = new BufferedReader(fileReader);
            String line = null;
            while ((line = bufferedReader.readLine()) != null) {
                lines.add(line);
            }
            bufferedReader.close();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        return lines;
    }
}

public class MyCustomThread implements Runnable
{

    String line;

    MyCustomThread(String line)
    {
        this.line = line;
    }

    @Override
    public void run()
    {
        System.out.println(Thread.currentThread().getName() + " processed line:" + line);

    }

}

编辑:此实现不会阻止ExecutorService提交。我的意思是,无论之前提交的MyCustomThreads是否已完成,都会为文件中的每一行创建一个新的MyCustomThread实例。您可以添加一个阻塞/限制工作队列来防止出现这种情况。

ExecutorService executor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LimitedQueue<Runnable>(10));

可以在此处找到阻塞/限制队列实现的示例:

苏宾鸿
2023-03-14

你做错了一点。执行器的原理是将工作单元实现为可运行或可调用(而不是线程)。每个Runnable或Callable都应该做一个原子工作,这与其他Runnable或Callable是互斥的。

Executor服务在内部使用线程池,因此创建线程组和线程没有任何好处。

试试这个简单的片段:

ExecutorService executor = Executors.newFixedThreadPool(5);`
executor.execute(new MyRunnableWorker());

public class MyRunnableWorker implements Runnable{
    private String ip;
    private String threadName;
    private int threadTimeout = 10;

    public MyRunnableWorker(String ip){
        this.ip = ip;
    }

    public MyRunnableWorker(String ip,int threadTimeout,String threadName){
        this.ip = ip;
        this.threadTimeout = threadTimeout;
        this.threadName = threadName;

        System.out.prinln("MyRunnableWorker constructor has been called!");
    }

    @Override
    public void run(){    {
        // do some stuff that takes time ....
    }
}

这将为您提供您想要的东西。还要尝试使用Visual alVM测试线程代码执行情况,以查看线程如何运行以及负载分布情况。

 类似资料:
  • 假设我有一个Executors静态工厂方法的ExecutorService实例。 如果我从某个线程提交了一个调用,其中RetVal不是线程安全的本地实例化对象,那么当我从同一个线程获得()它时,我需要担心retvals的完整性吗?人们说局部变量是线程安全的,但我不确定当您返回一个本地实例化的对象并从其他线程接收它时,它是否适用。 下面是我的定制实现,我只是为了测试。您可以忽略EType枚举。

  • 问题内容: 有没有一种方法可以使用ExecutorService暂停/恢复特定线程? 想象一下,我想停止id == 0的线程(假设为每个线程分配了一个增量ID,直到达到线程池的大小为止)。 过了一会儿,通过按下一个按钮,我想恢复该特定线程,并将所有其他线程保留为当前状态,这些状态可以暂停或恢复。 我在Java文档中发现了PausableThreadPoolExecutor的未完成版本。但这不适合我

  • 我从主线程调用了下面的代码,使用ExecutorService池并启动一个线程来处理找到的每个文件。我正在尝试了解当主线程被kill命令终止时ExecutorService的行为。生成的线程会发生什么?一旦完成工作,它们会立即被杀还是终止? 还有没有更好/更安全的方法来编写下面的代码段,特别是如果我在无限循环中运行这部分,例如等待文件被放到输入目录并分配线程来处理它们?在这种情况下,我应该创建一个

  • 问题内容: 假设我有一个利用该框架的应用程序 当我在调试器中运行此应用程序时,将使用以下(默认)名称创建一个线程:。如你所见,这并不是非常有用,而且据我所知,该框架没有提供一种简便的方法来命名已创建的线程或线程池。 那么,如何为线程/线程池提供名称呢?例如,。 问题答案: 你可以提供一个到。工厂将负责创建线程,并将能够为其命名。 引用Javadoc: 创建新线程 使用创建新线程。如果没有另外指定,

  • 问题内容: 输出量 自333线程产生于444线程以来,我一直期待输出的最后一行出现“ NewInitialValue”,而tl是本地可继承线程。 是什么导致此问题,如何解决? 问题答案: 当您无法控制线程的创建时,您不应依赖。Javadoc指出: […]创建子线程时,子级将接收父级具有值的所有可继承线程局部变量的初始值。 在您的示例中,线程是由返回者创建的 那是一个执行程序,最多将使用两个线程来执

  • 问题内容: 使用return by时,如何中断它? 问题答案: 为此,您需要将任务分配给,而不是调用。当您执行此操作时,将返回一个可用于操纵计划任务的a。特别是,您可以调用关联程序来中断当前正在执行的任务(或者,如果该任务尚未开始运行,则完全跳过执行)。 顺便说一句,由返回的对象实际上是个。