我正在学习系统如何将数百万个连接扩展到一个盒子。
通过早期的一套学习TCP 3路握手,NIO接受连接的方式。我运行了多个测试(简单的客户端-服务器代码),并一直遵循计数器-
在服务器上成功接受的次数()
在客户端成功打开()的次数,即成功计数
在客户端,成功的总和很重要
Ex:要启动的连接数N=10_000
场景1:(没有被服务器拒绝的连接,即在调用opne()时客户端没有引发异常)
服务器成功计数:9997
客户端成功计数:10_000,异常计数:0
场景2:(服务器拒绝的连接很少,即当opne()调用错误连接重置时,在客户端引发异常)
服务器成功计数:9795
客户端成功计数:9995,异常计数:5
服务器代码:
import java.nio.*;
import java.nio.channels.*;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
public class Server implements Runnable {
private final int port;
private ServerSocketChannel ssc;
private Selector selector;
private ByteBuffer buf = ByteBuffer.allocate(256);
private AtomicInteger clientCount = new AtomicInteger(0);
Server(int port) throws IOException {
this.port = port;
this.ssc = ServerSocketChannel.open();
this.ssc.socket().bind(new InetSocketAddress(port),128);
this.ssc.configureBlocking(false);
this.selector = Selector.open();
this.ssc.register(selector, SelectionKey.OP_ACCEPT);
}
@Override
public void run() {
try {
System.out.println("Server starting on port " + this.port);
Iterator<SelectionKey> iter;
SelectionKey key;
while (this.ssc.isOpen()) {
selector.select();
iter = this.selector.selectedKeys().iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
if (key.isAcceptable()) this.handleAccept(key);
if (key.isReadable()) this.handleRead(key);
}
}
} catch (IOException e) {
System.out.println("IOException, server of port " + this.port + " terminating. Stack trace:");
e.printStackTrace();
}
}
private final ByteBuffer welcomeBuf = ByteBuffer.wrap("Welcome to Server!\n".getBytes());
private void handleAccept(SelectionKey key) throws IOException {
SocketChannel sc = ((ServerSocketChannel) key.channel()).accept();
String address = (new StringBuilder(sc.socket().getInetAddress().toString())).append(":").append(sc.socket().getPort()).toString();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ, address);
/*sc.write(welcomeBuf);
welcomeBuf.rewind();*/
System.out.println(String.format("accepted connection from: %s, number of clients: %d", address, clientCount.incrementAndGet()));//this count is lesser than success_count of client
}
private void handleRead(SelectionKey key) throws IOException {
SocketChannel ch = (SocketChannel) key.channel();
StringBuilder sb = new StringBuilder();
buf.clear();
int read = 0;
while (ch.isConnected() && (read = ch.read(buf)) > 0) {
buf.flip();
byte[] bytes = new byte[buf.limit()];
buf.get(bytes);
sb.append(new String(bytes));
buf.clear();
}
String msg;
if (read < 0) {
msg = key.attachment() + " left the chat.\n";
ch.close();
} else {
msg = key.attachment() + ": " + sb.toString();
}
System.out.println(String.format("Received message from client: %s", msg));
}
public static void main(String[] args) throws IOException {
Server server = new Server(10523);
(new Thread(server)).start();
}
}
客户端代码:
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
class Task implements Runnable {
int id;
Client client;
public Task(int id) {
this.id = id;
}
public Task(int id, Client client) {
this.id = id;
this.client = client;
}
@Override
public void run() {
try {
int port = 10523;
InetAddress hostIP = InetAddress.getLocalHost();
InetSocketAddress myAddress =
new InetSocketAddress(hostIP, port);
SocketChannel myClient = SocketChannel.open();
myClient.socket().connect(myAddress);
if(myClient.socket().isConnected()){
client.successCount.incrementAndGet();
}
} catch (Exception e) {
System.out.println("exception count: "+client.exceptionCount.addAndGet(1));
e.printStackTrace();
}
}
}
public class Client {
AtomicInteger successCount = new AtomicInteger();
AtomicInteger exceptionCount = new AtomicInteger();
public static void main(String[] args) throws InterruptedException {
Client client = new Client();
client.process();
}
private void process() throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(50);
int N = 10_000;
for (int i = 0; i < N; i++) {
Task task = new Task(i, this);
executorService.submit(task);
}
while (true){
Thread.sleep(8000);
System.out.println("success count: "+successCount.get());//success_count
}
}
}
我只是接受连接,不做任何读/写操作。
这可能是非常基本的,但是我被卡住了,无法进一步调试。任何指针都可以帮助我今天学到一些新的东西。
编辑:
我尝试了一个按顺序打开N个连接的单线程客户端,但也出现了同样的问题。客户端显示的成功/连接数多于服务器端。
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Simple Client, connect to server, listens for any events, writes dummy message to server every 3 seconds
*/
class ClientTask implements Runnable {
private Client client;
private String clientId;
private Selector clientSelector;
private static final int BUFFER_SIZE = 1024;
private ByteBuffer myBuffer = ByteBuffer.allocate(BUFFER_SIZE);
private SelectionKey key;
public ClientTask(String clientId, Client client) throws IOException {
this.clientId = clientId;
this.client = client;
this.clientSelector = Selector.open();
}
@Override
public void run() {
try {
InetSocketAddress serverAddr = new InetSocketAddress(client.getHost(), client.getPort());
System.out.println(String.format
("%s connecting to <%s:%d>", clientId, serverAddr.getHostName(), serverAddr.getPort()));
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
this.key = socketChannel.register(this.clientSelector, SelectionKey.OP_CONNECT);//listens for connection establishment events
this.key.attach(clientId);
socketChannel.connect(serverAddr);
while (true) {
//wait for seconds
synchronized (this) {
wait(3_000);
}
clientSelector.selectNow();//non blocking call
Iterator<SelectionKey> iterator = clientSelector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
iterator.remove();
if (selectionKey.isConnectable()) {
accept(selectionKey);
} else if (selectionKey.isReadable()) {
read(selectionKey);
}
}
//write dummy message to server
write(this.key, "Dummy message from Client");
}
} catch (Exception e) {
System.out.println("exception count: " + client.failedConnectionCount.addAndGet(1));
e.printStackTrace();
}
}
/*
* On successful connection, add read(OP_READ) as new event to interested Operation set
* */
private void accept(SelectionKey selectionKey) throws IOException {
SocketChannel channel = (SocketChannel) selectionKey.channel();
boolean finishConnect = channel.finishConnect();
String cId = selectionKey.attachment().toString();
if (finishConnect) {
selectionKey.interestOps(SelectionKey.OP_READ);
System.out.println(String.format("%s connected properly", cId));
client.successConnectCount.addAndGet(1);//increment successful connection count
} else {
throw new RuntimeException(String.format("%s didn't connect properly", cId));
}
}
// Writes to server
private void write(SelectionKey clientKey, String msg) throws IOException {
if (clientKey.isValid() && clientKey.channel() instanceof SocketChannel) {
ByteBuffer msgBuf = ByteBuffer.wrap(msg.getBytes());
SocketChannel ch = (SocketChannel) clientKey.channel();
String address = ch.getRemoteAddress().toString();
int writeBytes = ch.write(msgBuf);
msgBuf.rewind();
if (writeBytes <= 0) {
throw new RuntimeException(String.format("Wrote %d bytes to client %s", writeBytes, address));
}
}
}
// Reads from server
private void read(SelectionKey selectionKey) throws IOException {
String cId = selectionKey.attachment().toString();
StringBuilder sb = new StringBuilder();
ByteBuffer buf = ByteBuffer.allocate(256 * 4);
buf.clear();
SocketChannel ch = (SocketChannel) selectionKey.channel();
while (key.isValid() && ch.isConnected() && ch.read(buf) > 0) {
buf.flip();
byte[] bytes = new byte[buf.limit()];
buf.get(bytes);
sb.append(new String(bytes));
buf.clear();
}
System.out.println(String.format("%s received data :%s from server\n\n", cId, sb.toString()));
myBuffer.clear();
}
}
/**
* A simple Java NIO Client Manager creates N clients listening to given server
*/
public class Client {
AtomicInteger successConnectCount = new AtomicInteger();
AtomicInteger failedConnectionCount = new AtomicInteger();
private int noOfClients;
private int port;
private String host;
public Client(String host, int port, int noOfClients) {
this.host = host;
this.port = port;
this.noOfClients = noOfClients;
}
public static void main(String[] args) throws InterruptedException, IOException {
if (args.length < 3) {
throw new RuntimeException("Pass 3 arguments <server-ip> <server-port> <no-of-clients> and start the client (e.g java Client localhost 1234 100)");
}
Client client = new Client(args[0], Integer.parseInt(args[1]), Integer.parseInt(args[2]));
client.process();
}
private void process() throws InterruptedException {
try {
ExecutorService executorService = Executors.newFixedThreadPool(noOfClients);
for (int i = 1; i <= noOfClients; i++) {
ClientTask task = new ClientTask(String.format("Client_%d", i), this);
executorService.submit(task);
}
} catch (Exception ex) {
ex.printStackTrace();
}
Thread.sleep(Integer.MAX_VALUE);//mock long waiting period
}
public int getPort() {
return port;
}
public String getHost() {
return host;
}
}
如果我们查看while块,它会侦听事件,并在接收到OP_CONNECT(selectionKey.isConnectable())事件时将successConnectCount增加1。
我已经使用java nio创建了一个客户端-服务器应用程序,它工作正常,但我的问题是,当服务器有许多连接到服务器的客户端时,服务器会响应错误的客户端,而不是请求客户端。例如,如果客户端A请求第一个人的信息,服务器将第一个人的信息返回给客户端B而不是客户端A。我已经尝试同步对象,但仍然无法正常工作,可能是什么问题。这是我的服务器示例代码
我想在一些计算机之间建立点对点连接,这样用户就可以在没有外部服务器的情况下聊天和交换文件。我最初的想法如下: 我在服务器上制作了一个中央服务器插座,所有应用程序都可以连接到该插座。此ServerSocket跟踪已连接的套接字(客户端),并将新连接的客户端的IP和端口提供给所有其他客户端。每个客户端都会创建一个新的ServerSocket,所有客户端都可以连接到它。 换句话说:每个客户端都有一个Se
前面的章节介绍了所有 Redis 的重要功能组件: 数据结构、数据类型、事务、Lua 环境、事件处理、数据库、持久化, 等等, 但是我们还没有对 Redis 服务器本身做任何介绍。 不过, 服务器本身并没有多少需要介绍的新东西, 因为服务器除了维持服务器状态之外, 最重要的就是将前面介绍过的各个功能模块组合起来, 而这些功能模块在前面的章节里已经介绍过了, 所以本章将焦点放在服务器的初始化过程,
创建 HTTP 客户端 使用默认选项创建一个HttpClient实例,如下所示: HttpClient client = vertx.createHttpClient(); 如果您想要在创建时配置客户端的选项,如下所示: HttpClientOptions options = new HttpClientOptions().setKeepAlive(false); HttpClient clien
创建 TCP 客户端 最简单的方法来创建一个 TCP 客户端,使用默认选项如下所示: NetClient client = vertx.createNetClient(); 配置 TCP 客户端 如果你不想使用默认值,则创建TCP 客户端时,通过传入NetClientOptions实例可以配置: NetClientOptions options = new NetClientOptions().s
问题内容: 我尝试使用以下代码从服务器到客户端发送文件和目录列表。服务器正在从客户端接收消息,但我不知道服务器是否没有发送回结果或客户端是否不接受结果。 服务器端: 问题答案: 据我所见,您在客户端上做的同时在服务器上做。从服务器发送的字符串中没有行尾字符,因此客户端将永远无法完成。执行outqw.println()或添加到要发送的内容的末尾。话虽这么说,很难用一堆注释掉的东西来浏览未格式化的代码