当前位置: 首页 > 知识库问答 >
问题:

服务器和客户端的客户端数量因Java NIO而异

越源
2023-03-14

我正在学习系统如何将数百万个连接扩展到一个盒子。

通过早期的一套学习TCP 3路握手,NIO接受连接的方式。我运行了多个测试(简单的客户端-服务器代码),并一直遵循计数器-

在服务器上成功接受的次数()

在客户端成功打开()的次数,即成功计数

在客户端,成功的总和很重要

Ex:要启动的连接数N=10_000

场景1:(没有被服务器拒绝的连接,即在调用opne()时客户端没有引发异常)

服务器成功计数:9997

客户端成功计数:10_000,异常计数:0

场景2:(服务器拒绝的连接很少,即当opne()调用错误连接重置时,在客户端引发异常)

服务器成功计数:9795

客户端成功计数:9995,异常计数:5

服务器代码:

import java.nio.*;
import java.nio.channels.*;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

public class Server implements Runnable {
    private final int port;
    private ServerSocketChannel ssc;
    private Selector selector;
    private ByteBuffer buf = ByteBuffer.allocate(256);
    private AtomicInteger clientCount = new AtomicInteger(0);

    Server(int port) throws IOException {
        this.port = port;
        this.ssc = ServerSocketChannel.open();
        this.ssc.socket().bind(new InetSocketAddress(port),128);
        this.ssc.configureBlocking(false);
        this.selector = Selector.open();

        this.ssc.register(selector, SelectionKey.OP_ACCEPT);
    }

    @Override
    public void run() {
        try {
            System.out.println("Server starting on port " + this.port);

            Iterator<SelectionKey> iter;
            SelectionKey key;
            while (this.ssc.isOpen()) {
                selector.select();
                iter = this.selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    key = iter.next();
                    iter.remove();

                    if (key.isAcceptable()) this.handleAccept(key);
                    if (key.isReadable()) this.handleRead(key);
                }
            }
        } catch (IOException e) {
            System.out.println("IOException, server of port " + this.port + " terminating. Stack trace:");
            e.printStackTrace();
        }
    }

    private final ByteBuffer welcomeBuf = ByteBuffer.wrap("Welcome to Server!\n".getBytes());

    private void handleAccept(SelectionKey key) throws IOException {
        SocketChannel sc = ((ServerSocketChannel) key.channel()).accept();
        String address = (new StringBuilder(sc.socket().getInetAddress().toString())).append(":").append(sc.socket().getPort()).toString();
        sc.configureBlocking(false);
        sc.register(selector, SelectionKey.OP_READ, address);
        /*sc.write(welcomeBuf);
        welcomeBuf.rewind();*/
        System.out.println(String.format("accepted connection from: %s, number of clients: %d", address, clientCount.incrementAndGet()));//this count is lesser than success_count of client
    }

    private void handleRead(SelectionKey key) throws IOException {
        SocketChannel ch = (SocketChannel) key.channel();
        StringBuilder sb = new StringBuilder();

        buf.clear();
        int read = 0;

        while (ch.isConnected() && (read = ch.read(buf)) > 0) {
            buf.flip();
            byte[] bytes = new byte[buf.limit()];
            buf.get(bytes);
            sb.append(new String(bytes));
            buf.clear();
        }
        String msg;
        if (read < 0) {
            msg = key.attachment() + " left the chat.\n";
            ch.close();
        } else {
            msg = key.attachment() + ": " + sb.toString();
        }

        System.out.println(String.format("Received message from client: %s", msg));
    }

    public static void main(String[] args) throws IOException {
        Server server = new Server(10523);
        (new Thread(server)).start();
    }
}

客户端代码:

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class Task implements Runnable {
    int id;
    Client client;

    public Task(int id) {
        this.id = id;
    }

    public Task(int id, Client client) {
        this.id = id;
        this.client = client;
    }

    @Override
    public void run() {
        try {
            int port = 10523;
            InetAddress hostIP = InetAddress.getLocalHost();
            InetSocketAddress myAddress =
                    new InetSocketAddress(hostIP, port);

            SocketChannel myClient = SocketChannel.open();
            myClient.socket().connect(myAddress);

            if(myClient.socket().isConnected()){
                client.successCount.incrementAndGet();
            }
        } catch (Exception e) {
            System.out.println("exception count: "+client.exceptionCount.addAndGet(1));
            e.printStackTrace();
        }
    }
}

public class Client {
    AtomicInteger successCount = new AtomicInteger();
    AtomicInteger exceptionCount = new AtomicInteger();
    public static void main(String[] args) throws InterruptedException {
        Client client = new Client();
        client.process();
    }

    private void process() throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(50);
        int N = 10_000;
        for (int i = 0; i < N; i++) {
            Task task = new Task(i, this);
            executorService.submit(task);
        }
        while (true){
            Thread.sleep(8000);
            System.out.println("success count: "+successCount.get());//success_count
        }
    }
}

我只是接受连接,不做任何读/写操作。

这可能是非常基本的,但是我被卡住了,无法进一步调试。任何指针都可以帮助我今天学到一些新的东西。

编辑:

我尝试了一个按顺序打开N个连接的单线程客户端,但也出现了同样的问题。客户端显示的成功/连接数多于服务器端。

共有1个答案

商弘义
2023-03-14
  • 在详细了解NIO及其语义之后,我理解了这个问题
  • 在上面的客户机代码中,我有点点击服务器(建立连接),但没有听连接是否成功。我通过选择器和OP_CONNECT事件实现了这一点。密码-
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Simple Client, connect to server, listens for any events, writes dummy message to server every 3 seconds
 */
class ClientTask implements Runnable {
    private Client client;
    private String clientId;
    private Selector clientSelector;
    private static final int BUFFER_SIZE = 1024;
    private ByteBuffer myBuffer = ByteBuffer.allocate(BUFFER_SIZE);
    private SelectionKey key;

    public ClientTask(String clientId, Client client) throws IOException {
        this.clientId = clientId;
        this.client = client;
        this.clientSelector = Selector.open();
    }

    @Override
    public void run() {
        try {
            InetSocketAddress serverAddr = new InetSocketAddress(client.getHost(), client.getPort());
            System.out.println(String.format
                    ("%s connecting to <%s:%d>", clientId, serverAddr.getHostName(), serverAddr.getPort()));

            SocketChannel socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            this.key = socketChannel.register(this.clientSelector, SelectionKey.OP_CONNECT);//listens for connection establishment events
            this.key.attach(clientId);
            socketChannel.connect(serverAddr);
            while (true) {
                //wait for seconds
                synchronized (this) {
                    wait(3_000);
                }
                clientSelector.selectNow();//non blocking call
                Iterator<SelectionKey> iterator = clientSelector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    iterator.remove();
                    if (selectionKey.isConnectable()) {
                        accept(selectionKey);
                    } else if (selectionKey.isReadable()) {
                        read(selectionKey);
                    }
                }
                //write dummy message to server
                write(this.key, "Dummy message from Client");
            }
        } catch (Exception e) {
            System.out.println("exception count: " + client.failedConnectionCount.addAndGet(1));
            e.printStackTrace();
        }
    }

    /*
     * On successful connection, add read(OP_READ) as new event to interested Operation set
     * */
    private void accept(SelectionKey selectionKey) throws IOException {
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        boolean finishConnect = channel.finishConnect();
        String cId = selectionKey.attachment().toString();
        if (finishConnect) {
            selectionKey.interestOps(SelectionKey.OP_READ);
            System.out.println(String.format("%s connected properly", cId));
            client.successConnectCount.addAndGet(1);//increment successful connection count
        } else {
            throw new RuntimeException(String.format("%s didn't connect properly", cId));
        }
    }

    // Writes to server
    private void write(SelectionKey clientKey, String msg) throws IOException {
        if (clientKey.isValid() && clientKey.channel() instanceof SocketChannel) {
            ByteBuffer msgBuf = ByteBuffer.wrap(msg.getBytes());
            SocketChannel ch = (SocketChannel) clientKey.channel();
            String address = ch.getRemoteAddress().toString();
            int writeBytes = ch.write(msgBuf);
            msgBuf.rewind();
            if (writeBytes <= 0) {
                throw new RuntimeException(String.format("Wrote %d bytes to client %s", writeBytes, address));
            }
        }
    }

    // Reads from server
    private void read(SelectionKey selectionKey) throws IOException {
        String cId = selectionKey.attachment().toString();
        StringBuilder sb = new StringBuilder();
        ByteBuffer buf = ByteBuffer.allocate(256 * 4);
        buf.clear();
        SocketChannel ch = (SocketChannel) selectionKey.channel();

        while (key.isValid() && ch.isConnected() && ch.read(buf) > 0) {
            buf.flip();
            byte[] bytes = new byte[buf.limit()];
            buf.get(bytes);
            sb.append(new String(bytes));
            buf.clear();
        }

        System.out.println(String.format("%s received data :%s from server\n\n", cId, sb.toString()));
        myBuffer.clear();
    }
}

/**
 * A simple Java NIO Client Manager creates N clients listening to given server
 */
public class Client {
    AtomicInteger successConnectCount = new AtomicInteger();
    AtomicInteger failedConnectionCount = new AtomicInteger();

    private int noOfClients;
    private int port;
    private String host;

    public Client(String host, int port, int noOfClients) {
        this.host = host;
        this.port = port;
        this.noOfClients = noOfClients;
    }

    public static void main(String[] args) throws InterruptedException, IOException {
        if (args.length < 3) {
            throw new RuntimeException("Pass 3 arguments <server-ip> <server-port> <no-of-clients> and start the client (e.g java Client localhost 1234 100)");
        }

        Client client = new Client(args[0], Integer.parseInt(args[1]), Integer.parseInt(args[2]));
        client.process();
    }

    private void process() throws InterruptedException {
        try {
            ExecutorService executorService = Executors.newFixedThreadPool(noOfClients);
            for (int i = 1; i <= noOfClients; i++) {
                ClientTask task = new ClientTask(String.format("Client_%d", i), this);
                executorService.submit(task);
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        Thread.sleep(Integer.MAX_VALUE);//mock long waiting period
    }

    public int getPort() {
        return port;
    }

    public String getHost() {
        return host;
    }
}

如果我们查看while块,它会侦听事件,并在接收到OP_CONNECT(selectionKey.isConnectable())事件时将successConnectCount增加1。

 类似资料:
  • 我已经使用java nio创建了一个客户端-服务器应用程序,它工作正常,但我的问题是,当服务器有许多连接到服务器的客户端时,服务器会响应错误的客户端,而不是请求客户端。例如,如果客户端A请求第一个人的信息,服务器将第一个人的信息返回给客户端B而不是客户端A。我已经尝试同步对象,但仍然无法正常工作,可能是什么问题。这是我的服务器示例代码

  • 我想在一些计算机之间建立点对点连接,这样用户就可以在没有外部服务器的情况下聊天和交换文件。我最初的想法如下: 我在服务器上制作了一个中央服务器插座,所有应用程序都可以连接到该插座。此ServerSocket跟踪已连接的套接字(客户端),并将新连接的客户端的IP和端口提供给所有其他客户端。每个客户端都会创建一个新的ServerSocket,所有客户端都可以连接到它。 换句话说:每个客户端都有一个Se

  • 前面的章节介绍了所有 Redis 的重要功能组件: 数据结构、数据类型、事务、Lua 环境、事件处理、数据库、持久化, 等等, 但是我们还没有对 Redis 服务器本身做任何介绍。 不过, 服务器本身并没有多少需要介绍的新东西, 因为服务器除了维持服务器状态之外, 最重要的就是将前面介绍过的各个功能模块组合起来, 而这些功能模块在前面的章节里已经介绍过了, 所以本章将焦点放在服务器的初始化过程,

  • 创建 HTTP 客户端 使用默认选项创建一个HttpClient实例,如下所示: HttpClient client = vertx.createHttpClient(); 如果您想要在创建时配置客户端的选项,如下所示: HttpClientOptions options = new HttpClientOptions().setKeepAlive(false); HttpClient clien

  • 创建 TCP 客户端 最简单的方法来创建一个 TCP 客户端,使用默认选项如下所示: NetClient client = vertx.createNetClient(); 配置 TCP 客户端 如果你不想使用默认值,则创建TCP 客户端时,通过传入NetClientOptions实例可以配置: NetClientOptions options = new NetClientOptions().s

  • 问题内容: 我尝试使用以下代码从服务器到客户端发送文件和目录列表。服务器正在从客户端接收消息,但我不知道服务器是否没有发送回结果或客户端是否不接受结果。 服务器端: 问题答案: 据我所见,您在客户端上做的同时在服务器上做。从服务器发送的字符串中没有行尾字符,因此客户端将永远无法完成。执行outqw.println()或添加到要发送的内容的末尾。话虽这么说,很难用一堆注释掉的东西来浏览未格式化的代码