当前位置: 首页 > 知识库问答 >
问题:

自定义线程池执行器

蒙胤
2023-03-14

我正在编写一个定制的ThreadPoolExecutor,具有以下额外功能:-

>

如果有理想的线程,并且随着任务的到来,将该任务分配到队列中,而不是将其添加到队列中。

如果所有线程(最大池大小)都忙,则在新任务到来时,使用RejectionHandler的reject方法将它们添加到队列中

我已经重写了线程池执行程序的java 1.5版本的执行方法。

新守则如下:-

 public void execute(Runnable command) {
        System.out.println(" Active Count: "+getActiveCount()+" PoolSize: "+getPoolSize()+" Idle Count: "+(getPoolSize()-getActiveCount())+" Queue Size: "+getQueue().size()); 
         if (command == null)
             throw new NullPointerException();
         for (;;) {
             if (runState != RUNNING) {
                 reject(command);
                 return;
             }
             if (poolSize < corePoolSize && addIfUnderCorePoolSize(command)) {
                 return;
             }
            if (runState == RUNNING && (getPoolSize()-getActiveCount() != 0) && workQueue.offer(command)) {
                 return;
             }
             int status = addIfUnderMaximumPoolSize(command);
             if (status > 0)      // created new thread
                 return;
             if (status == 0) {   // failed to create thread
                 reject(command);
                 return;
             }

             if (workQueue.offer(command))
                 return;
             // Retry if created a new thread but it is busy with another task
         }
     }

遗留代码如下所示:-

 public void execute(Runnable command) {
          if (command == null)
              throw new NullPointerException();
          for (;;) {
             if (runState != RUNNING) {
                  reject(command);
                  return;
              }
              if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
                  return;
              if (workQueue.offer(command))
                  return;
              int status = addIfUnderMaximumPoolSize(command);
              if (status > 0)      // created new thread
                  return;
             if (status == 0) {   // failed to create thread
                 reject(command);
                  return;
             }
            // Retry if created a new thread but it is busy with another task
          }
      }

现在正在生成的问题是,当线程空闲时,它不会创建新的线程,但它甚至不会将任务分配给这些线程,否则它会将它们添加到队列中,这是不希望的,因为我们不想让任务等待,而是处理它ASAP,即使它需要新线程创建,但不允许等待任务。

请帮我解决这个问题。谢谢

共有2个答案

虞裕
2023-03-14

就我对您所描述的三个功能的了解而言,我认为使用ExecutorService将比您目前尝试的做得更多:一个Executor,它提供了管理终止的方法,以及可以产生未来跟踪一个或多个异步任务进度的方法,特别是:

1.catched thread pool:允许创建在Parralel中执行任务所需的任意多个线程。旧的可用线程将被重新用于新任务和固定线程池。

2.固定线程池:提供具有固定线程数的池。如果某个线程不可用于该任务,则该任务将被放入队列中,等待其他任务结束。

看看这篇文章的详细解释和很好的例子。

姚钊
2023-03-14

如果我理解了这个问题,我相信我已经找到了ThreadPoolExecutor默认行为的解决方案,我在这里的回答中显示了:

如何让线程池执行器在排队前将线程增加到最大?

基本上,你LinkedBlockingQueue总是返回false的queue.offer(...),这将在必要时向池中添加一个额外的线程。如果池已经处于最大线程,并且它们都很忙,则将调用Reject tedExektionHandler。然后是处理程序将(...)放入队列。

看到我的代码在那里。

 类似资料:
  • SOFARPC 支持自定义业务线程池。可以为指定服务设置一个独立的业务线程池,和 SOFARPC 自身的业务线程池是隔离的。多个服务可以共用一个独立的线程池。 SOFARPC 要求自定义线程池的类型必须是 com.alipay.sofa.rpc.server.UserThreadPool。 XML 方式 如果采用 XML 的方式发布服务,可以先设定一个 class 为 com.alipay.sof

  • 由来 在JDK中,提供了Executors用于创建自定义的线程池对象ExecutorService,但是考虑到线程池中存在众多概念,这些概念通过不同的搭配实现灵活的线程管理策略,单独使用Executors无法满足需求,构建了ExecutorBuilder。 概念 corePoolSize 初始池大小 maxPoolSize 最大池大小(允许同时执行的最大线程数) workQueue 队列,用于存在

  • 我正在使用线程池执行器更改遗留设计。详情如下:- 遗留:-对于遗留设计,在应用程序启动时创建600个线程。和放置在各种池中,然后在需要时提取这些池,并将任务分配给相应的线程。 新:-在新设计中,我将线程池替换为执行器服务 我观察到的是,对于Executor,在启动时不会创建线程。它们是在从客户端激发请求时创建的。因此,与前一个线程相比,在内存中创建的线程要少得多。 但我的问题是,这样做是否正确,因

  • 是否可以为Java8并行流指定自定义线程池?我到处都找不到它。 如果我不能为不同的模块使用不同的线程池,这就意味着我不能在大多数真实世界的情况下安全地使用并行流。 请尝试以下示例。有些CPU密集型任务在单独的线程中执行。这些任务利用并行流。第一个任务被打破,因此每一步需要1秒(通过线程Hibernate模拟)。问题是其他线程会被卡住,等待中断的任务完成。这是一个虚构的示例,但假设一个servlet

  • 根据ThreadPoolExecutor文档(Java ThreadPoolExecutor),如果我像这样创建一个executor服务: 当#线程

  • 问题内容: 是否可以为Java 8 并行流指定自定义线程池?我在任何地方都找不到。 假设我有一个服务器应用程序,并且想使用并行流。但是应用程序很大并且是多线程的,所以我想将其划分。我不希望一个模块中的一个模块中的某个模块运行缓慢,而另一个模块中的任务却运行缓慢。 如果不能为不同的模块使用不同的线程池,则意味着在大多数实际情况下,我不能安全地使用并行流。 请尝试以下示例。在单独的线程中执行一些CPU