epoll demo
// Server side
#include <arpa/inet.h>
#include <ctype.h>
#include <errno.h>
#include <stdio.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>
// Size in bytes of the buffer handed to each read() on a client socket.
#define BUF_LEN 1024
// TCP port the demo server binds to.
#define SER_PORT 8080
// Length of the event array passed to epoll_wait(); also used as the size argument of epoll_create().
#define MAX_OPEN_FD 1024
/*
 * Minimal level-triggered epoll TCP server.
 *
 * Binds to SER_PORT on all interfaces, sends "hello epoll" to every newly
 * accepted connection, and prints each chunk received from a client followed
 * by a newline. Connections are closed when the peer disconnects or a read
 * fails.
 *
 * The event loop runs until epoll_wait() fails with something other than
 * EINTR. Returns 1 on any setup or event-loop failure.
 */
int main(int args, char* argv[])
{
    (void) args;
    (void) argv;

    static const char greeting[] = "hello epoll";
    char buf[BUF_LEN];
    struct sockaddr_in servaddr = {0}; /* zero padding/unused fields before bind() */
    struct sockaddr_in cliaddr;
    struct epoll_event eventP, ep[MAX_OPEN_FD];

    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd < 0) {
        perror("socket");
        return 1;
    }

    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(SER_PORT);

    /* Bind and start listening on the port. */
    if (bind(listenfd, (struct sockaddr*) &servaddr, sizeof(servaddr)) < 0) {
        perror("bind");
        close(listenfd);
        return 1;
    }
    if (listen(listenfd, 20) < 0) {
        perror("listen");
        close(listenfd);
        return 1;
    }

    /* Create the epoll instance. */
    int epfd = epoll_create(MAX_OPEN_FD);
    if (epfd < 0) {
        perror("epoll_create");
        close(listenfd);
        return 1;
    }

    /* Register the listening socket with the epoll instance. */
    eventP.events = EPOLLIN;
    eventP.data.fd = listenfd;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &eventP) < 0) {
        perror("epoll_ctl(listenfd)");
        close(epfd);
        close(listenfd);
        return 1;
    }

    for (;;)
    {
        /* Keep the result signed: epoll_wait() reports errors as -1. */
        int nready = epoll_wait(epfd, ep, MAX_OPEN_FD, -1);
        if (nready < 0) {
            if (errno == EINTR) {
                continue; /* interrupted by a signal, just retry */
            }
            perror("epoll_wait");
            break;
        }

        for (int i = 0; i < nready; ++i) {
            if (ep[i].data.fd == listenfd) /* event on the listening socket: new client */
            {
                /* clilen is a value-result argument, so reset it before every accept(). */
                socklen_t clilen = sizeof(cliaddr);
                int connfd = accept(listenfd, (struct sockaddr*) &cliaddr, &clilen);
                if (connfd < 0) {
                    perror("accept");
                    continue;
                }

                /* Register the client socket with the epoll instance. */
                eventP.events = EPOLLIN;
                eventP.data.fd = connfd;
                if (epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &eventP) < 0) {
                    perror("epoll_ctl(connfd)");
                    close(connfd);
                    continue;
                }

                /* Greet the client (no trailing NUL is sent). */
                if (write(connfd, greeting, sizeof(greeting) - 1) < 0) {
                    perror("write");
                }
            }
            else
            {
                int connfd = ep[i].data.fd;
                ssize_t bytes = read(connfd, buf, BUF_LEN); /* read from the client */
                if (bytes > 0) {
                    fwrite(buf, 1, (size_t) bytes, stdout);
                    putchar('\n');
                } else if (bytes == 0 || (errno != EINTR && errno != EAGAIN)) {
                    /*
                     * Peer closed the connection or the read failed. With a
                     * level-triggered registration the fd would be reported
                     * ready forever, so remove and close it.
                     */
                    if (bytes < 0) {
                        perror("read");
                    }
                    epoll_ctl(epfd, EPOLL_CTL_DEL, connfd, NULL);
                    close(connfd);
                }
            }
        }
    }

    /* Only reached when epoll_wait() failed. */
    close(epfd);
    close(listenfd);
    return 1;
}
nio demo
package com.learn;
import java.io.IOException;
/**
 * Entry point of the NIO demo: starts one {@code ServerDemo} and two
 * {@code ClientDemo} threads, lets them talk for half a second, then
 * interrupts all of them.
 */
public class Application {
    /**
     * Runs the demo.
     *
     * @param args unused
     * @throws IOException if the server or one of the clients cannot be created
     */
    public static void main(String[] args) throws IOException {
        ServerDemo serverDemo = new ServerDemo();
        serverDemo.start();
        ClientDemo clientDemo = null;
        ClientDemo clientDemo2 = null;
        try {
            clientDemo = new ClientDemo();
            clientDemo.start();
            clientDemo2 = new ClientDemo();
            clientDemo2.start();
            Thread.sleep(500);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever called us.
            Thread.currentThread().interrupt();
        } finally {
            // Always stop the started threads, even when a client failed to
            // connect; otherwise the non-daemon server thread keeps the JVM alive.
            serverDemo.interrupt();
            if (clientDemo != null) {
                clientDemo.interrupt();
            }
            if (clientDemo2 != null) {
                clientDemo2.interrupt();
            }
        }
    }
}
package com.learn;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
/**
 * Single-threaded NIO server demo.
 *
 * <p>Listens on port 8080, sends {@code "hello111"} to every accepted client
 * and prints whatever the clients send. The comments map the selector calls to
 * their epoll counterparts. The thread runs until it is interrupted; all
 * channels and the selector are closed on exit.
 */
public class ServerDemo extends Thread {
    private static final int PORT = 8080;
    private static final int BUFFER_SIZE = 1024;

    private final Selector selector;

    /**
     * Opens the listening channel and registers it for accept events.
     *
     * @throws IOException if the channel or selector cannot be opened or the port cannot be bound
     */
    public ServerDemo() throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        Selector openedSelector = null;
        try {
            serverSocketChannel.configureBlocking(false);
            ServerSocket serverSocket = serverSocketChannel.socket();
            serverSocket.bind(new InetSocketAddress(PORT));
            openedSelector = Selector.open(); // epoll_create
            // This channel is only interested in ACCEPT events.
            serverSocketChannel.register(openedSelector, SelectionKey.OP_ACCEPT); // epoll_ctl
        } catch (IOException e) {
            // Do not leak the half-initialised resources, e.g. when the port is taken.
            closeQuietly(openedSelector);
            closeQuietly(serverSocketChannel);
            throw e;
        }
        selector = openedSelector;
    }

    /** Runs the select loop until the thread is interrupted or the selector fails. */
    @Override
    public void run() {
        try {
            // interrupt() wakes up a blocked select() and leaves the interrupt flag set.
            while (!Thread.currentThread().isInterrupted()) {
                if (selector.select() == 0) { // epoll_wait
                    continue;
                }
                var iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    iterator.remove();
                    // A key may have been cancelled while handling an earlier event.
                    if (selectionKey.isValid() && selectionKey.isAcceptable()) {
                        acceptClient(selectionKey);
                    }
                    if (selectionKey.isValid() && selectionKey.isReadable()) {
                        readFromClient(selectionKey);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeAll();
        }
    }

    /** Accepts a pending client, registers it for reads and sends the greeting. */
    private void acceptClient(SelectionKey selectionKey) throws IOException {
        ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
        SocketChannel socketChannel = server.accept();
        if (socketChannel == null) {
            // Nothing pending on the non-blocking channel; keep serving.
            return;
        }
        try {
            socketChannel.configureBlocking(false);
            // Only READ interest: a socket is almost always writable, so registering
            // OP_WRITE with nothing queued makes select() return immediately forever.
            socketChannel.register(selector, SelectionKey.OP_READ);
            socketChannel.write(ByteBuffer.wrap("hello111".getBytes()));
        } catch (IOException e) {
            // A failure on one client must not take the server down.
            e.printStackTrace();
            closeQuietly(socketChannel);
        }
    }

    /** Reads one chunk from a client and prints it; closes the client on EOF or error. */
    private void readFromClient(SelectionKey selectionKey) {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
        try {
            int bytesRead = socketChannel.read(readBuffer);
            if (bytesRead < 0) {
                // Peer closed the connection; without this the key stays readable forever.
                selectionKey.cancel();
                socketChannel.close();
                return;
            }
            if (bytesRead == 0) {
                return;
            }
            readBuffer.flip();
            System.out.println("server receive: " + Charset.defaultCharset().decode(readBuffer).toString());
        } catch (IOException e) {
            e.printStackTrace();
            selectionKey.cancel();
            closeQuietly(socketChannel);
        }
    }

    /** Closes every registered channel and then the selector itself. */
    private void closeAll() {
        // Snapshot the key set so closing channels cannot interfere with the iteration.
        for (SelectionKey key : selector.keys().toArray(new SelectionKey[0])) {
            closeQuietly(key.channel());
        }
        closeQuietly(selector);
    }

    /** Closes {@code resource} if non-null, ignoring failures since this is best-effort cleanup. */
    private static void closeQuietly(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException ignored) {
            // Nothing useful can be done if cleanup itself fails.
        }
    }
}
package com.learn;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
/**
 * NIO client demo.
 *
 * <p>Connects to port 8080 on the local host, prints every chunk received
 * from the server and answers each one with {@code "hello 333"}. The thread
 * runs until it is interrupted or the server closes the connection; the
 * channel and selector are closed on exit.
 */
public class ClientDemo extends Thread {
    private static final int PORT = 8080;
    private static final int BUFFER_SIZE = 1024;

    private final Selector selector;

    /**
     * Connects to the server and registers the channel for read events.
     *
     * @throws IOException if the connection or the selector cannot be set up
     */
    public ClientDemo() throws IOException {
        SocketChannel socketChannel = SocketChannel.open();
        Selector openedSelector = null;
        try {
            // Connect in blocking mode so the channel is ready before it is registered.
            socketChannel.connect(new InetSocketAddress(InetAddress.getLocalHost(), PORT));
            socketChannel.configureBlocking(false);
            openedSelector = Selector.open();
            // Only READ interest: OP_WRITE with nothing queued makes select() spin.
            socketChannel.register(openedSelector, SelectionKey.OP_READ);
        } catch (IOException e) {
            // Do not leak the channel/selector when the server is not reachable.
            closeQuietly(openedSelector);
            closeQuietly(socketChannel);
            throw e;
        }
        selector = openedSelector;
    }

    /** Runs the select loop until interrupted, disconnected, or an I/O error occurs. */
    @Override
    public void run() {
        try {
            boolean connected = true;
            // interrupt() wakes up a blocked select() and leaves the interrupt flag set.
            while (connected && !Thread.currentThread().isInterrupted()) {
                if (selector.select() == 0) {
                    continue;
                }
                var iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    iterator.remove();
                    if (selectionKey.isValid() && selectionKey.isReadable()) {
                        connected = readAndReply(selectionKey);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeAll();
        }
    }

    /**
     * Reads one chunk from the server, prints it and sends the reply.
     *
     * @return {@code false} once the server has closed the connection
     */
    private boolean readAndReply(SelectionKey selectionKey) throws IOException {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
        int bytesRead = socketChannel.read(readBuffer);
        if (bytesRead < 0) {
            // End of stream; without this the key stays readable forever.
            selectionKey.cancel();
            socketChannel.close();
            return false;
        }
        if (bytesRead == 0) {
            return true;
        }
        readBuffer.flip();
        System.out.println("client receive: " + Charset.defaultCharset().decode(readBuffer).toString());
        socketChannel.write(ByteBuffer.wrap("hello 333".getBytes()));
        return true;
    }

    /** Closes every registered channel and then the selector itself. */
    private void closeAll() {
        // Snapshot the key set so closing channels cannot interfere with the iteration.
        for (SelectionKey key : selector.keys().toArray(new SelectionKey[0])) {
            closeQuietly(key.channel());
        }
        closeQuietly(selector);
    }

    /** Closes {@code resource} if non-null, ignoring failures since this is best-effort cleanup. */
    private static void closeQuietly(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException ignored) {
            // Nothing useful can be done if cleanup itself fails.
        }
    }
}
netty Server
package com.company;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * Netty server demo: listens on port 8080, prints every inbound message and
 * answers each one with {@code "hello"}.
 */
public class Application {
    /**
     * Starts the server and blocks until its channel is closed.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // ServerBootstrap.group(parent, child): the parent group accepts
        // connections, the child group handles the accepted channels. The
        // variable names now match the roles they are passed in; thread counts
        // per role are unchanged (4 for accepting, 8 for I/O).
        EventLoopGroup parentGroup = new NioEventLoopGroup(4);
        EventLoopGroup childGroup = new NioEventLoopGroup(8);
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.channel(NioServerSocketChannel.class)
                .group(parentGroup, childGroup)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                // Read
                                ByteBuf byteBuf = (ByteBuf) msg;
                                try {
                                    byte[] result = new byte[byteBuf.readableBytes()];
                                    byteBuf.readBytes(result);
                                    System.out.println(new String(result));
                                } finally {
                                    // This handler consumes the message, so it must
                                    // release it even if reading fails.
                                    byteBuf.release();
                                }
                                // Write; writeAndFlush takes over ownership of writeBuf.
                                ByteBuf writeBuf = ctx.alloc().buffer(1024);
                                writeBuf.writeBytes("hello".getBytes());
                                ctx.writeAndFlush(writeBuf);
                            }

                            @Override
                            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                                // Last handler in the pipeline: report the failure and
                                // drop the broken connection.
                                cause.printStackTrace();
                                ctx.close();
                            }
                        });
                    }
                });
        try {
            ChannelFuture f = serverBootstrap.bind(8080).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Keep the interrupt visible to whoever called us.
            Thread.currentThread().interrupt();
        } finally {
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}
xnio Server
package com.company;
import org.xnio.*;
import org.xnio.channels.AcceptingChannel;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
/**
 * XNIO server demo: listens on port 8080, prints everything a client sends
 * and sends {@code "ffffffffff"} once to every accepted connection.
 */
public class Application {
    /**
     * Creates the worker, installs the accept listener and starts accepting.
     *
     * @param args unused
     * @throws IOException if the worker or the server channel cannot be created
     */
    public static void main(String[] args) throws IOException {
        Xnio xnio = Xnio.getInstance();
        XnioWorker worker = xnio.createWorker(OptionMap.builder()
                .set(Options.WORKER_IO_THREADS, 4)
                .set(Options.CONNECTION_HIGH_WATER, 1000000)
                .set(Options.CONNECTION_LOW_WATER, 1000000)
                .set(Options.WORKER_TASK_CORE_THREADS, 20)
                .set(Options.WORKER_TASK_MAX_THREADS, 40)
                .set(Options.TCP_NODELAY, true)
                .set(Options.CORK, true)
                .getMap());
        ChannelListener<AcceptingChannel<StreamConnection>> acceptListener = channel -> {
            try {
                var connection = channel.accept();
                if (connection == null) {
                    // Non-blocking accept: nothing is pending right now.
                    return;
                }
                // Read
                var readChannel = connection.getSourceChannel();
                readChannel.setReadListener(rChannel -> {
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    try {
                        int bytesRead;
                        while ((bytesRead = rChannel.read(readBuffer)) > 0) {
                            readBuffer.flip();
                            System.out.println(Charset.defaultCharset().decode(readBuffer).toString());
                            // Make the buffer writable again; otherwise the next
                            // read() has no room and the loop cannot drain the channel.
                            readBuffer.clear();
                        }
                        if (bytesRead < 0) {
                            // End of stream: the peer closed the connection.
                            IoUtils.safeClose(connection);
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                        IoUtils.safeClose(connection);
                    }
                });
                readChannel.resumeReads();
                // Write
                var writeChannel = connection.getSinkChannel();
                // One greeting per connection, kept outside the listener so a
                // partial write can be continued on the next writable event.
                ByteBuffer greeting = ByteBuffer.wrap("ffffffffff".getBytes());
                writeChannel.setWriteListener(wChannel -> {
                    try {
                        wChannel.write(greeting);
                        if (!greeting.hasRemaining()) {
                            // Everything was sent. Stop write notifications, otherwise
                            // the listener fires again on every writable event.
                            wChannel.suspendWrites();
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                        IoUtils.safeClose(connection);
                    }
                });
                writeChannel.resumeWrites();
            } catch (IOException e) {
                e.printStackTrace();
            }
        };
        AcceptingChannel<StreamConnection> server = worker.createStreamConnectionServer(new InetSocketAddress(8080), acceptListener, OptionMap.EMPTY);
        server.resumeAccepts();
    }
}