当前位置: 首页 > 知识库问答 >
问题:

如何使线程等待变量达到特定值(多线程Java)

郭志泽
2023-03-14

我有一个接受客户端连接的服务器程序。这些客户端连接可以属于多个流。例如,两个或多个客户机可以属于同一个流。在这些流中,我必须传递一条消息,但我必须等到所有流都建立起来。为此,我维护以下数据结构。

ConcurrentHashMap<Integer, AtomicLong> conhasmap = new ConcurrentHashMap<Integer, AtomicLong>();

整数是流 ID,长整型是客户端编号。为了使给定流的一个线程等待原子龙达到特定值,我使用了以下循环。实际上,流的第一个数据包会将其放入流 ID 和要等待的连接数。对于每个连接,我都会减少要等待的连接。

while(conhasmap.get(conectionID) != new AtomicLong(0)){
       // Do nothing
}

然而,这个循环阻塞了其他线程。根据这个答案,它会进行易失性读取。我如何修改代码以等待给定流的正确线程,直到它达到特定值?

共有2个答案

潘翰藻
2023-03-14

您的表情:

conhasmap.get(conectionID) != new AtomicLong(0)

将始终为真,因为您比较的是永远不会相等的对象引用,而不是值。更好的表达式是:

conhasmap.get(conectionID).longValue() != 0L)

,但是这样的循环在循环中没有等待/通知逻辑不是一个好的实践,因为它经常使用CPU时间。相反,每个线程都应该在AtomicLong实例上调用.wait(),当它递减或递增时,您应该在AtomecLong实例中调用.notifyAll()来唤醒每个等待的线程来检查表达式。AtomicLong类可能已经在每次修改时调用notifyAll()方法,但我不确定。

AtomicLong al = conhasmap.get(conectionID);
synchronized(al) {
    while(al.longValue() != 0L) {
        al.wait(100); //wait up to 100 millis to be notified
    }
}

在递增/递减的代码中,它将如下所示:

AtomicLong al = conhasmap.get(conectionID);
synchronized(al) {
    if(al.decrementAndGet() == 0L) {
        al.notifyAll();
    }
}

我个人不会为这个计数器使用AtomicLong,因为您没有从AtomicLeng的无锁行为中受益。只需使用java.lang.Long,因为无论如何都需要在计数器对象上同步wait()/notify()逻辑。

墨高杰
2023-03-14

如果您使用的是 Java 8,那么“可完成的未来”可能很合适。下面是一个完整的、人为的示例,它正在等待 5 个客户端连接并将消息发送到服务器(使用带有优惠/轮询的阻止队列进行模拟)。

在此示例中,当达到预期的客户端连接消息计数时,将完成一个CompletableFuture挂钩,然后在您选择的任何线程上运行任意代码。

在这个示例中,您没有任何复杂的线程等待/通知设置或繁忙等待循环。

package so.thread.state;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class Main {

  public static String CONNETED_MSG = "CONNETED";
  public static Long EXPECTED_CONN_COUNT = 5L;

  public static ExecutorService executor = Executors.newCachedThreadPool();
  public static BlockingQueue<String> queue = new LinkedBlockingQueue<>();

  public static AtomicBoolean done = new AtomicBoolean(false);

  public static void main(String[] args) throws Exception {

    // add a "server" thread
    executor.submit(() -> server());

    // add 5 "client" threads
    for (int i = 0; i < EXPECTED_CONN_COUNT; i++) {
      executor.submit(() -> client());
    }

    // clean shut down
    Thread.sleep(TimeUnit.SECONDS.toMillis(1));
    done.set(true);
    Thread.sleep(TimeUnit.SECONDS.toMillis(1));
    executor.shutdown();
    executor.awaitTermination(1, TimeUnit.SECONDS);

  }

  public static void server() {

    print("Server started up");
    // track # of client connections established
    AtomicLong connectionCount = new AtomicLong(0L);

    // at startup, create my "hook"
    CompletableFuture<Long> hook = new CompletableFuture<>();
    hook.thenAcceptAsync(Main::allClientsConnected, executor);

    // consume messages
    while (!done.get()) {
      try {
        String msg = queue.poll(5, TimeUnit.MILLISECONDS);
        if (null != msg) {
          print("Server received client message");
          if (CONNETED_MSG.equals(msg)) {
            long count = connectionCount.incrementAndGet();

            if (count >= EXPECTED_CONN_COUNT) {
              hook.complete(count);
            }
          }
        }

      } catch (Exception e) {
        e.printStackTrace();
      }
    }

    print("Server shut down");

  }

  public static void client() {
    queue.offer(CONNETED_MSG);
    print("Client sent message");
  }

  public static void allClientsConnected(Long count) {
    print("All clients connected, count: " + count);
  }


  public static void print(String msg) {
    System.out.println(String.format("[%s] %s", Thread.currentThread().getName(), msg));
  }
}

输出如下

[pool-1-thread-1] Server started up
[pool-1-thread-5] Client sent message
[pool-1-thread-3] Client sent message
[pool-1-thread-2] Client sent message
[pool-1-thread-6] Client sent message
[pool-1-thread-4] Client sent message
[pool-1-thread-1] Server received client message
[pool-1-thread-1] Server received client message
[pool-1-thread-1] Server received client message
[pool-1-thread-1] Server received client message
[pool-1-thread-1] Server received client message
[pool-1-thread-4] All clients connected, count: 5
[pool-1-thread-1] Server shut down
 类似资料:
  • 问题内容: 我有以下情况: 为了运行算法,我必须运行多个线程,并且每个线程都会在死之前设置一个实例变量x。问题是这些线程不会立即返回: 我应该使用等待通知吗?还是我应该嵌入一个while循环并检查是否终止? 感谢大家! 问题答案: 创建一些共享存储来保存每个线程的值,或者如果足够的话,只存储总和。使用a 等待线程终止。每个线程完成后都会调用,您的方法将使用该方法来等待它们。 编辑: 这是我建议的方

  • 问题内容: 我正在为我的ubuntu服务器(针对我的多客户端匿名聊天程序)实现一种简单的线程池机制,并且需要使我的工作线程进入睡眠状态,直到需要执行一项工作(以函数指针和参数的形式) 。 我当前的系统即将关闭。我(工人线程正在)问经理是否有工作可用,以及是否有5毫秒没有睡眠。如果存在,请将作业添加到工作队列中并运行该函数。糟糕的循环浪费。 什么我 喜欢 做的是做一个简单的事件性的系统。我正在考虑有

  • 问题内容: 我找不到如何测量线程等待锁定的时间。我必须确定一个线程是否正在等待锁定超过1秒,如果需要,则运行另一个线程。谢谢! 问题答案: 试试这个:

  • 问题内容: 有什么方法可以简单地等待所有线程处理完成?例如,假设我有: 如何更改此方法,以便该方法在注释处暂停直到所有线程的方法退出?谢谢! 问题答案: 你将所有线程放入数组中,全部启动,然后进行循环 每个连接将阻塞,直到相应的线程完成为止。线程的完成顺序可能不同于你加入线程的顺序,但这不是问题:退出循环时,所有线程均已完成。

  • 这可能是在类似的背景下问的,但我在搜索了大约20分钟后找不到答案,所以我会问。 我已经编写了一个Python脚本(比如说:scriptA.py)和一个脚本(比如说scriptB.py) 在scriptB中,我想用不同的参数多次调用scriptA,每次运行大约需要一个小时,(这是一个巨大的脚本,做了很多事情……不用担心),我希望能够同时使用所有不同的参数运行scriptA,但我需要等到所有参数都完成

  • 问题内容: 我正在用一个应用程序逻辑线程和一个数据库访问线程来制作Java应用程序。他们都坚持为应用程序和都需要的整个生命周期,以在同一时间运行(一个会谈到服务器,一个谈判给用户;当应用程序完全启动,我需要两个人工作)。 但是,在启动时,我需要确保最初应用线程等待直到数据库线程准备就绪(当前是通过轮询自定义方法确定的)。我不介意应用线程在数据库线程准备就绪之前是否阻塞。 看起来不是解决方案-db线