当前位置: 首页 > 知识库问答 >
问题:

Java在两个并发映射上的发生前一致线程视图

巢皓君
2023-03-14

我有一个java类来处理多线程订阅服务。通过实现可订阅接口,任务可以提交给服务并定期执行。代码草图如下所示:

import java.util.concurrent.*;

public class Subscribtions {

    private ConcurrentMap<Subscribable, Future<?>> futures = new ConcurrentHashMap<Subscribable, Future<?>>();
    private ConcurrentMap<Subscribable, Integer> cacheFutures = new ConcurrentHashMap<Subscribable, Integer>();
    private ScheduledExecutorService threads;

    public Subscribtions() {
        threads = Executors.newScheduledThreadPool(16);
    }

    public void subscribe(Subscribable subscription) {
        Runnable runnable = getThread(subscription);
        Future<?> future = threads.scheduleAtFixedRate(runnable, subscription.getInitialDelay(), subscription.getPeriod(), TimeUnit.SECONDS);
        futures.put(subscription, future);
    }

    /*
     * Only called from controller thread
     */
    public void unsubscribe(Subscribable subscription) {
        Future<?> future = futures.remove(subscription);    //1. Might be removed by worker thread 
        if (future != null)
            future.cancel(false);
        else {
            //3. Worker-thread view     := cacheFutures.put() -> futures.remove()
            //4. Controller-thread has seen futures.remove(), but has it seen cacheFutures.put()?
        }
    }

    /*
     * Only called from worker threads
     */
    private void delay(Runnable runnable, Subscribable subscription, long delay) {
        cacheFutures.put(subscription, 0);                  //2. Which is why it is cached first
        Future<?> currentFuture = futures.remove(subscription);
        if (currentFuture != null) {
            currentFuture.cancel(false);
            Future<?> future = threads.scheduleAtFixedRate(runnable, delay, subscription.getPeriod(), TimeUnit.SECONDS);
            futures.put(subscription, future);
        }
    }

    private Runnable getThread(Subscribable subscription) {
        return new Runnable() {
            public void run() {
                //Do work...
                boolean someCondition = true;
                long someDelay = 100;
                if (someCondition) {
                    delay(this, subscription, someDelay);
                }
            }
        };
    }

    public interface Subscribable {
        long getInitialDelay();
        long getPeriod();
    }
}

因此,该类允许:

  • 订阅新任务
  • 取消订阅现有任务
  • 延迟定期执行的任务

订阅由外部控制线程添加/删除,但延迟仅由内部工作线程引起。例如,如果工作线程从上次执行中没有发现更新,或者例如,如果线程只需要从00.00 - 23.00执行,则可能发生这种情况。

我的问题是工作线程可能调用< code>delay()并从ConcurrentMap中删除它的future,而控制器线程可能同时调用< code>unsubscribe()。然后,如果控制器线程在工作线程放入新的future之前检查ConcurrentMap,则< code>unsubscribe()调用将会丢失。

有一些(可能不是详尽的)解决方案:

  • delay()unsubscribe()方法之间使用锁。
  • 同上,但每个订阅一个锁。
  • (首选?)不使用锁,但在delay()方法中“缓存”已删除的未来

至于第三种解决方案,因为工作线程已经建立了before关系<code>cacheFutures。放-

任何其他意见也欢迎,尤其是。考虑使用volatile关键字。应该将缓存映射声明为volatile吗?谢谢!

共有2个答案

徐高韵
2023-03-14

您有一个ConTrentMap,但您没有使用它。考虑以下内容:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class SO33555545
{

  public static void main(String... argv)
    throws InterruptedException
  {
    ScheduledExecutorService workers = Executors.newScheduledThreadPool(16);
    Subscriptions sub = new Subscriptions(workers);
    sub.subscribe(() -> System.out.println("Message received: A"));
    sub.subscribe(() -> System.out.println("Message received: B"));
    Thread.sleep(TimeUnit.SECONDS.toMillis(30));
    workers.shutdown();
  }

}

final class Subscriptions
{

  private final ConcurrentMap<Subscribable, Task> tasks = new ConcurrentHashMap<>();

  private final ScheduledExecutorService workers;

  public Subscriptions(ScheduledExecutorService workers)
  {
    this.workers = workers;
  }

  void subscribe(Subscribable sub)
  {
    Task task = new Task(sub);
    Task current = tasks.putIfAbsent(sub, task);
    if (current != null)
      throw new IllegalStateException("Already subscribed");
    task.activate();
  }

  private Future<?> schedule(Subscribable sub)
  {
    Runnable task = () -> {
      sub.invoke();
      if (Math.random() < 0.25) {
        System.out.println("Delaying...");
        delay(sub, 5);
      }
    };
    return workers.scheduleAtFixedRate(task, sub.getPeriod(), sub.getPeriod(), TimeUnit.SECONDS);
  }

  void unsubscribe(Subscribable sub)
  {
    Task task = tasks.remove(sub);
    if (task != null)
      task.cancel();
  }

  private void delay(Subscribable sub, long delay)
  {
    Task task = new Task(sub);
    Task obsolete = tasks.replace(sub, task);
    if (obsolete != null) {
      obsolete.cancel();
      task.activate();
    }
  }

  private final class Task
  {

    private final FutureTask<Future<?>> future;

    Task(Subscribable sub)
    {
      this.future = new FutureTask<>(() -> schedule(sub));
    }

    void activate()
    {
      future.run();
    }

    void cancel()
    {
      boolean interrupted = false;
      while (true) {
        try {
          future.get().cancel(false);
          break;
        }
        catch (ExecutionException ignore) {
          ignore.printStackTrace(); /* Cancellation is unnecessary. */
          break;
        }
        catch (InterruptedException ex) {
          interrupted = true; /* Keep waiting... */
        }
      }
      if (interrupted)
        Thread.currentThread().interrupt(); /* Reset interrupt state. */
    }

  }

}

@FunctionalInterface
interface Subscribable
{

  default long getPeriod()
  {
    return 4;
  }

  void invoke();

}
易俊驰
2023-03-14

每个订阅一个锁将要求您维护另一个映射,从而可能引入其他并发问题。我认为最好避免这种情况。这同样适用于缓存已删除的订阅,再加上这增加了不需要的资源保留的风险(请注意,您需要缓存的不是未来本身,而是与它们关联的可订阅订阅)。

无论如何,您将需要某种同步/锁定。例如,在选项 (3) 中,您需要避免在延迟()缓存给定订阅和删除其未来之间取消订阅()。在没有某种形式的锁定的情况下,您可以避免这种情况的唯一方法是,如果每个订阅只能使用一个Future,从observe()注册到取消订阅()删除为止。这样做与延迟已计划的订阅的能力不一致。

至于第三种解决方案,由于工作线程已经建立了发生在之前的关系 cacheFutures.put() -

之前发生是程序执行中动作之间的关系。它不特定于任何一个线程的执行视图。

或者原子性只适用于未来映射,其他变量的更新将在以后传播?

控制器线程将始终看到缓存。put()通过调用延迟()执行,该延迟发生在期货之前。remove()由同一调用执行。不过,我认为这对你没什么帮助。

应该将缓存映射声明为volatile吗?

不,那是没有用的,因为尽管地图的内容发生了变化,但地图本身始终是同一个对象,对它的引用也不会改变。

您可以考虑在显示的可订阅对象上分别同步 subscribe()、delay()unsubscribe()。 这不是我理解的关于每个订阅都有一个锁定的意思,但它是相似的。这将避免需要单独的数据结构来维护此类锁。我想如果你想避免显式同步,你也可以将锁定方法构建到可订阅的界面中。

 类似资料:
  • 问题内容: 在具有并发访问的程序中使用映射时,是否需要在函数中使用互斥体来 读取 值? 问题答案: 读者众多,没有作家可以: https://groups.google.com/d/msg/golang- nuts/HpLWnGTp-n8/hyUYmnWJqiQJ 一个作家,没有读者是可以的。(否则,地图不会太好。) 否则,如果至少有一个作家,而作家或读者至少还有一个,那么 所有 读者 和 作家都

  • 问题内容: Java 对象和OS线程(轻量级进程)之间是否存在一对一的映射。也就是说,如果我有一个对象,是否可以始终精确地标识一个关联的OS线程,并且我将始终具有相同的关联OS线程吗?通常,这取决于OS和JVM,因此我将问题限于使用Oracle和Open JDK JVM的Linux。 怎么样的情况下,荷兰国际集团和荷兰国际集团线程?还有尚未开始运行的线程和已经结束运行的线程的极端情况? 问题答案:

  • 我只想知道,如果读取器和写入器线程访问ConcurrentHashMap的同一段,会发生什么情况。 情况1:读取器线程首先读取值时。 情况2:写入线程更新值,读取线程获取值。

  • 我有两个地图,键为整数,值为双倍。 我想创建第三个按键排序的映射,值将是两个映射的双精度列表。 地图1: Map2: 最终地图: 如上所述,如果一个地图中的一个键在另一个地图中丢失,则另一个地图的最终地图中的值应默认为0.00 我可以使用putAll方法将所有键放入第三张地图。但如何按照我的意愿设定这些值呢? 感谢阅读!

  • 本文向大家介绍Java多线程并发编程 并发三大要素,包括了Java多线程并发编程 并发三大要素的使用技巧和注意事项,需要的朋友参考一下 一、原子性 原子,一个不可再被分割的颗粒。原子性,指的是一个或多个不能再被分割的操作。 int i = 1; // 原子操作 i++; // 非原子操作,从主内存读取 i 到线程工作内存,进行 +1,再把 i 写到朱内存。 虽然读取和写入都是原子操作,但合起来就不

  • 问题内容: 我需要一个线程安全映射,我有类似这样的内容:(我对Java很陌生) 问题答案: