下面的方法位于SocketManager
类中,后台线程每60秒调用一次。它将ping一个套接字,检查它是否处于活动状态,并将所有内容放在LiveSocketsByDataCenter
映射中。
private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
new ConcurrentHashMap<>();
// runs every 60 seconds to ping all the socket to make sure whether they are alive or not
private void updateLiveSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
for (SocketHolder liveSocket : liveSockets) {
Socket socket = liveSocket.getSocket();
String endpoint = liveSocket.getEndpoint();
Map<byte[], byte[]> holder = populateMap();
Message message = new Message(holder, Partition.COMMAND);
// pinging to see whether a socket is live or not
boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
boolean isLive = (status) ? true : false;
SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
liveUpdatedSockets.add(zmq);
}
liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
}
}
另外,我在同一个SocketManager
类中有以下方法。getNextSocket()
方法将由多个读取线程(假设最多10个线程)同时调用,以获取下一个活动的套接字。
// this method will be called by multiple threads concurrently to get the next live socket
public Optional<SocketHolder> getNextSocket() {
for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
if (liveSocket.isPresent()) {
return liveSocket;
}
}
return Optional.absent();
}
private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
if (!listOfEndPoints.isEmpty()) {
// The list of live sockets
List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
for (SocketHolder obj : listOfEndPoints) {
if (obj.isLive()) {
liveOnly.add(obj);
}
}
if (!liveOnly.isEmpty()) {
// The list is not empty so we shuffle it an return the first element
return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
}
}
return Optional.absent();
}
updatelivesockets()
方法中的socketa
上工作,那么所有这10个线程都应该在其他活动套接字上工作(这10个线程中的每一个都在不同的活动套接字上工作)解决这个问题的最好办法是什么?我可以让synchronize
,从这10个读取线程加上定时器线程在一个套接字上同步,这将保证只有线程在该套接字上工作,但我不想在这里使用同步。必须有更好的方法来确保每个线程同时使用不同的单个活动套接字,而不是在特定的套接字上同步。我有大约60个插座和大约10个阅读器线程加上1个定时器线程。这里需要使用ThreadLocal
概念吗?
解决问题的最佳方案是使用ConcurrentQueue,而不需要使用ThreadLocal。ConcurrentQueue是非阻塞的,对于多线程环境非常高效。例如,这就是如何删除不活动的套接字并保留活动的套接字。
private final Map<Datacenters, ConcurrentLinkedQueue<SocketHolder>> liveSocketsByDatacenter =
new ConcurrentHashMap<>();
// runs every 60 seconds to ping 70 sockets the socket to make sure whether they are alive or not (it does not matter if you ping more sockets than there are in the list because you are rotating the que)
private void updateLiveSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
Queue<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
for (int i = 0; i<70; i++) {
SocketHolder s = liveSockets.poll();
Socket socket = s.getSocket();
String endpoint = s.getEndpoint();
Map<byte[], byte[]> holder = populateMap();
Message message = new Message(holder, Partition.COMMAND);
// pinging to see whether a socket is live or not
boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
boolean isLive = (status) ? true : false;
SocketHolder zmq = new SocketHolder(socket, s.getContext(), endpoint, isLive);
liveSockets.add(zmq);
}
}
}
问题内容: 每当关于Java同步的问题浮出水面时,有些人就会很想指出应该避免的事情。他们声称,取而代之的是,最好是锁定私有引用。 给出的一些原因是: 一些邪恶的代码可能会窃取你的锁(非常流行,也有一个“偶然”的变体) 同一类中的所有同步方法都使用完全相同的锁,这会降低吞吐量 你(不必要地)暴露了太多信息 包括我在内的其他人则认为,这是一个惯用语言(在Java库中也是如此),是安全且易于理解的。应当
问题内容: 可选参数的全部原因是为了防止运行时因命中分配给nil / null / none的变量而导致崩溃。因此,变量不能为零。而是可以将它们包装为Optional类型,将它们表示为Some或None,并展开以获取Some或nil的特定内容。 但是,如果您使用或Implicitly Unwrapped Optionals 到处都将它们解包,那么由于您是一个不完善的编码器,您只会引入运行时崩溃的可
我只在一个字符串对象上找到了同步的答案,而不是两个。 这不是一项真正的任务,而是一项任务。我有一个图书馆可以把钱从一个账户转到另一个账户。我无法访问帐户对象以锁定它。我只能用图书馆里的东西。传输(字符串从、字符串到),这不是线程安全的。我有一个帐户ID为字符串的方法。我需要在没有死锁的情况下锁定这两个字符串。 到目前为止,我所做的是: > 使用创建了新字符串。intern方法(字符串fr=from
问题内容: 我想知道如果在同一个对象上同步两次,在Java中是否会出现任何奇怪的行为? 场景如下 两种方法都使用该对象并对其进行同步。当第一个方法调用第二个方法时,它会被锁定而停止吗? 我不这么认为,因为它是同一个线程,但是我不确定是否可能会出现其他任何奇怪的结果。 问题答案: 同步块使用 可重入 锁,这意味着如果线程已经持有该锁,则它可以重新获取它而不会出现问题。因此,您的代码将按预期工作。 请
问题内容: 我们都知道您由于以下原因而无法执行以下操作: 但这显然有时有效,但并非总是如此。这是一些特定的代码: 当然,这导致: 即使没有多个线程。无论如何。 解决此问题的最佳方法是什么?如何在不引发此异常的情况下循环地从集合中删除项目? 我还在这里使用任意值,不一定是an t,因此您不能依赖。 问题答案: 是安全的,您可以这样使用它: 注意,这是在迭代过程中修改集合的唯一安全方法。如果在进行迭代
问题内容: 代码段-1 代码段-2 我在第一个代码段中遇到了竞争。我知道这是因为我正在获得对不可变对象(类型为Integer)的锁定。 我已经写了第二个代码片段,这再次使“布尔”不变。但这有效(输出运行中不显示竞争条件)。如果我正确理解了上一个问题的解决方案,则以下是出现问题的一种可能方法 线程1锁定由指向的对象(例如A) 线程2现在试图获取由指向的对象的锁,并进入A的等待队列。 线程1进入同步块