当前位置: 首页 > 面试题库 >

Java,通过多线程环境中的哈希将传入的工作均匀地划分

闻鹤龄
2023-03-14
问题内容

我已经实现了一个Java代码,以Runnable根据其hashCode模块的n个线程来执行传入任务nThreads。理想情况下,工作应该均匀地分布在这些线程之间。具体来说,dispatchId每个任务都有一个字符串。

这是此Java代码段:

int nThreads = Runtime.getRuntime().availableProcessors(); // Number of threads
Worker[] workers = new Worker[nThreads]; // Those threads, Worker is just a thread class that can run incoming tasks
...
Worker getWorker(String dispatchId) { // Get a thread for this Task
    return workers[(dispatchId.hashCode() & Integer.MAX_VALUE) % nThreads];
}

重要说明: 在大多数情况下,dispatchId为:

String dispatchId = 'SomePrefix' + counter.next()

但是,我担心nThreads的模除法不是一个好选择,因为nThreads应该是质数,以便更均匀地分配dispatId键。

关于如何更好地传播作品还有其他选择吗?

更新1:

每个工人都有一个队列: Queue<RunnableWrapper> tasks = new ConcurrentLinkedQueue();

工作人员从中获取任务并执行任务。可以从其他线程将任务添加到此队列。

更新2:

具有相同任务的任务dispatchId可以多次出现,因此我们需要通过查找它们的线程dispatchId

最重要的是,每个Worker线程必须按顺序处理其传入的任务。因此,上面的更新1中存在数据结构Queue。

更新3:
此外,某些线程可能很忙,而其他线程则空闲。因此,我们需要以某种方式将队列与线程解耦,但要保持FIFO顺序不变,dispatchId以便执行任务。

解决方案: 我已经实现了Ben
Manes的想法(他的回答如下),代码可以在这里找到。


问题答案:

听起来您需要按每个调度ID进行FIFO排序,所以理想的方法是将调度队列作为抽象。这可以解释您对散列不提供统一分发的担忧,因为某些调度队列可能比其他调度队列更活跃,并且在工作人员之间不公平地平衡。通过将队列与工作程序分开,您可以保留FIFO语义并均匀地分散工作。

提供这种抽象的非活动库是HawtDispatch。它与Java
6兼容。

一个非常简单的Java
8方法是使用CompletableFuture作为排队机制,使用ConcurrentHashMap进行注册,并使用执行程序(例如ForkJoinPool)进行计算。有关此想法的实现,请参阅EventDispatcher,其中注册是显式的。如果您的调度员更具动态性,那么您可能需要定期修剪地图。基本思想如下。

ConcurrentMap<String, CompletableFuture<Void>> dispatchQueues = ...

public CompletableFuture<Void> dispatch(String queueName, Runnable task) {
  return dispatchQueues.compute(queueName, (k, queue) -> {
    return (queue == null)
        ? CompletableFuture.runAsync(task)
        : queue.thenRunAsync(task);
  });
}

更新(JDK7)

上述想法的后盾将通过番石榴翻译成类似以下内容:

ListeningExecutorService executor = ...
Striped<Lock> locks = Striped.lock(256);
ConcurrentMap<String, ListenableFuture<?>> dispatchQueues = ...

public ListenableFuture<?> dispatch(String queueName, final Runnable task) {
  Lock lock = locks.get(queueName);
  lock.lock();
  try {
    ListenableFuture<?> future = dispatchQueues.get(queueName);
    if (future == null) {
      future = executor.submit(task);
    } else {
      final SettableFuture<Void> next = SettableFuture.create();
      future.addListener(new Runnable() {
        try {
          task.run();
        } finally {
          next.set(null);
        }
      }, executor);
      future = next;
    }
    dispatchQueues.put(queueName, future);
  } finally {
    lock.unlock();
  }
}


 类似资料:
  • 问题内容: 典型的(对于x86-64平台和Linux OS)是在开始时幼稚地锁定互斥锁并在完成后将其释放,还是以更巧妙的方式将互斥锁锁定在更精细的级别,从而减少了锁争用?如果确实采用第二种方法,那么该如何做? 问题答案: 经营多个分配 场所 。每个竞技场都有自己的锁。当线程需要分配内存时,选择一个竞技场,将其锁定,然后从中分配内存。 选择竞技场的机制有些复杂,旨在减少锁争用: 考虑到这一点,基本上

  • 我在Java 7号工作。 我想知道方法在HashSet对象上是否是线程安全的。 散列集由一个线程初始化。然后我们用不可修改的集合()包装HashSet。初始化后,多个线程只调用方法。 当我阅读Javadoc时,它对我来说是不清楚的。 在HashSet Javadoc上,我们可以阅读 这个类实现Set接口,由一个哈希表(实际上是一个HashMap实例)支持。 ... 请注意,此实现不是同步的。 在H

  • 如果我有多个线程,每个线程使用injector获取EntityManager对象,每个线程使用em对象选择其他类对象的列表。准备好在for循环中使用。 如果一个线程首先完成并调用clear(),这会影响其他线程吗?比如for循环会有异常? 谢谢你。

  • 我正在进行一项预定的工作,该工作将以一定的间隔运行(例如每天下午1点),通过Cron安排。我正在使用Java和Spring。 编写计划作业非常简单 - 它确实如此:从db中抓取人员列表将某些条件,为每个人做一些计算并触发消息。 我正在本地和测试中开发单节点环境,但是当我们投入生产时,它将是多节点环境(带有负载均衡器等)。我关心的是多节点环境会如何影响计划的作业? 我的猜测是,我可能(或很可能)最终

  • 本文向大家介绍Java工作环境的配置与Eclipse的安装过程,包括了Java工作环境的配置与Eclipse的安装过程的使用技巧和注意事项,需要的朋友参考一下   Eclipse是一个开放源代码的、基于Java的可扩展开发平台。就其本身而言,它只是一个框架和一组服务,用于通过插件组件构建开发环境.Eclipse 附带了一个标准的插件集,包括Java开发工具(Java Development Kit

  • 我的 Web 应用程序中出现随机错误,我迷路了。我创建了一个库来解码代码。我尝试了很多,从未失败过测试。但突然间,它开始随机失败。由于它在单线程测试中运行良好,有时在 servlet 环境中失败时,我能想象的唯一解释是问题与多线程环境中使用的库有关。老实说,我知道多线程是一个非常复杂的问题。我担心我的库可能不是线程安全的。顺便说一下,它非常简单,它是一个具有几种静态方法的正面类。基本上,假设您正在