当前位置: 首页 > 知识库问答 >
问题:

一个线程池的多个CompletionService

庄新翰
2023-03-14

我正在开发一个Java服务器应用程序,其一般体系结构如下:

  • 客户端向服务器发出RPC请求
  • 我相信RPC服务器(gRPC)有自己的线程池来处理请求
  • 请求将立即插入线程池1以进行更多处理
  • 一种特定的请求类型,我们称之为request r,需要并行运行几个异步任务,判断结果以形成将返回给客户端的共识。这些任务的运行时间要长一些,所以我使用单独的线程池2来处理这些请求。重要的是,每个请求r将需要运行相同的2-3个异步任务。因此,线程池2服务于当前正在执行的所有请求R。但是,请求r应该只能看到和检索属于它的异步任务。
  • 为此,在每个传入的请求R时,当它位于线程池1中时,它将为该请求创建一个新的CompletionService,由线程池2支持。它将提交2-3个异步任务,并检索结果。这些请求应该与线程池2中可能运行的其他请求严格隔离。
  • 我的问题:
    • 首先,Java的CompletionService是孤立的吗?在检查了Javadocs之后,我找不到这方面的好文档。换句话说,如果两个或多个CompletionService由同一个线程池支持,那么它们中的任何一个是否有可能获得属于另一个CompletionService的未来?
    • 其次,为每个请求创建如此多的CompletionService是否是一种不好的做法?有没有更好的办法来处理这件事?当然,为每个请求创建一个新线程池不是一个好主意,那么是否有更规范/正确的方法在CompletionService中隔离期货呢?

    提前感谢你的帮助。任何有用的文档或检查的指针都将非常感谢。

    代码,供参考,尽管很琐碎:

    public static final ExecutorService THREAD_POOL_2 =
            new ThreadPoolExecutor(16, 64, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    
    // Gets created to handle a RequestR, RequestRHandler is run in Thread Pool 1
    public class RequestRHandler {
    
        CompletionService<String> cs;
    
        RequestRHandler() {
            cs = new ExecutorCompletionService<>(THREAD_POOL_2);
        }
    
        String execute() {
            cs.submit(asyncTask1);
            cs.submit(asyncTask2);
            cs.submit(asyncTask3);
    
           // Lets say asyncTask3 completes first
          Future<String> asyncTask3Result = cs.take();
    
          // asyncTask3 result indicates asyncTask1 & asyncTask2 results don't matter, cancel them
          // without checking result
    
    
          // Cancels all futures, I track all futures submitted within this request and cancel them,
          // so it shouldn't affect any other requests in the TP 2 pool
          cancelAllFutures(cs);
    
    
          return asyncTask3Result.get();  
        }
    
    
    }
    
    

共有1个答案

解柏
2023-03-14

首先,Java的CompletionService是孤立的吗?

这不是garanteed,因为它是一个接口,所以实现决定了这一点。但是由于唯一的实现是ExecutorCompletionService,我只能说答案是:是的。ExecutorCompletionService的每个实例在内部都有一个BlockingQueue,完成的任务在其中排队。实际上,当您调用服务的take时,它只是通过调用队列的take将调用传递给队列。每个提交的任务都由另一个对象包装,当任务完成时,该对象将任务放入队列。因此,每个实例都独立于其他实例来管理它提交的任务。

其次,为每个请求创建如此多的CompletionService是否是一种不好的做法?

我会说没事的。CompletionService只不过是执行器的一个很薄的包装器。您必须忍受“开销”(任务的内部blockingqueue和包装器实例),但开销很小,而且您可能从中获得的开销远远超过成本。你可以问你是否只需要一个2到3个任务,但这有点取决于任务。在这一点上,这是一个关于CompletionService是否值得的问题,所以这取决于您的决定,因为它超出了您的问题范围。

 类似资料:
  • 我对连接池的理解是;如果connectionstring完全相同,那么我们重用该连接,而不是建立新的连接。 我的问题是,我正在为并行处理创建许多线程。在这个“虚拟”程序中,我创建了500个线程,并让线程池函数处理这些线程。 步骤是: > < li> 每个线程在SQL中创建一个更新表。(说明更新的时间戳) 然后线程Hibernate1到10秒(随机)。 最后,线程在 SQL 中进行另一次更新(说明结

  • 问题内容: 在Java中拥有多个线程池的优缺点是什么?我已经看过代码,其中有多个线程池用于不同的“类型”任务,而且我不确定它是更好的设计还是只是开发人员感到懒惰。一个示例是将ScheduledThreadPoolExecutor用于定期执行的任务或具有超时的任务,而将另一ThreadPoolExecutor用于其他任务。 问题答案: 具有单独的专用线程池的目的是,使活动不会因线程不足而被饥饿,因为

  • 问题 你创建一个工作者线程池,用来响应客户端请求或执行其他的工作。 解决方案 concurrent.futures 函数库有一个 ThreadPoolExecutor 类可以被用来完成这个任务。 下面是一个简单的TCP服务器,使用了一个线程池来响应客户端: from socket import AF_INET, SOCK_STREAM, socket from concurrent.futures

  • 面试问题 比如说,我们有一个在Employee表中有200万条记录的表,我们需要削减每个员工10%的工资(需要做一些处理),然后将其保存回collection。你怎样才能有效地做到这一点。 我问他,我们可以使用executor框架来创建多个线程,这些线程可以从表中获取值,然后我们可以处理并将其保存到列表中。 然后他问我,你将如何检查一个记录是否已经被处理,我不知道(如何做)。 甚至我也不确定我是否

  • 我有多个线程在我的中运行每个线程读取一个大文件并在List中返回该文件中的数据。 代码如下所示: 现在我知道以下代码段将出现在我的代码中的某个位置,但我不知道将其放置在哪里。因为如果我在for循环中的之后放置它,它就不会添加它,因为每个文件都非常大,可能还没有完成它的处理。 那么,有谁能告诉我,我应该把这段代码放在哪里,以及我需要确保哪些其他事情,以避免出现关键部分问题。 如果我只是在线程中并行读

  • 我编写了代码示例: 每100毫秒提交一个新任务(总任务量-20)。每个任务持续时间-0.5秒。因此,可以并行执行5个任务,最佳执行时间为:20*100 500=2.5秒,池应创建5个线程 但我的实验显示为9.6秒。我打开jsvisualvm查看池创建了多少线程,我看到只创建了一个线程: 请更正我的线程池配置不正确的地方。

  • 我需要创建一个并行执行多个操作的应用程序。我曾考虑过使用线程或线程池,但我以前从未使用过,所以我发现这相当困难。Thread应按以下方式工作: 所有系统应同时运行。你认为我应该如何实现这一点?

  • 问题内容: 线程都是可运行的,并且它们拥有相同的锁。两个线程都可以运行时,它们可以锁定相同的地址吗?那是JRE错误吗? 问题答案: 该问题仅存在于线程转储中。实际上,在任何时间点,锁都仅由一个线程持有。但是,线程转储显示两个具有相同锁的不同线程,因为它不是原子的。 可以使用以下程序轻松重现该行为: