当前位置: 首页 > 知识库问答 >
问题:

Java,如何管理线程以读取套接字(websocket)?

鞠凌龙
2023-03-14

我有一个WebSocket服务器。

我的服务器为处理新连接创建新线程。在websocket断裂之前,Thread处于活动状态。

我的问题:对于1_000_000连接,我需要1_000_000线程。我如何通过线程处理许多webockets?没有等待?

ServerSocket server;
private ExecutorService executor = new ThreadPoolExecutor(1_000_000 , 1_000_000 , 7, TimeUnit.SECONDS, queue, threadFactory);

try
{
    server = new ServerSocket(port); 
}
catch (IOException e) {}

while (true)
 {
    Socket client = null;

   try
   {
        client = server.accept();

        Runnable r = new Runnable()
        {
           run()
           {
              // this is simple, original is complete WebSocket imp
               client.getInputStream().read();
           }
        };

        executor.execute(r);
    }

 catch (IOException e) {}
}

共有3个答案

云利
2023-03-14

一个诱人的想法是“不要使用java”。Java有着糟糕的绿色线程支持,但golang和erlang是基于绿色线程构建的,所以他们做得很好。

java方式似乎是工作池。因此,您创建了一个执行器池(请参见java.util.concurrent),决定在给定数量的连接中需要多少工作者,然后通过队列将连接传递给工作者。然后,工作者必须迭代其连接集,决定处理每个连接或放弃。

您的实时tcp连接数严格限制在2^16左右(65\u 536:可用端口数),但如果您有那么多连接,系统不太可能运行正常。大多数系统无法维持超过200个持久连接的性能。如果你能远远超过这个数字,我会假设你的系统在每个连接上做得不够,不能真正证明使用web套接字的合理性,但我只是猜测。

葛深
2023-03-14

想想看,你有一个套接字的地图,每次收到服务器的消息,你都会收到消息和相关的套接字!

这个操作是用操作系统(linux、windows、unix、mac-OS、...)内核完成的!

因此,您可以在一个线程中处理一百万个连接!

我们称之为无阻塞套接字,这意味着它们永远不会阻止您的线程读取或写入或任何其他操作,例如接受和...!

java有一个包来处理这个问题!JAVAnio*

工作情况如何?

  • 处理IO操作的线程
  • 用于选择哪些套接字具有操作以及操作类型的选择器
  • ByteBuffer处理读写,而不是在阻塞套接字中使用socket.stream

您还可以使用多个线程和选择器(每个选择器都有自己的线程)

看看这个例子:

非阻塞服务器。java:

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class NoneBlockingServer {

public static void main(String[] args) throws Exception
{
    runServer("localhost" , 5050);
}


private final static void runServer(String host , int port)throws Exception {
    Selector selector = Selector.open();
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.bind(new InetSocketAddress(host, port));
    serverSocketChannel.configureBlocking(false); //config to be a none-blocking serve-socket
    SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    //register to selector for operation ACCEPT !
    //also you can use selectionKey for some other stuffs !

    while (true) {

        int numberOfReadSockets = selector.select();
        //it will wait until a socket(s) be ready for some io operation
        //or other threads call selector.wakeup()

        if(numberOfReadSockets==0){
            //maybe selector.wakeup() called
            //do some sync operations here !
            continue; // continue selecting !
        }

        Iterator<SelectionKey> keys = selector.selectedKeys().iterator();

        while (keys.hasNext())
        {
            SelectionKey key = keys.next();
            keys.remove(); //remove selected key from current selection !

            //handle selected key


            if(key.isValid() && key.isReadable())
            {

                //it means this socket is valid and has data to read


                SocketChannel socketChannel =
                        (SocketChannel) key.channel();

                ByteBuffer buffer = ByteBuffer.allocate(100); // allocate 100 bytes for buffer
                //maybe you must use an allocated buffer for each connection
                // instead of allocate for each operation

                int read = socketChannel.read(buffer);

                if(read<0)
                {
                    //need to close channel !
                    socketChannel.close(); // explicitly remove from selector
                    System.out.println("CONNECTION CLOSED");
                    continue; //socket closed and other operations will skip
                }else
                {
                    buffer.flip(); // you need to learn work with ByteBuffers
                    byte[] bytes = new byte[buffer.remaining()];
                    buffer.get(bytes);
                    //maybe convert it to String
                    String msg = new String(bytes);
                    //use msg !
                    System.out.println("MESSAGE : "+msg);

                    key.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE);
                    //set interestOps to WRIT and READ to write hello back message !
                    key.attach(ByteBuffer.wrap("Hello Client !".getBytes("UTF-8")));
                    //wrap a array of bytes using wrap and attach it to selectionKey
                }

            }

            if(key.isValid() && key.isWritable())
            {
                //it means this socket is valid and have space to write data !

                SocketChannel socketChannel =
                        (SocketChannel) key.channel();

                //you must represent data you want to write to this socket
                //maybe attached to selection key !
                ByteBuffer dataToWrite = (ByteBuffer) key.attachment();
                //key.attachment here to help u have some meta data about each socket
                //use it smart !

                int write = socketChannel.write(dataToWrite);

                if(write<0)
                {
                    //so means some error occurs better to close it !
                    socketChannel.close();
                    System.out.println("CONNECTION CLOSED !"); //log
                    continue;//as socket closed we will skip next operations !
                }else if(!dataToWrite.hasRemaining())
                {
                    //so all data putted to buffer !
                    key.interestOps(SelectionKey.OP_READ); // just need to read !
                }
            }

            if(key.isValid() && key.isAcceptable())
            {
                ServerSocketChannel server =
                        (ServerSocketChannel) key.channel();//just server channels has accept operation

                SocketChannel socketChannel = server.accept(); //accept it !

                socketChannel.configureBlocking(false); // config none-blocking mode

                socketChannel.register(selector , SelectionKey.OP_READ);

                //also you can register for multiple operation using | operation
                //for example register for both read and write SelectionKey.READ|SelectionKey.WRITE
                //also you can change is late using key.interestOps(int ops)


                System.out.println("NEW CONNECTION"); //log
            }

            //there is another type of key,  key.isConnectable()
            //google it !




        }
    }
}
}

这里是BlockingClient。java:

import java.net.InetSocketAddress;
import java.net.Socket;

public class BlockingClient {

//using blocking sockets !
public static void main(String[] args)throws Exception
{
    Socket socket = new Socket();
    socket.connect(new InetSocketAddress("localhost" , 5050));
    socket.getOutputStream()
            .write("Hello Server".getBytes("UTF-8"));
    byte[] buffer = new byte[100];
    int len  = socket.getInputStream().read(buffer);
    System.out.println(new String(buffer , 0 , len , "UTF-8"));
    socket.close();
}
}

在本例中,我们将Hello Server消息从阻塞客户端发送到None阻塞服务器,服务器将响应Hello客户端消息!

快跑!

祝你好运

漆雕成弘
2023-03-14

你的想法是错误的。你不应该每隔几毫秒就启动一个新线程,因为这会大大降低你的系统速度。你也不能同时打开100万连接。没有正常的操作系统会允许这样做。

与此相反,普通web服务器运行的线程数最多(例如,平均服务器上有100个线程),这些线程按顺序处理传入的请求。

 类似资料:
  • 我正在编写一个简单的套接字编程应用程序,它使用一台服务器和一个客户端。我试图启动一个线程(从客户端文件中),从套接字输入流中读取输入,这样我就可以将消息写入服务器,同时读取并打印到屏幕上。然而,当我运行我的代码时,我的代码卡在 在< code>InputReader.java文件中,读取无输入? 我的代码如下,请帮忙。 简单服务器1.java SimpleClient1.java 输入读取器.ja

  • 问题内容: 我有一个线程可以从缓冲读取器读取字符(该读取器是从套接字创建的,如下所示): 此代码只能运行一次。例如,如果客户端连接并发送此消息:“这是一个测试”和“这是另一个测试”,则主机输出为: 请注意该程序不会收到“这是另一个测试”,因为它停留在读取流上。有什么办法在不减小缓冲区大小的情况下解决这个问题?这是线程的代码: 客户/发送者的代码(不是我的代码): 问题答案: 是一个 阻塞 调用,这

  • socket_read和socket_recv之间有什么区别?我正在尝试使用PHP套接字,但使用socket_read时收到了以下警告: 请帮帮我!

  • 问题内容: 我正在测试本地计算机上的套接字。我正在尝试使用线程在一个程序中同时运行套接字和服务器。我的服务器是回显服务器,因此它可以将收到的所有消息发回。我的问题是,当我同时在客户端和服务器上启动两个线程时,当它们到达我从输入流读取的部分时,它们将“冻结”。它可以很好地工作到客户端发送消息的那部分。然后,它停止了,因为似乎客户端正在等待消息,服务器也是如此,即使我已经通过写入输出流向服务器发送了消

  • 问题内容: 我有一个侦听某些x端口的套接字。 我可以从客户端应用程序将数据发送到套接字,但是无法从服务器套接字获得任何响应。 ..此代码部分从服务器读取数据。 但是,直到关闭服务器上的套接字,我才能从服务器读取任何内容。服务器代码不受我控制,无法对其进行编辑。 如何从客户端代码中克服这个问题。 谢谢 问题答案: 为了在客户端和服务器之间进行通信,必须明确定义协议。 客户端代码将阻塞,直到从服务器收

  • 所以我有一个PHP套接字服务器和Java套接字客户端<也许这是个愚蠢的问题。。。但我没有找到答案 在客户机中,我需要从输入流中读取传入的字节,并将它们放入字节数组中 我试图这样做: [客户端(Java)] [服务器(PHP)] 但当它读取所有字节时,在。read()不会返回-1,因此循环不会停止。有一次它返回13(长度),然后-无。有什么想法吗? 解决了 [客户端(Java)] [服务器(PHP)