我有一个java类来处理多线程订阅服务。通过实现可订阅接口,任务可以提交给服务并定期执行。代码草图如下所示:
import java.util.concurrent.*;
public class Subscribtions {
private ConcurrentMap<Subscribable, Future<?>> futures = new ConcurrentHashMap<Subscribable, Future<?>>();
private ConcurrentMap<Subscribable, Integer> cacheFutures = new ConcurrentHashMap<Subscribable, Integer>();
private ScheduledExecutorService threads;
public Subscribtions() {
threads = Executors.newScheduledThreadPool(16);
}
public void subscribe(Subscribable subscription) {
Runnable runnable = getThread(subscription);
Future<?> future = threads.scheduleAtFixedRate(runnable, subscription.getInitialDelay(), subscription.getPeriod(), TimeUnit.SECONDS);
futures.put(subscription, future);
}
/*
* Only called from controller thread
*/
public void unsubscribe(Subscribable subscription) {
Future<?> future = futures.remove(subscription); //1. Might be removed by worker thread
if (future != null)
future.cancel(false);
else {
//3. Worker-thread view := cacheFutures.put() -> futures.remove()
//4. Controller-thread has seen futures.remove(), but has it seen cacheFutures.put()?
}
}
/*
* Only called from worker threads
*/
private void delay(Runnable runnable, Subscribable subscription, long delay) {
cacheFutures.put(subscription, 0); //2. Which is why it is cached first
Future<?> currentFuture = futures.remove(subscription);
if (currentFuture != null) {
currentFuture.cancel(false);
Future<?> future = threads.scheduleAtFixedRate(runnable, delay, subscription.getPeriod(), TimeUnit.SECONDS);
futures.put(subscription, future);
}
}
private Runnable getThread(Subscribable subscription) {
return new Runnable() {
public void run() {
//Do work...
boolean someCondition = true;
long someDelay = 100;
if (someCondition) {
delay(this, subscription, someDelay);
}
}
};
}
public interface Subscribable {
long getInitialDelay();
long getPeriod();
}
}
因此,该类允许:
订阅由外部控制线程添加/删除,但延迟仅由内部工作线程引起。例如,如果工作线程从上次执行中没有发现更新,或者例如,如果线程只需要从00.00 - 23.00执行,则可能发生这种情况。
我的问题是工作线程可能调用< code>delay()并从ConcurrentMap中删除它的future,而控制器线程可能同时调用< code>unsubscribe()。然后,如果控制器线程在工作线程放入新的future之前检查ConcurrentMap,则< code>unsubscribe()调用将会丢失。
有一些(可能不是详尽的)解决方案:
delay()
和unsubscribe()
方法之间使用锁。delay()
方法中“缓存”已删除的未来至于第三种解决方案,因为工作线程已经建立了before关系<code>cacheFutures。放-
任何其他意见也欢迎,尤其是。考虑使用volatile关键字。应该将缓存映射声明为volatile吗?谢谢!
您有一个ConTrentMap
,但您没有使用它。考虑以下内容:
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
final class SO33555545
{
public static void main(String... argv)
throws InterruptedException
{
ScheduledExecutorService workers = Executors.newScheduledThreadPool(16);
Subscriptions sub = new Subscriptions(workers);
sub.subscribe(() -> System.out.println("Message received: A"));
sub.subscribe(() -> System.out.println("Message received: B"));
Thread.sleep(TimeUnit.SECONDS.toMillis(30));
workers.shutdown();
}
}
final class Subscriptions
{
private final ConcurrentMap<Subscribable, Task> tasks = new ConcurrentHashMap<>();
private final ScheduledExecutorService workers;
public Subscriptions(ScheduledExecutorService workers)
{
this.workers = workers;
}
void subscribe(Subscribable sub)
{
Task task = new Task(sub);
Task current = tasks.putIfAbsent(sub, task);
if (current != null)
throw new IllegalStateException("Already subscribed");
task.activate();
}
private Future<?> schedule(Subscribable sub)
{
Runnable task = () -> {
sub.invoke();
if (Math.random() < 0.25) {
System.out.println("Delaying...");
delay(sub, 5);
}
};
return workers.scheduleAtFixedRate(task, sub.getPeriod(), sub.getPeriod(), TimeUnit.SECONDS);
}
void unsubscribe(Subscribable sub)
{
Task task = tasks.remove(sub);
if (task != null)
task.cancel();
}
private void delay(Subscribable sub, long delay)
{
Task task = new Task(sub);
Task obsolete = tasks.replace(sub, task);
if (obsolete != null) {
obsolete.cancel();
task.activate();
}
}
private final class Task
{
private final FutureTask<Future<?>> future;
Task(Subscribable sub)
{
this.future = new FutureTask<>(() -> schedule(sub));
}
void activate()
{
future.run();
}
void cancel()
{
boolean interrupted = false;
while (true) {
try {
future.get().cancel(false);
break;
}
catch (ExecutionException ignore) {
ignore.printStackTrace(); /* Cancellation is unnecessary. */
break;
}
catch (InterruptedException ex) {
interrupted = true; /* Keep waiting... */
}
}
if (interrupted)
Thread.currentThread().interrupt(); /* Reset interrupt state. */
}
}
}
@FunctionalInterface
interface Subscribable
{
default long getPeriod()
{
return 4;
}
void invoke();
}
每个订阅一个锁将要求您维护另一个映射,从而可能引入其他并发问题。我认为最好避免这种情况。这同样适用于缓存已删除的订阅,再加上这增加了不需要的资源保留的风险(请注意,您需要缓存的不是未来
本身,而是与它们关联的可订阅
订阅)。
无论如何,您将需要某种同步/锁定。例如,在选项 (3) 中,您需要避免在延迟()
缓存给定订阅和删除其未来
之间取消订阅()。
在没有某种形式的锁定的情况下,您可以避免这种情况的唯一方法是,如果每个订阅只能使用一个Future
,从observe()
注册到取消订阅()
删除为止。这样做与延迟已计划的订阅的能力不一致。
至于第三种解决方案,由于工作线程已经建立了发生在之前的关系 cacheFutures.put() -
之前发生是程序执行中动作之间的关系。它不特定于任何一个线程的执行视图。
或者原子性只适用于未来映射,其他变量的更新将在以后传播?
控制器线程将始终看到缓存。put()
通过调用延迟()
执行,该延迟发生在期货之前。remove()
由同一调用执行。不过,我认为这对你没什么帮助。
应该将缓存映射声明为volatile吗?
不,那是没有用的,因为尽管地图的内容发生了变化,但地图本身始终是同一个对象,对它的引用也不会改变。
您可以考虑在显示的可订阅对象上分别同步 subscribe
()、delay()
和 unsubscribe()。
这不是我理解的关于每个订阅都有一个锁定的意思,但它是相似的。这将避免需要单独的数据结构来维护此类锁。我想如果你想避免显式同步,你也可以将锁定方法构建到
可订阅
的界面中。
问题内容: 在具有并发访问的程序中使用映射时,是否需要在函数中使用互斥体来 读取 值? 问题答案: 读者众多,没有作家可以: https://groups.google.com/d/msg/golang- nuts/HpLWnGTp-n8/hyUYmnWJqiQJ 一个作家,没有读者是可以的。(否则,地图不会太好。) 否则,如果至少有一个作家,而作家或读者至少还有一个,那么 所有 读者 和 作家都
问题内容: Java 对象和OS线程(轻量级进程)之间是否存在一对一的映射。也就是说,如果我有一个对象,是否可以始终精确地标识一个关联的OS线程,并且我将始终具有相同的关联OS线程吗?通常,这取决于OS和JVM,因此我将问题限于使用Oracle和Open JDK JVM的Linux。 怎么样的情况下,荷兰国际集团和荷兰国际集团线程?还有尚未开始运行的线程和已经结束运行的线程的极端情况? 问题答案:
我只想知道,如果读取器和写入器线程访问ConcurrentHashMap的同一段,会发生什么情况。 情况1:读取器线程首先读取值时。 情况2:写入线程更新值,读取线程获取值。
我有两个地图,键为整数,值为双倍。 我想创建第三个按键排序的映射,值将是两个映射的双精度列表。 地图1: Map2: 最终地图: 如上所述,如果一个地图中的一个键在另一个地图中丢失,则另一个地图的最终地图中的值应默认为0.00 我可以使用putAll方法将所有键放入第三张地图。但如何按照我的意愿设定这些值呢? 感谢阅读!
本文向大家介绍Java多线程并发编程 并发三大要素,包括了Java多线程并发编程 并发三大要素的使用技巧和注意事项,需要的朋友参考一下 一、原子性 原子,一个不可再被分割的颗粒。原子性,指的是一个或多个不能再被分割的操作。 int i = 1; // 原子操作 i++; // 非原子操作,从主内存读取 i 到线程工作内存,进行 +1,再把 i 写到朱内存。 虽然读取和写入都是原子操作,但合起来就不
问题内容: 我需要一个线程安全映射,我有类似这样的内容:(我对Java很陌生) 问题答案: