我有大约60个套接字和20个线程,我想确保每个线程每次都在不同的套接字上工作,所以我根本不想在两个线程之间共享同一个套接字。
在我的SocketManager
类中,我有一个后台线程,它每60秒运行一次并调用updateLiveSocket()
方法。在updateLiveSocket()
方法中,我迭代我拥有的所有套接字,然后通过调用SendToQueue
类的发
方法开始逐个ping它们,并根据响应将它们标记为活的或死的。在updateLiveSocket()
方法中,我总是需要迭代所有套接字并ping它们以检查它们是活的还是死的。
现在,所有读取器线程将并发调用< code>SocketManager类的< code>getNextSocket()方法,以获取下一个可用的套接字,从而在该套接字上发送业务消息。所以我在套接字上发送两种类型的消息:
ping
消息。这仅从SocketManager
类中调用updateLiveSocket()
方法的计时器线程发送。业务
消息。这是在SendToQueue
类中完成的。因此,如果pingerer线程正在ping一个套接字以检查它们是否处于活动状态,那么其他业务线程就不应该使用该套接字。类似地,如果业务线程使用套接字在其上发送数据,那么pinger线程不应该ping那个套接字。这适用于所有插座。但我需要确保在< code>updateLiveSockets方法中,每当我的后台线程启动时,我们都在ping所有可用的套接字,以便我们可以确定哪个套接字是活动的还是死的。
下面是我的套接字管理器
类:
public class SocketManager {
private static final Random random = new Random();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
new ConcurrentHashMap<>();
private final ZContext ctx = new ZContext();
// ...
private SocketManager() {
connectToZMQSockets();
scheduler.scheduleAtFixedRate(new Runnable() {
public void run() {
updateLiveSockets();
}
}, 60, 60, TimeUnit.SECONDS);
}
// during startup, making a connection and populate once
private void connectToZMQSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets);
}
}
private List<SocketHolder> connect(List<String> paddes, int socketType) {
List<SocketHolder> socketList = new ArrayList<>();
// ....
return socketList;
}
// this method will be called by multiple threads concurrently to get the next live socket
// is there any concurrency or thread safety issue or race condition here?
public Optional<SocketHolder> getNextSocket() {
for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
if (liveSocket.isPresent()) {
return liveSocket;
}
}
return Optional.absent();
}
private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
if (!listOfEndPoints.isEmpty()) {
// The list of live sockets
List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
for (SocketHolder obj : listOfEndPoints) {
if (obj.isLive()) {
liveOnly.add(obj);
}
}
if (!liveOnly.isEmpty()) {
// The list is not empty so we shuffle it an return the first element
return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
}
}
return Optional.absent();
}
// runs every 60 seconds to ping all the available socket to make sure whether they are alive or not
private void updateLiveSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
for (SocketHolder liveSocket : liveSockets) {
Socket socket = liveSocket.getSocket();
String endpoint = liveSocket.getEndpoint();
Map<byte[], byte[]> holder = populateMap();
Message message = new Message(holder, Partition.COMMAND);
// pinging to see whether a socket is live or not
boolean isLive = SendToQueue.getInstance().send(message.getAddress(), message.getEncodedRecords(), socket);
SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
liveUpdatedSockets.add(zmq);
}
liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
}
}
}
这是我的SendToQueue
类:
// this method will be called by multiple reader threads (around 20) concurrently to send the data
public boolean sendAsync(final long address, final byte[] encodedRecords) {
PendingMessage m = new PendingMessage(address, encodedRecords, true);
cache.put(address, m);
return doSendAsync(m);
}
private boolean doSendAsync(final PendingMessage pendingMessage) {
Optional<SocketHolder> liveSocket = SocketManager.getInstance().getNextSocket();
if (!liveSocket.isPresent()) {
// log error
return false;
}
ZMsg msg = new ZMsg();
msg.add(pendingMessage.getEncodedRecords());
try {
// send data on a socket LINE A
return msg.send(liveSocket.get().getSocket());
} finally {
msg.destroy();
}
}
public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
PendingMessage m = new PendingMessage(address, encodedRecords, socket, false);
cache.put(address, m);
try {
if (doSendAsync(m, socket)) {
return m.waitForAck();
}
return false;
} finally {
cache.invalidate(address);
}
}
问题陈述
现在,正如您在上面看到的,我在两个线程之间共享同一个套接字。似乎SocketManager
类中的getNextSocket()
可以将
返回给
计时器线程可以访问相同的
0MQ套接字
来ping它。在这种情况下,
ThreadA
和
计时器线程,这可能会导致问题。所以我正在尝试找到一种方法,这样我就可以防止不同的线程同时将数据发送到同一个套接字并弄脏我的数据。
我能想到的一种解决方案是在发送数据时使用套接字上的
同步
,但如果许多线程使用相同的套接字,资源就没有得到很好的利用。此外,如果msg.send(socket)
被阻止(技术上不应该),则所有等待此套接字的线程都会被阻止。所以我想可能有更好的方法来确保每个线程同时使用不同的单个实时套接字,而不是在特定套接字上同步。
我将在这里提出一些观点。在getNextSocket
方法getOrderedDatacenter
中,方法将始终返回相同的有序列表,因此您将始终从头到尾从相同的数据中心进行选择(这不是问题)。
如何保证两个线程不会从< code>getNextSocket中获得相同的< code>liveSocket?
你在这里说的是真的:
同时,计时器线程可以访问同一个 0MQ 套接字来 ping 它。
我认为这里的主要问题是你没有区分自由套接字和保留套接字。
如你所说,一种选择是在每个套接字上同步。另一种选择是保存一个保留套接字的列表,当您想要获得下一个套接字或更新套接字时,只从空闲套接字中挑选。您不想更新已经被保留的套接字。
如果它适合你的需求,你也可以看看这里。
看起来您应该考虑在这里使用try-with资源功能。您有SocketHolder或Option类实现了AutoCloseable接口。例如,让我们假设Option实现了这个接口。然后Option关闭方法将实例添加回容器。我创建了一个简单的示例来说明我的意思。它还不完整,但它让您了解如何在代码中实现它。
public class ObjectManager implements AutoCloseable {
private static class ObjectManagerFactory {
private static ObjectManager objMgr = new ObjectManager();
}
private ObjectManager() {}
public static ObjectManager getInstance() { return ObjectManagerFactory.objMgr; }
private static final int SIZE = 10;
private static BlockingQueue<AutoCloseable> objects = new LinkedBlockingQueue<AutoCloseable>();
private static ScheduledExecutorService sch;
static {
for(int cnt = 0 ; cnt < SIZE ; cnt++) {
objects.add(new AutoCloseable() {
@Override
public void close() throws Exception {
System.out.println(Thread.currentThread() + " - Adding object back to pool:" + this + " size: " + objects.size());
objects.put(this);
System.out.println(Thread.currentThread() + " - Added object back to pool:" + this);
}
});
}
sch = Executors.newSingleThreadScheduledExecutor();
sch.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
updateObjects();
}
}, 10, 10, TimeUnit.MICROSECONDS);
}
static void updateObjects() {
for(int cnt = 0 ; ! objects.isEmpty() && cnt < SIZE ; cnt++ ) {
try(AutoCloseable object = objects.take()) {
System.out.println(Thread.currentThread() + " - updateObjects - updated object: " + object + " size: " + objects.size());
} catch (Throwable t) {
// TODO Auto-generated catch block
t.printStackTrace();
}
}
}
public AutoCloseable getNext() {
try {
return objects.take();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
return null;
}
}
public static void main(String[] args) {
try (ObjectManager mgr = ObjectManager.getInstance()) {
for (int cnt = 0; cnt < 5; cnt++) {
try (AutoCloseable o = mgr.getNext()) {
System.out.println(Thread.currentThread() + " - Working with " + o);
Thread.sleep(1000);
} catch (Throwable t) {
t.printStackTrace();
}
}
} catch (Throwable tt) {
tt.printStackTrace();
}
}
@Override
public void close() throws Exception {
// TODO Auto-generated method stub
ObjectManager.sch.shutdownNow();
}
}
所以我试图找到一种方法,这样我就可以防止不同的线程同时向同一个套接字发送数据并把我的数据搞砸。
当然有很多不同的方法可以做到这一点。对我来说,这似乎是使用BlockingQueue
的正确方法。业务线程将从队列中获取一个套接字,并保证没有其他人使用该套接字。
private final BlockingQueue<SocketHolder> socketHolderQueue = new LinkedBlockingQueue<>();
...
public Optional<SocketHolder> getNextSocket() {
SocketHolder holder = socketHolderQueue.poll();
return holder;
}
...
public void finishedWithSocket(SocketHolder holder) {
socketHolderQueue.put(holder);
}
我认为,由于您提到的原因,在套接字上同步不是一个好主意——ping线程将阻塞业务线程。
有许多处理ping线程逻辑的方法。我会将您的Socket
与上次使用时间一起存储,然后您的ping线程可以经常从同一个BlockingQueue
中取出每个套接字,对其进行测试,并在测试后将每个套接符放回队列的末尾。
public void testSockets() {
// one run this for as many sockets as are in the queue
int numTests = socketHolderQueue.size();
for (int i = 0; i < numTests; i++) {
SocketHolder holder = socketHolderQueue.poll();
if (holder == null) {
break;
}
if (socketIsOk(socketHolder)) {
socketHolderQueue.put(socketHolder);
} else {
// close it here or something
}
}
}
您还可以让< code>getNextSocket()代码将线程从队列中取出,检查计时器,并将它们放在测试队列中,供ping线程使用,然后从队列中取出下一个线程。业务线程永远不会与ping线程同时使用同一个套接字。
根据您想要测试套接字的时间,您还可以在业务线程将计时器返回到队列之后重置计时器,这样ping线程将在X秒钟没有使用之后测试套接字。
问题内容: 我想像这样在多个线程之间共享一个变量: 我想在主线程和帮助线程之间共享,这是我创建的两个不同的Java类。有什么办法吗?谢谢! 问题答案: 二者并可以参照包含该变量的类。 然后,可以使该变量为 volatile ,这意味着 对该变量的更改在两个线程中立即可见。 有关更多信息,请参见本文。 易变变量 共享已同步的可见性功能,但不共享原子性功能。这意味着线程将自动 查看volatile变量
问题内容: 我有两个线程。可以调用修改变量的类的update方法。另一个调用读取该变量的类的update方法。只有一个线程写入,一个(或多个)线程读取该变量。由于我是多线程技术的新手,我需要在并发方面做什么? 谢谢, 问题答案: 如果有且仅有一个写线程,你可以逃脱使得它。否则,请查看答案。 仅在只有一个写线程的情况下才起作用,因为只有一个写线程,因此它始终具有的正确值。
假设有一个多线程服务器将数据写入同一端口上的两个不同套接字,其中一个专用线程处理每个套接字。两个线程是否可以同时写入各自的套接字?(所谓“同时”,我指的是真正的同时性,而不仅仅是并发交错。)或者,套接字共享同一端口的事实是否意味着强制执行互斥? 一般来说,我不清楚如何在两个任意I/O流之间共享资源。我知道两个线程不能同时写入磁盘,因为磁盘本身是共享资源。然而,在套接字和端口的情况下,我没有类似的物
null 我所尝试的:(我想是愚蠢的) 我有一个类,该对象应该作为共享,这样我就可以将其作为全局使用,而无需实例化它,例如. 这不起作用,我无法将在一个线程中接收到的数据发送到另一个线程。 我知道有一个明显的锁定问题,因为如果一个线程正在写一个对象,其他线程不能访问它,直到第一个线程完成了写。 和类。 我认为,我创建动态线程来为每个连接的客户机服务的方式(共享相同的数据源)不可能使用Global
这可能是一个很基本的问题,但它使我感到困惑。 两个不同连接的套接字可以共享一个端口吗?我正在编写一个应用服务器,它应该能够处理100K以上的并发连接,我们知道一个系统上可用的端口数量大约是60K(16bit)。一个连接的套接字被分配给一个新的(专用的)端口,因此这意味着并发连接的数量受到端口数量的限制,除非多个套接字可以共享同一个端口。所以问题是。
我使用jsPlumb允许用户构建图形。我允许用户拖动这些元素,所以我为每个endpoint使用锚集合,让jsPlumb在建立连接时从该集合中为我选择“最佳”锚。我遇到的问题是,我可能有多达十几个连接来自任何给定的endpoint,所以当许多人最终选择相同的“最佳”锚点时,这些连接将在视觉上分散注意力——在图中造成拥塞的外观。为了解决这个问题,我想告诉jsPlumb限制任何两个连接在endpoint