我有一个类,其中liveSocketsByDatacenter
每30秒从一个后台线程填充一个映射,然后有一个方法getNextSocket
,该方法将由多个读取器线程调用以获取可用的实时套接字,该套接字使用相同的映射来获取此信息。
public class SocketManager {
private static final Random random = new Random();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new HashMap<>();
private final ZContext ctx = new ZContext();
// Lazy Loaded Singleton Pattern
private static class Holder {
private static final SocketManager instance = new SocketManager();
}
public static SocketManager getInstance() {
return Holder.instance;
}
private SocketManager() {
connectToZMQSockets();
scheduler.scheduleAtFixedRate(new Runnable() {
public void run() {
updateLiveSockets();
}
}, 30, 30, TimeUnit.SECONDS);
}
private void connectToZMQSockets() {
Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> addedColoSockets = connect(entry.getKey(), entry.getValue(), ZMQ.PUSH);
liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets);
}
}
private List<SocketHolder> connect(Datacenters colo, List<String> addresses, int socketType) {
List<SocketHolder> socketList = new ArrayList<>();
for (String address : addresses) {
try {
Socket client = ctx.createSocket(socketType);
// Set random identity to make tracing easier
String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
client.setIdentity(identity.getBytes(ZMQ.CHARSET));
client.setTCPKeepAlive(1);
client.setSendTimeOut(7);
client.setLinger(0);
client.connect(address);
SocketHolder zmq = new SocketHolder(client, ctx, address, true);
socketList.add(zmq);
} catch (Exception ex) {
// log error
}
}
return socketList;
}
// this method will be called by multiple threads to get the next live socket
public Optional<SocketHolder> getNextSocket() {
Optional<SocketHolder> liveSocket = Optional.absent();
List<Datacenters> dcs = Datacenters.getOrderedDatacenters();
for (Datacenters dc : dcs) {
liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
if (liveSocket.isPresent()) {
break;
}
}
return liveSocket;
}
private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
if (!CollectionUtils.isEmpty(listOfEndPoints)) {
Collections.shuffle(listOfEndPoints);
for (SocketHolder obj : listOfEndPoints) {
if (obj.isLive()) {
return Optional.of(obj);
}
}
}
return Optional.absent();
}
private void updateLiveSockets() {
Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
for (Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
for (SocketHolder liveSocket : liveSockets) {
Socket socket = liveSocket.getSocket();
String endpoint = liveSocket.getEndpoint();
Map<byte[], byte[]> holder = populateMap();
boolean status = SendToSocket.getInstance().execute(3, holder, socket);
boolean isLive = (status) ? true : false;
SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
liveUpdatedSockets.add(zmq);
}
liveSocketsByDatacenter.put(entry.getKey(), liveUpdatedSockets);
}
}
}
如您在以上课程中所见:
liveSocketsByDatacenter
使用所有活动套接字填充map。getNextSocket
方法给我一个可用的实时套接字,该套接字使用liveSocketsByDatacenter
map获取所需的信息。我上面的代码线程安全liveSocketsByDatacenter
吗,所有阅读器线程都可以正确看到?由于我liveSocketsByDatacenter
每30秒从一个后台线程然后从许多读取器线程中修改map,所以我在调用getNextSocket
method,所以不确定在这里是否做错了什么。
好像我的“
getLiveSocket”方法中可能存在线程安全问题,因为每次读取都ArrayList
从映射中共享出来并对其进行洗牌?我可能已经错过了更多其他地方。解决我的代码中这些线程安全问题的最佳方法是什么?
如果有更好的方法可以重写它,那么我也很乐意。
为了线程安全,您的代码必须将对所有共享可变状态的任何访问同步。
在这里,您共享了的 非线程安全*
实现liveSocketsByDatacenter
的实例,可以潜在地同时读取(通过和)和修改(通过和)而无需同步任何足以使您的代码非线程安全的访问。此外,此值是a的
非线程安全 实现的实例,也可以潜在地同时读取(由和)和修改(由精确地由)。HashMap
Map``updateLiveSockets``getNextSocket``connectToZMQSockets``updateLiveSockets``Map``ArrayList
***List``getNextSocket``updateLiveSockets``getLiveSocket``Collections.shuffle
解决您的2个线程安全问题的简单方法是:
ConcurrentHashMap
而不是a HashMap
,liveSocketsByDatacenter
因为它是的本机线程安全实现Map
。ArrayList
作为地图的值Collections.unmodifiableList(List<? extends T> list)
,您的列表将是不可变的,因此线程安全。例如:
liveSocketsByDatacenter.put(
entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets)
);`
getLiveSocket
以避免Collections.shuffle
直接在列表上调用,例如,您可以仅对活动套接字列表进行洗牌,而不是对所有套接字进行洗牌,或者使用列表的副本(例如使用new ArrayList<>(listOfEndPoints)
)代替列表本身。例如:
private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
if (!CollectionUtils.isEmpty(listOfEndPoints)) {
// The list of live sockets
List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
for (SocketHolder obj : listOfEndPoints) {
if (obj.isLive()) {
liveOnly.add(obj);
}
}
if (!liveOnly.isEmpty()) {
// The list is not empty so we shuffle it an return the first element
Collections.shuffle(liveOnly);
return Optional.of(liveOnly.get(0));
}
}
return Optional.absent();
}
对于您似乎经常阅读并且很少(每30秒仅一次)修改地图的#1,您可以考虑重建地图,然后Collections.unmodifiableMap(Map<? extends K,? extends V> m)
每30秒共享其不可变版本(使用),这种方法在大多数情况下是非常有效的因为您不再为访问地图内容付出任何同步机制的代价。
您的代码将是:
// Your variable is no more final, it is now volatile to ensure that all
// threads will see the same thing at all time by getting it from
// the main memory instead of the CPU cache
private volatile Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter
= Collections.unmodifiableMap(new HashMap<>());
private void connectToZMQSockets() {
Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
// The map in which I put all the live sockets
Map<Datacenters, List<SocketHolder>> liveSockets = new HashMap<>();
for (Map.Entry<Datacenters, ImmutableList<String>> entry :
socketsByDatacenter.entrySet()) {
List<SocketHolder> addedColoSockets = connect(
entry.getKey(), entry.getValue(), ZMQ.PUSH
);
liveSockets.put(entry.getKey(), Collections.unmodifiableList(addedColoSockets));
}
// Set the new content of my map as an unmodifiable map
this.liveSocketsByDatacenter = Collections.unmodifiableMap(liveSockets);
}
public Optional<SocketHolder> getNextSocket() {
// For the sake of consistency make sure to use the same map instance
// in the whole implementation of my method by getting my entries
// from the local variable instead of the member variable
Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
this.liveSocketsByDatacenter;
...
}
...
// Added the modifier synchronized to prevent concurrent modification
// it is needed because to build the new map we first need to get the
// old one so both must be done atomically to prevent concistency issues
private synchronized void updateLiveSockets() {
// Initialize my new map with the current map content
Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
new HashMap<>(this.liveSocketsByDatacenter);
Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
// The map in which I put all the live sockets
Map<Datacenters, List<SocketHolder>> liveSockets = new HashMap<>();
for (Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
...
liveSockets.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
}
// Set the new content of my map as an unmodifiable map
this.liveSocketsByDatacenter = Collections.unmodifiableMap(liveSocketsByDatacenter);
}
您的字段liveSocketsByDatacenter
也可以是类型AtomicReference<Map<Datacenters, List<SocketHolder>>>
,那么final
您的地图仍将存储在volatile
变量中,但在类内AtomicReference
。
之前的代码将是:
private final AtomicReference<Map<Datacenters, List<SocketHolder>>> liveSocketsByDatacenter
= new AtomicReference<>(Collections.unmodifiableMap(new HashMap<>()));
...
private void connectToZMQSockets() {
...
// Update the map content
this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(liveSockets));
}
public Optional<SocketHolder> getNextSocket() {
// For the sake of consistency make sure to use the same map instance
// in the whole implementation of my method by getting my entries
// from the local variable instead of the member variable
Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
this.liveSocketsByDatacenter.get();
...
}
// Added the modifier synchronized to prevent concurrent modification
// it is needed because to build the new map we first need to get the
// old one so both must be done atomically to prevent concistency issues
private synchronized void updateLiveSockets() {
// Initialize my new map with the current map content
Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
new HashMap<>(this.liveSocketsByDatacenter.get());
...
// Update the map content
this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(liveSocketsByDatacenter));
}
我在理解中线程安全的细节时遇到了问题。我知道Hibernate会话本身不是线程安全的,所以我不会从多个线程访问它们。但是,我找不到任何有关Hibernate实体的线程安全的信息。我可以在多个线程中修改它们,而它们仍然连接到用于加载它们的会话吗? 我不会使用延迟加载(我知道这会导致并发问题)。实体将被正确同步,Hibernate将通过同步的getters访问它们。 我设想的情景是: 使用Hibern
问题内容: 我在了解线程安全的详细信息时遇到了问题。我知道这本身并不是线程安全的,所以我不会从多个线程访问它们。但是,我找不到有关Hibernate实体的线程安全性的任何信息。我可以在多线程中修改它们,而又仍然将它们附加到用于加载它们的会话中吗? 我不会使用延迟加载(我知道这会导致并发问题)。实体将被正确同步,并且hibernate将通过同步的getter访问它们。 我设想的方案: 使用hiber
我在Java 7号工作。 我想知道方法在HashSet对象上是否是线程安全的。 散列集由一个线程初始化。然后我们用不可修改的集合()包装HashSet。初始化后,多个线程只调用方法。 当我阅读Javadoc时,它对我来说是不清楚的。 在HashSet Javadoc上,我们可以阅读 这个类实现Set接口,由一个哈希表(实际上是一个HashMap实例)支持。 ... 请注意,此实现不是同步的。 在H
问题内容: ConcurrentModificationException:当不允许对对象进行并发修改时,检测到该对象的并发修改的方法可能会抛出此异常。 上面是javadoc中的ConcurrentModificationException定义。 所以我尝试测试以下代码: 代码很简单。10个线程从arraylist对象中删除该元素。确保多个线程访问一个对象。但它运行正常。没有异常被抛出。为什么?
问题内容: 我正在编写一个将HashMap返回给用户的应用程序。用户将获得对此MAP的引用。在后端,我将运行一些线程来更新Map。 到目前为止我做了什么? 我已经使所有后端线程都共享一个公用通道来更新MAP。因此,在后端,我确信并发写入操作不会成为问题。 我遇到的问题 如果用户尝试更新MAP并同时在后端更新MAP->并发写入操作问题。 如果使用尝试从MAP读取某些内容,同时MAP正在后端更新->并
我可以有一个哈希图在Java看起来像这样吗? 我的问题和这里的类似问题 我是Java新手。所以我想知道的是,如果我需要上面这样的东西,如果它无效,什么是最好的数据结构?