当前位置: 首页 > 知识库问答 >
问题:

Java可调用线程:保持配置

赵嘉悦
2023-03-14

我正在设置一个服务器(Radius)的模拟器(用于测试),它使用线程将查询发送到另一个服务器(LDAP)。查询需要以每秒x的速度执行。为此,我使用了一个带有callable的调度线程池执行器,这样我就可以创建callable并将它们提交给线程池执行。每个线程都应该打开自己的连接并使用它进行查询。问题是,每次使用连接时,我都希望相同的线程重复使用它。

澄清:

如果我有一个20个线程池,我希望创建和使用20个连接。(因此我可以发送10.000个查询,由20个线程/连接依次处理)。

现在,要连接的(LDAP)服务器信息被发送到可调用的构造函数,可调用设置连接以执行。此后,我使用可调用的未来系统检索结果。问题是每次我创建一个可调用的连接都被打开(当然后来会关闭)。

我正在寻找最佳实践,以保持连接活着,并为每个线程重新使用它们。

我想了一些方法来实现这一点,但它们似乎不是很有效:

  • 使用我的线程池中的连接池在需要时检索空闲连接(造成死锁和其他线程安全问题)

实现这一点的最有效方法是什么?

编辑:我在想,因为我无法安全地获得线程编号,但线程ID总是唯一的,我可以使用

map<String/threadId, connection>

并将整个映射(引用)传递给可调用对象。这样我就可以使用:(伪代码)

Connection con = map.get(this.getThreadId());
If (con == null){
  con = new Connection(...);
  map.put(this.getThreadId(), con)
}

也可以使地图保持静态,并仅以静态方式访问它。这样我就不必将映射传递给可调用对象。这至少是安全的,不会强迫我重新构造代码。

新问题:什么更符合最佳实践;上述解决方案还是齐姆-扎姆的解决方案?如果以上是最好的,那么静态解决方案会更好吗?

共有1个答案

邓韬
2023-03-14

我将使用BlockingQueue来实现这一点,它在Callable之间共享,并且每隔一秒钟将x查询放入BlockingQueue

public class Worker implements Runnable {
    private final BlockingQueue<Query> inbox;
    private final BlockingQueue<Result> outbox;

    public Worker(BlockingQueue<Query> inbox, BlockingQueue<Result> outbox) {
        // create LDAP connection
        this.inbox = inbox;
        this.outbox = outbox;
    }

    public void run() {
        try {
            while(true) {
                // waits for a Query to be available
                Query query = inbox.take();
                // execute query
                outbox.add(new Result(/* result */));
            }
        } catch(InterruptedException e) {
          // log and restart? close LDAP connection and return?
        }
    }
}

public class Master {
   private final int x; // number of queries per second
   private final BlockingQueue<Query> outbox = new ArrayBlockingQueue<>(4 * x);
   private final BlockingQueue<Result> inbox = new ArrayBlockingQueue<>(4 * x);
   private final ScheduledThreadPoolExecutor executor;
   private final List<Future<?>> workers = new ArrayList<>(20);
   private final Future<?> receiver;

   public Master() {
     // initialize executor
     for(int i = 0; i < 20; i++) {
         Worker worker = new Worker(inbox, outbox);
         workers.add(executor.submit(worker));
     }

     receiver = executor.submit(new Runnable() {
         public void run() {
           while(!Thread.interrupted()) {
             try {
               Result result = inbox.take();
               // process result
             } catch(InterruptedException e) {
               return;
             }
           }
         }
     }
   }

   executor.scheduleWithFixedDelay(new Runnable() {
       public void run() {
           // add x queries to the queue
       }
   }, 0, 1, TimeUnit.SECONDS);
}

使用BlockingQueue#add发件箱添加新的查询,如果这引发异常,那么您的队列已满,您需要降低查询创建速率和/或创建更多工作人员。要在其未来上中断工作进程的无限循环调用cancel(true),这将在工作进程内部抛出一个中断异常

 类似资料:
  • 在另一个主题中,和等Java解决方案中的thread.currentThread().join()的用户被认为是一些评论者可以接受的方法,但有一个评论者说,应该使用一些有用的方法来完成,这些方法可以使线程保持活动,而不是仅仅使它Hibernate。 我理解他的观点,但是如果没有什么事情可以使线程保持活动状态,就像这个例子,我订阅了一个“Topic like”以异步获取消息呢? 使用Thread.C

  • 问题内容: Java VM可以支持多少个线程?这会因供应商而异吗?通过操作系统?其他因素? 问题答案: 这取决于您正在使用的CPU,操作系统,其他正在执行的操作,您正在使用的Java版本以及其他因素。我已经看到Windows服务器在关闭计算机之前具有> 6500个线程。当然,大多数线程没有做任何事情。一旦计算机遇到了大约6500个线程(使用Java),整个计算机就会开始出现问题并变得不稳定。 我的

  • 主要内容:1 Java 线程调度程序,2 抢占式调度与时间片调度的区别1 Java 线程调度程序 Java中的线程调度程序是JVM(Java虚拟机)的一部分,它决定应该运行哪个线程。 我们无法保证线程调度程序将会选择哪个线程来运行。 一次只能在一个进程中运行一个线程。线程调度程序主要使用抢占式或时间片调度来调度线程。 2 抢占式调度与时间片调度的区别 在抢占式调度下,最高优先级的任务会一直执行,直到进入等待状态或死机状态或存在更高优先级的任务为止。 在时间分片调度下

  • 我有一个MainClass,一个Worker类和一个Supervisor类。在MainClass中,我创建了10个Worker类和一个Supervisor类,它们在不同的线程中运行。 . . 我不知道如何实现这个,因为每个线程中的条件是相互独立的,所以我不需要同步,所以我不能使用等待通知。

  • 问题内容: 我最近了解到,只需添加批注,就可以轻松地使任何会话bean方法异步。 例如 我知道Java EE 7添加了Concurrency Utilities ,但是在Java EE 6中,方法的线程池配置在哪里?有没有办法设置超时时间?是固定线程池吗?一个缓存的?优先级是什么?是否可以在容器中的某个位置进行配置? 问题答案: 我认为可以通过从@Timeout注释的方法调用Future.canc

  • 问题内容: 当前,我们正在分析tomcat线程转储。在tomcat上同时运行的所有线程的单线程转储包含以下行: 特别是我们不明白 根据我们的理解,它说三个线程当时正在锁定同一监视器。根据我们的理解,根据JLS,这是不可能的。 我们对线程转储的解释正确吗? 问题答案: 看起来所有这些线程都在等待与监视器关联的条件,即它们称为该监视器的方法。 当线程在其拥有的监视器上调用时,它会临时释放监视器,并在从