我构建了这个“节流”任务运行器,它在HashMap中收集一些数据,同时(每分钟)将数据“带走”并清除HashMap。在我的测试中,我注意到executor部分可以停止scheduleAtFixedRate
,并且永远不会再次清除HashMap。我假设这是因为我所做的HashMap修改不是线程安全的,它在run()
内部崩溃,没有恢复。我正在两个线程中修改HashMap。有人能告诉我如何优化HashMap修改的正确方向吗。
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class StatisticEventsDispatcher {
public HashMap<String, AbstractStatisticsEvent> mappedCachedEvents = new HashMap<>();
final Duration timeout = Duration.ofMinutes(1);
final ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
public StatisticEventsDispatcher(EventBus eventBus) {
executor.scheduleAtFixedRate(
new Runnable() {
@Override
public void run() {
mappedCachedEvents.values().forEach(eventBus::post);
mappedCachedEvents.clear();
// i think this is not thread safe
}
},
timeout.toMillis(),
timeout.toMillis(),
TimeUnit.MILLISECONDS);
}
public void applyChanges(String type, Map<String, Long> changes) {
AbstractStatisticsEvent event;
if (mappedCachedEvents.containsKey(type)) {
event = mappedCachedEvents.get(type);
} else {
event = new AbstractStatisticsEvent(type);
mappedCachedEvents.put(type, event);
// i think this not thread safe
}
event.apply(changes);
}
}
是的,如果两个线程在没有任何并发控制的情况下修改一个 HashMap
,则它不会按预期运行。
虽然有< code>ConcurrentMap实现,但您似乎希望自动“发布”所收集事件的快照。也就是说,似乎您不希望在迭代集合时添加新事件或更新事件,即使这可以通过线程安全的方式来完成。
在迭代期间同步对共享映射的访问相对简单,但如果迭代需要一段时间,则在此期间将阻止来自主线程的更新,这似乎是不可取的。
或者,计划任务可以短暂锁定以与新地图交换:
private final Object lock = new Object();
private HashMap<String, AbstractStatisticsEvent> mappedCachedEvents = new HashMap<>();
...
executor.scheduleAtFixedRate(() ->
{
Map<String, AbstractStatisticsEvent> toPost;
synchronized(lock) {
toPost = mappedCachedEvents;
mappedCachedEvents = new HashMap<>();
}
toPost.values().forEach(eventBus::post);
},
timeout.toMillis(), timeout.toMillis(), TimeUnit.MILLISECONDS);
...
public void applyChanges(String type, Map<String, Long> changes) {
synchronized(lock) {
AbstractStatisticsEvent event =
mappedCachedEvents.computeIfAbsent(type, AbstractStatisticsEvent::new);
event.apply(changes);
}
}
}
我建议实现如下:
public class StatisticEventsDispatcher {
private ConcurrentMap<String, AbstractStatisticsEvent> mappedCachedEvents = new ConcurrentHashMap<>();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
final Duration timeout = Duration.ofMinutes(1);
final ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
public StatisticEventsDispatcher(EventBus eventBus) {
executor.scheduleAtFixedRate(
new Runnable() {
@Override
public void run() {
Map<String, AbstractStatisticsEvent> tmp;
lock.writeLock().lock();
try {
tmp = mappedCachedEvents;
mappedCachedEvents = new ConcurrentHashMap<>();
} finally {
lock.writeLock().unlock();
}
tmp.values().forEach(eventBus::post);
}
},
timeout.toMillis(),
timeout.toMillis(),
TimeUnit.MILLISECONDS);
}
public void applyChanges(String type, Map<String, Long> changes) {
ConcurrentMap<String, AbstractStatisticsEvent> tmp;
lock.readLock().lock();
try {
tmp = mappedCachedEvents;
} finally {
lock.readLock().unlock();
}
AbstractStatisticsEvent event = tmp.computeIfAbsent(type, AbstractStatisticsEvent::new);
event.apply(changes);
}
}
请注意,我将mappedCachedEvents
设置为private
,因为如果没有锁,就无法访问它。
您的代码绝对不是线程安全的。但从你的描述来看,没有理由这样。您真的需要此调度程序中的多个线程吗?
只需更新和复制地图就非常快。一个线程总是比多个线程争用访问更快,即使你有成千上万的消费者/生产者。
所以我的建议是保留一个“拥有”Map的单线程执行器(即从调度程序的线程内部进行所有的更新/读取)。
如果“发送”地图的内容需要输入输出,请确保使用Map.copyOf(mutableMap)
将地图复制到不可变的地图中,然后将该副本提供给另一个线程进行处理。同时,“活动”地图可以继续更新/读取…你甚至可以编写一个“排水”操作,有效地将所有地图的内容放入另一个地图中,以避免同时存在重复的地图。
问题内容: 我在Oracle Java教程中遇到了这个示例,该示例描述了多线程场景中的死锁。 因此,在此示例中,我在第17行和第18行进行了以下更改。 完成这些更改后,程序将成功终止,而不会导致死锁,并在输出后进行打印 所以我的问题是-为什么会这样表现?println语句如何防止死锁? 问题答案: 无论您使用还是,都没有什么区别:它们基本上是在做同一件事。 如果在和的开始之间开始执行,则在此处发生
我有一个应用程序,它有一个ConcurrentHashMap本地存储一个存储在外部服务器上的数据副本。地图每隔几秒钟就会更新一次数据的新副本。 我有一个循环,每隔几秒钟运行一次,它可以访问HashMap并按照值的顺序将元素添加到数组中(实际上它做的事情还多一些,但这并不相关)。我的问题是,如果数据在创建数组的过程中发生了变化,您可能会在不同的地方有重复的键,或者完全省略一些键。 示例: 如您所见,
问题内容: 我假设如果实例变量是由spring IOC管理的,并且是单例的,则设计可以被称为无状态和线程安全的,因此这种设计可以扩展到集群服务器。我的假设是否正确?概述如下? 然后将其注入: 问题答案: Spring bean不是无状态的,因为它们具有状态(字段)。从技术上讲,它们甚至不是一成不变的,因为您可以随时更改注入的字段。 但是,您可以通过使用字段和构造函数注入轻松地使Spring bea
在一个采访问题中,我被要求解释一种情况,即使用并发哈希图与使用哈希图相比是正确的方法。在板上有两列t1和t2(对应于thread1和thread2),我应该编写一系列操作(如,等)使用concurrenthashmap与hashmap将产生预期结果。 我试图用迭代器举一个例子,但这不是面试官想要的。他在为线程1和线程2寻找一系列的放和拿操作。他说假设我们从不迭代,我们只有放和拿操作。 我查看了SO
Ruby给了你两个基本的方法来组织你的程序,使它同时能运行自己的不同部分。你可以使用线程在程序内部将任务分割,或者将任务分解为不同的程序,使用多进程来运行。下面我们轮流看一下这两种方法。 多线程 一般来说在Ruby中同时做两件事情最简单的是使用Ruby线程。线程在进程中,由Ruby解释器实现。这使得Ruby线程也能完全的可移至,因为它不需要依赖特定的操作系统,但是这样你也不能利用本地线程(nati
很多同学都听说过,现代操作系统比如Mac OS X,UNIX,Linux,Windows等,都是支持“多任务”的操作系统。 什么叫“多任务”呢?简单地说,就是操作系统可以同时运行多个任务。打个比方,你一边在用浏览器上网,一边在听MP3,一边在用Word赶作业,这就是多任务,至少同时有3个任务正在运行。还有很多任务悄悄地在后台同时运行着,只是桌面上没有显示而已。 现在,多核CPU已经非常普及了,但是