我有两个线程,一个在一个套接字上监听并添加到队列中,另一个从队列中减去并提交处理。第二个线程在队列为空时Hibernate。这个睡眠不知怎么会影响第一个线程,也就是说,如果您移除睡眠或使它变大,那么第一个线程的socket.receive中的延迟就会增加。如果我保持尽可能低的睡眠,它会变得更好,但不是完美的。我做错了什么?
private DatagramSocket socket;
private boolean running, listenthreadwork;
private byte[] buf = new byte[256];
private List<IUDPListener> listeners = new ArrayList<IUDPListener>();
private Thread runnerThread, listenerThread;
private ConcurrentLinkedQueue<MyObject> list = new ConcurrentLinkedQueue<MyObject>();
public void init(int port) throws SocketException
{
socket = new DatagramSocket(port);
runnerThread = new Thread(this::listenLoopUDP);
runnerThread.setName("listenLoopUDP");
listenerThread = new Thread(this::listenerThreadUDP);
listenerThread.setName("listenerThreadUDP");
running = true;
listenthreadwork = true;
runnerThread.start();
listenerThread.start();
}
private void listenerThreadUDP() {
while (listenthreadwork == true) {
MyObjectinfo = null;
synchronized (list) {
if (!list.isEmpty()) {
info = list.poll();
}
}
if (info != null) {
for (IUDPListener listener : listeners) {
listener.msgReceived(info);
}
} else {
try {
Thread.sleep(1);//Somehow affects listenLoopUDP
} catch (InterruptedException e) {
Log.write(e);
}
}
}
}
public void listenLoopUDP() {
while (running) {
DatagramPacket packet = new DatagramPacket(buf, buf.length);
try {
socket.receive(packet);
} catch (IOException e) {
if (socket.isClosed()) {
running = false;
continue;
} else {
Log.write(e);
socket.close();
}
}
String received = new String(packet.getData());
MyObject info = new MyObject(received);
synchronized (list) {
list.offer(info);
}
}
listenthreadwork = false;
try {
listenerThread.join();
} catch (InterruptedException e) {
Log.write(e);
}
}
实际上,您不需要Hibernate,而是使用适当的队列类,如LinkedBlockingQueue
,我还删除了这些标志,因为您也不需要它们,使用interrupt()
来停止一个阻塞的线程,等待队列元素:
private DatagramSocket socket;
private byte[] buf = new byte[256];
private List<IUDPListener> listeners = new ArrayList<IUDPListener>();
private Thread runnerThread, listenerThread;
private LinkedBlockingQueue<MyObject> list = new LinkedBlockingQueue<MyObject>();
public void init(int port) throws SocketException
{
socket = new DatagramSocket(port);
runnerThread = new Thread(this::listenLoopUDP);
runnerThread.setName("listenLoopUDP");
listenerThread = new Thread(this::listenerThreadUDP);
listenerThread.setName("listenerThreadUDP");
runnerThread.start();
listenerThread.start();
}
private void listenerThreadUDP() {
try {
while (true) {
MyObject info=list.take();
for (IUDPListener listener : listeners) {
listener.msgReceived(info);
}
}
} catch (InterruptedException ex) {
//Just quit
}
}
public void listenLoopUDP() {
try {
while (true) {
DatagramPacket packet = new DatagramPacket(buf, buf.length);
socket.receive(packet);
String received = new String(packet.getData());
MyObject info = new MyObject(received);
list.put(info);
}
} catch (IOException e) {
Log.write(e);
} catch (InterruptedException e) {
Log.write(e);
} finally {
//Any exception above (or a runtime one) will activate this block where we do the cleanup and interrupt the other running thread
listenerThread.interrupt();
socket.close();
}
}
我是个新手,如果你能给我建议的话,请告诉我。我有一个向客户端广播消息的服务器。然后客户端将回复发送回服务器。我想用单独的线程处理每个回复。每个回复都有mesage id和thread id。我如何用来自所有线程的信息填充一些结构,然后读取它 也从我的代码,它是正确地创建线程,而还是它存在某种方式来创建线程,只是如果我得到客户端的回复? 我是从正确的理解开始的吗? 非常感谢。
为了实现这一点,我使用了队列/线程池机制。最初,我创建一个固定数量线程的池,并有一个队列datastructure来存储客户机地址。这个队列在所有线程之间共享,因此我使用“互斥”来锁定/解锁这个队列。在主服务器线程中,我创建一个套接字,将其绑定到全局端口/地址,然后在“recvfrom”调用上阻止服务器。任何希望与服务器通信的客户端都会向侦听全局端口地址的主线程服务器发送“HI”消息。主服务器线程
我正在尝试基于Netty构建一个UDP服务器,以便根据客户端订阅(在订阅设置之前交换一些UDP请求/响应消息)将事件(大约每秒500个事件)连续发布到不同的客户端。 设计是由Java执行器创建一些生产者/消费者线程。生成消息后,使用者线程将其写入UDP通道。 观察到,服务器端只有一个EventLoop线程在为该UDP通道工作,并且它非常忙于向套接字写入消息,因此对第二个和更高版本客户端的订阅请求的
程序代码 udp_server.php //创建Server对象,监听 127.0.0.1:9502端口,类型为SWOOLE_SOCK_UDP $serv = new Swoole\Server("127.0.0.1", 9502, SWOOLE_PROCESS, SWOOLE_SOCK_UDP); //监听数据接收事件 $serv->on('Packet', function ($serv,
服务器 用于监听服务器中每个客户机的线程在名为OyenteCliente(ClientListener)的类中实现,每个客户机中监听服务器petitios的线程在OyenteServidor(ServerListener)中实现。 客户监听器 非常感谢!