我有一个类,其中liveSocketsByDatacenter
每隔30秒从一个后台线程中填充一个映射到updateLiveSockets()
方法中,然后有一个方法getNextSocket()
,该方法将被多个读取器线程调用以获取一个可用的实时套接字,该套接字使用相同的映射来获取此信息。
public class SocketManager {
private static final Random random = new Random();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final AtomicReference<Map<Datacenters, List<SocketHolder>>> liveSocketsByDatacenter =
new AtomicReference<>(Collections.unmodifiableMap(new HashMap<>()));
private final ZContext ctx = new ZContext();
// Lazy Loaded Singleton Pattern
private static class Holder {
private static final SocketManager instance = new SocketManager();
}
public static SocketManager getInstance() {
return Holder.instance;
}
private SocketManager() {
connectToZMQSockets();
scheduler.scheduleAtFixedRate(new Runnable() {
public void run() {
updateLiveSockets();
}
}, 30, 30, TimeUnit.SECONDS);
}
// during startup, making a connection and populate once
private void connectToZMQSockets() {
Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
// The map in which I put all the live sockets
Map<Datacenters, List<SocketHolder>> updatedLiveSocketsByDatacenter = new HashMap<>();
for (Map.Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> addedColoSockets = connect(entry.getKey(), entry.getValue(), ZMQ.PUSH);
updatedLiveSocketsByDatacenter.put(entry.getKey(),
Collections.unmodifiableList(addedColoSockets));
}
// Update the map content
this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(updatedLiveSocketsByDatacenter));
}
private List<SocketHolder> connect(Datacenters colo, List<String> addresses, int socketType) {
List<SocketHolder> socketList = new ArrayList<>();
for (String address : addresses) {
try {
Socket client = ctx.createSocket(socketType);
// Set random identity to make tracing easier
String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
client.setIdentity(identity.getBytes(ZMQ.CHARSET));
client.setTCPKeepAlive(1);
client.setSendTimeOut(7);
client.setLinger(0);
client.connect(address);
SocketHolder zmq = new SocketHolder(client, ctx, address, true);
socketList.add(zmq);
} catch (Exception ex) {
// log error
}
}
return socketList;
}
// this method will be called by multiple threads to get the next live socket
// is there any concurrency or thread safety issue or race condition here?
public Optional<SocketHolder> getNextSocket() {
// For the sake of consistency make sure to use the same map instance
// in the whole implementation of my method by getting my entries
// from the local variable instead of the member variable
Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
this.liveSocketsByDatacenter.get();
Optional<SocketHolder> liveSocket = Optional.absent();
List<Datacenters> dcs = Datacenters.getOrderedDatacenters();
for (Datacenters dc : dcs) {
liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
if (liveSocket.isPresent()) {
break;
}
}
return liveSocket;
}
// is there any concurrency or thread safety issue or race condition here?
private Optional<SocketHolder> getLiveSocketX(final List<SocketHolder> endpoints) {
if (!CollectionUtils.isEmpty(endpoints)) {
// The list of live sockets
List<SocketHolder> liveOnly = new ArrayList<>(endpoints.size());
for (SocketHolder obj : endpoints) {
if (obj.isLive()) {
liveOnly.add(obj);
}
}
if (!liveOnly.isEmpty()) {
// The list is not empty so we shuffle it an return the first element
Collections.shuffle(liveOnly);
return Optional.of(liveOnly.get(0));
}
}
return Optional.absent();
}
// Added the modifier synchronized to prevent concurrent modification
// it is needed because to build the new map we first need to get the
// old one so both must be done atomically to prevent concistency issues
private synchronized void updateLiveSockets() {
Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
// Initialize my new map with the current map content
Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
new HashMap<>(this.liveSocketsByDatacenter.get());
for (Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
for (SocketHolder liveSocket : liveSockets) { // LINE A
Socket socket = liveSocket.getSocket();
String endpoint = liveSocket.getEndpoint();
Map<byte[], byte[]> holder = populateMap();
Message message = new Message(holder, Partition.COMMAND);
boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
boolean isLive = (status) ? true : false;
// is there any problem the way I am using `SocketHolder` class?
SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
liveUpdatedSockets.add(zmq);
}
liveSocketsByDatacenter.put(entry.getKey(),
Collections.unmodifiableList(liveUpdatedSockets));
}
this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(liveSocketsByDatacenter));
}
}
如您在我班上所见:
liveSocketsByDatacenter
使用方法中的所有活动套接字填充map updateLiveSockets()
。getNextSocket()
方法,以提供一个可用的实时套接字,该套接字使用liveSocketsByDatacenter
映射来获取所需的信息。我的代码运行正常,没有任何问题,并且想看看是否有更好或更有效的方式编写此代码。我还想对线程安全性问题或任何竞争条件(如果有)提出意见,但到目前为止我还没有看到任何问题,但我可能是错的。
我主要担心updateLiveSockets()
方法和getLiveSocketX()
方法。我迭代liveSockets
这是一个List
的SocketHolder
在A线,然后产生一个新的SocketHolder
对象,并加入到另一个新的列表。这可以吗?
注意: SocketHolder
是一个不可变的类。您可以忽略ZeroMQ
我拥有的东西。
您使用以下同步技术。
updateLiveSockets()
方法是同步的(隐式地),这将防止同时通过两个线程切换映射。getNextSocket()
方法期间发生切换,则在使用地图时要对地图进行本地引用以避免混淆。是否像现在一样是线程安全的?
线程安全性始终取决于共享的可变数据上是否存在正确的同步。在这种情况下,共享的可变数据是数据中心到其SocketHolders列表的映射。
地图位于中AtomicReference
,并制作本地副本以供使用的事实足以在地图上实现同步。您的方法采用了地图的一个版本,并使用该版本,由于的性质,切换版本是线程安全的AtomicReference
。只需为地图创建member字段,也可以实现此目的volatile
,因为您要做的只是更新参考(您无需对其进行任何先检查后操作操作)。
由于可以scheduleAtFixedRate()
确保所传递的消息Runnable
不会与自身并发运行,因此不需要synchronized
on
updateLiveSockets()
,但是它也没有任何实质性的危害。
所以是的,该类是线程安全的。
但是,尚不清楚a是否SocketHolder
可以同时被多个线程使用。照原样,此类仅尝试SocketHolder
通过选择一个随机活动对象来最大程度地减少对s的并发使用(尽管无需重新整理整个数组以选择一个随机索引)。它实际上没有阻止并发使用。
可以提高效率吗?
我相信可以。当查看该updateLiveSockets()
方法时,似乎它会构建完全相同的映射,只是SocketHolder
s的isLive
标志值可能不同。这使我得出结论,我只想切换地图中的每个列表,而不是切换整个地图。为了以线程安全的方式更改映射中的条目,我可以使用ConcurrentHashMap
。
如果我使用ConcurrentHashMap
,并且不切换地图,而是切换地图中的值,则可以摆脱AtomicReference
。
要更改映射,我可以构建新列表并将其直接放入映射中。这是更有效的,因为我可以更快地发布数据,创建的对象也更少,而同步仅基于现成的组件构建,这有利于提高可读性。
这是我的构建(为简洁起见,省略了一些不太重要的部分)
public class SocketManager {
private static final Random random = new Random();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new ConcurrentHashMap<>(); // use ConcurrentHashMap
private final ZContext ctx = new ZContext();
// ...
private SocketManager() {
connectToZMQSockets();
scheduler.scheduleAtFixedRate(this::updateLiveSockets, 30, 30, TimeUnit.SECONDS);
}
// during startup, making a connection and populate once
private void connectToZMQSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets); // we can put it straight into the map
}
}
// ...
// this method will be called by multiple threads to get the next live socket
// is there any concurrency or thread safety issue or race condition here?
public Optional<SocketHolder> getNextSocket() {
for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc)); // no more need for a local copy, ConcurrentHashMap, makes sure I get the latest mapped List<SocketHolder>
if (liveSocket.isPresent()) {
return liveSocket;
}
}
return Optional.absent();
}
// is there any concurrency or thread safety issue or race condition here?
private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
if (!CollectionUtils.isEmpty(listOfEndPoints)) {
// The list of live sockets
List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
for (SocketHolder obj : listOfEndPoints) {
if (obj.isLive()) {
liveOnly.add(obj);
}
}
if (!liveOnly.isEmpty()) {
// The list is not empty so we shuffle it an return the first element
return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
}
}
return Optional.absent();
}
// no need to make this synchronized
private void updateLiveSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
for (SocketHolder liveSocket : liveSockets) { // LINE A
Socket socket = liveSocket.getSocket();
String endpoint = liveSocket.getEndpoint();
Map<byte[], byte[]> holder = populateMap();
Message message = new Message(holder, Partition.COMMAND);
boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
boolean isLive = (status) ? true : false;
SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
liveUpdatedSockets.add(zmq);
}
liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets)); // just put it straigth into the map, the mapping will be updated in a thread safe manner.
}
}
}
问题内容: 我有一个类,其中每30秒从一个后台线程填充一个映射,然后有一个方法,该方法将由多个读取器线程调用以获取可用的实时套接字,该套接字使用相同的映射来获取此信息。 如您在以上课程中所见: 从每30秒运行一次的单个后台线程,我使用所有活动套接字填充map。 然后,从多个线程中,我调用方法给我一个可用的实时套接字,该套接字使用map获取所需的信息。 我上面的代码线程安全吗,所有阅读器线程都可以正
我有一个映射,我正在遍历它,并使用循环修改映射的值。我想知道是否有一种方法可以使用Java8流来实现。 这是我的代码: 基本上,我想流化地图值,设置值,并将其作为新地图。
单击地图信息选项卡,可以查看当前地图的信息。单击地图名称旁边的 笔状图标,可以修改当前地图的信息。
我正在使用Bing Maps v8 API,试图将我自己的GeoJSON加载到Bing Maps上,如本例所示:https://www.Bing.com/API/Maps/sdkrelease/mapcontrol/isdk#GeoJSONReadObject+js 我正在成功地创建我的JSON(我已经使用Bing Maps拖放特性测试了它,并且我的所有点都显示在Map.https://bingm
本文向大家介绍ecshop 2.72如何修改后台访问地址,包括了ecshop 2.72如何修改后台访问地址的使用技巧和注意事项,需要的朋友参考一下 2.72版本之后修改后台登录地址非常方便,步骤如下: 1、修改 admin 文件夹名称为别人猜不到的,例如 ecshop 2、打开data/config.php文件 查找: 修改为(把其中的 admin 修改为 ecshop ): 3、这样访问 域名/
我正在用html/jsp页面主体中的Google地图做一个web动态项目。 我制作了一个函数,可以创建一个标记(lat、lng、map),并在标记的参数中使用一个特殊的image.png作为图标。 在我的地图中,我制作了两种不同的样式(地图的颜色…)“白天”和“夜晚”。 我想知道当用户点击夜间更改样式时,我如何更改标记的图标。事实上,标记的颜色不适合这种风格的地图... 我尝试用相同的名称初始化不