当前位置: 首页 > 知识库问答 >
问题:

使用HashMap和ScheduledExecutorService进行线程安全

晋功
2023-03-14

我构建了这个“节流”任务运行器,它在HashMap中收集一些数据,同时(每分钟)将数据“带走”并清除HashMap。在我的测试中,我注意到executor部分可以停止scheduleAtFixedRate,并且永远不会再次清除HashMap。我假设这是因为我所做的HashMap修改不是线程安全的,它在run()内部崩溃,没有恢复。我正在两个线程中修改HashMap。有人能告诉我如何优化HashMap修改的正确方向吗。

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class StatisticEventsDispatcher {
  public HashMap<String, AbstractStatisticsEvent> mappedCachedEvents = new HashMap<>();

  final Duration timeout = Duration.ofMinutes(1);

  final ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);

  public StatisticEventsDispatcher(EventBus eventBus) {
    executor.scheduleAtFixedRate(
        new Runnable() {
          @Override
          public void run() {
            mappedCachedEvents.values().forEach(eventBus::post);
            mappedCachedEvents.clear();
            // i think this is not thread safe
          }
        },
        timeout.toMillis(),
        timeout.toMillis(),
        TimeUnit.MILLISECONDS);
  }

  public void applyChanges(String type, Map<String, Long> changes) {
    AbstractStatisticsEvent event;
    if (mappedCachedEvents.containsKey(type)) {
      event = mappedCachedEvents.get(type);
    } else {
      event = new AbstractStatisticsEvent(type);
      mappedCachedEvents.put(type, event);
      // i think this not thread safe
    }
    event.apply(changes);
  }
}

共有3个答案

贺宏逸
2023-03-14

是的,如果两个线程在没有任何并发控制的情况下修改一个 HashMap,则它不会按预期运行。

虽然有< code>ConcurrentMap实现,但您似乎希望自动“发布”所收集事件的快照。也就是说,似乎您不希望在迭代集合时添加新事件或更新事件,即使这可以通过线程安全的方式来完成。

在迭代期间同步对共享映射的访问相对简单,但如果迭代需要一段时间,则在此期间将阻止来自主线程的更新,这似乎是不可取的。

或者,计划任务可以短暂锁定以与新地图交换:

private final Object lock = new Object();
private HashMap<String, AbstractStatisticsEvent> mappedCachedEvents = new HashMap<>();

...

    executor.scheduleAtFixedRate(() -> 
        {
            Map<String, AbstractStatisticsEvent> toPost;
            synchronized(lock) {
              toPost = mappedCachedEvents;
              mappedCachedEvents = new HashMap<>();
            }
            toPost.values().forEach(eventBus::post);
        },
        timeout.toMillis(), timeout.toMillis(), TimeUnit.MILLISECONDS);

...

public void applyChanges(String type, Map<String, Long> changes) {
    synchronized(lock) {
        AbstractStatisticsEvent event = 
          mappedCachedEvents.computeIfAbsent(type, AbstractStatisticsEvent::new);
        event.apply(changes);        
    } 
  }
}
史英睿
2023-03-14

我建议实现如下:

public class StatisticEventsDispatcher {
    private ConcurrentMap<String, AbstractStatisticsEvent> mappedCachedEvents = new ConcurrentHashMap<>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    final Duration timeout = Duration.ofMinutes(1);

    final ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);

    public StatisticEventsDispatcher(EventBus eventBus) {
        executor.scheduleAtFixedRate(
                new Runnable() {
                    @Override
                    public void run() {
                        Map<String, AbstractStatisticsEvent> tmp;
                        lock.writeLock().lock();
                        try {
                            tmp = mappedCachedEvents;
                            mappedCachedEvents = new ConcurrentHashMap<>();
                        } finally {
                            lock.writeLock().unlock();
                        }
                        tmp.values().forEach(eventBus::post);
                    }
                },
                timeout.toMillis(),
                timeout.toMillis(),
                TimeUnit.MILLISECONDS);
    }

    public void applyChanges(String type, Map<String, Long> changes) {
        ConcurrentMap<String, AbstractStatisticsEvent> tmp;
        lock.readLock().lock();
        try {
            tmp = mappedCachedEvents;
        } finally {
            lock.readLock().unlock();
        }
        AbstractStatisticsEvent event = tmp.computeIfAbsent(type, AbstractStatisticsEvent::new);
        event.apply(changes);
    }
}

请注意,我将mappedCachedEvents设置为private,因为如果没有锁,就无法访问它。

符功
2023-03-14

您的代码绝对不是线程安全的。但从你的描述来看,没有理由这样。您真的需要此调度程序中的多个线程吗?

只需更新和复制地图就非常快。一个线程总是比多个线程争用访问更快,即使你有成千上万的消费者/生产者。

所以我的建议是保留一个“拥有”Map的单线程执行器(即从调度程序的线程内部进行所有的更新/读取)。

如果“发送”地图的内容需要输入输出,请确保使用Map.copyOf(mutableMap)将地图复制到不可变的地图中,然后将该副本提供给另一个线程进行处理。同时,“活动”地图可以继续更新/读取…你甚至可以编写一个“排水”操作,有效地将所有地图的内容放入另一个地图中,以避免同时存在重复的地图。

 类似资料:
  • 问题内容: 我在Oracle Java教程中遇到了这个示例,该示例描述了多线程场景中的死锁。 因此,在此示例中,我在第17行和第18行进行了以下更改。 完成这些更改后,程序将成功终止,而不会导致死锁,并在输出后进行打印 所以我的问题是-为什么会这样表现?println语句如何防止死锁? 问题答案: 无论您使用还是,都没有什么区别:它们基本上是在做同一件事。 如果在和的开始之间开始执行,则在此处发生

  • 我有一个应用程序,它有一个ConcurrentHashMap本地存储一个存储在外部服务器上的数据副本。地图每隔几秒钟就会更新一次数据的新副本。 我有一个循环,每隔几秒钟运行一次,它可以访问HashMap并按照值的顺序将元素添加到数组中(实际上它做的事情还多一些,但这并不相关)。我的问题是,如果数据在创建数组的过程中发生了变化,您可能会在不同的地方有重复的键,或者完全省略一些键。 示例: 如您所见,

  • 问题内容: 我假设如果实例变量是由spring IOC管理的,并且是单例的,则设计可以被称为无状态和线程安全的,因此这种设计可以扩展到集群服务器。我的假设是否正确?概述如下? 然后将其注入: 问题答案: Spring bean不是无状态的,因为它们具有状态(字段)。从技术上讲,它们甚至不是一成不变的,因为您可以随时更改注入的字段。 但是,您可以通过使用字段和构造函数注入轻松地使Spring bea

  • 在一个采访问题中,我被要求解释一种情况,即使用并发哈希图与使用哈希图相比是正确的方法。在板上有两列t1和t2(对应于thread1和thread2),我应该编写一系列操作(如,等)使用concurrenthashmap与hashmap将产生预期结果。 我试图用迭代器举一个例子,但这不是面试官想要的。他在为线程1和线程2寻找一系列的放和拿操作。他说假设我们从不迭代,我们只有放和拿操作。 我查看了SO

  • Ruby给了你两个基本的方法来组织你的程序,使它同时能运行自己的不同部分。你可以使用线程在程序内部将任务分割,或者将任务分解为不同的程序,使用多进程来运行。下面我们轮流看一下这两种方法。 多线程 一般来说在Ruby中同时做两件事情最简单的是使用Ruby线程。线程在进程中,由Ruby解释器实现。这使得Ruby线程也能完全的可移至,因为它不需要依赖特定的操作系统,但是这样你也不能利用本地线程(nati

  • 很多同学都听说过,现代操作系统比如Mac OS X,UNIX,Linux,Windows等,都是支持“多任务”的操作系统。 什么叫“多任务”呢?简单地说,就是操作系统可以同时运行多个任务。打个比方,你一边在用浏览器上网,一边在听MP3,一边在用Word赶作业,这就是多任务,至少同时有3个任务正在运行。还有很多任务悄悄地在后台同时运行着,只是桌面上没有显示而已。 现在,多核CPU已经非常普及了,但是