当前位置: 首页 > 工具软件 > XNIO > 使用案例 >

nio、netty、xnio和epoll简单demo

范高刚
2023-12-01

epoll  demo

// server端
#include <stdio.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <ctype.h>

#define BUF_LEN 1024
#define SER_PORT 8080
#define MAX_OPEN_FD 1024

int main(int args, char* argv[])
{
    int listenfd, connfd, epfd, ret;
    char buf[BUF_LEN];
    struct sockaddr_in cliaddr, servaddr;
    socklen_t clilen = sizeof(cliaddr);
    struct epoll_event eventP, ep[MAX_OPEN_FD];

    listenfd = socket(AF_INET, SOCK_STREAM, 0);

    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(SER_PORT);

    // 监听端口
    bind(listenfd, (struct sockaddr*) &servaddr, sizeof(servaddr));
    listen(listenfd, 20);

    epfd = epoll_create(MAX_OPEN_FD);  //创建句柄
    eventP.events = EPOLLIN;
    eventP.data.fd = listenfd;
    ret = epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &eventP);  // 注册listenfd到epfd

    for (;;)
    {
        size_t nready = epoll_wait(epfd, ep, MAX_OPEN_FD, -1);
        for (int i = 0; i < nready; ++i) {
            if (ep[i].data.fd == listenfd) // fd为socket监听fd时
            {
                connfd = accept(listenfd, (struct sockaddr*) &servaddr, &clilen);  // 客户端连接
                eventP.events = EPOLLIN;
                eventP.data.fd = connfd;
                ret = epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &eventP);  // 注册connfd到epfd
                char b[11] = {'h', 'e', 'l', 'l', 'o', ' ', 'e', 'p', 'o', 'l', 'l'};
                write(connfd, b, 11);  // 向客户端写入
            }
            else
            {
                connfd = ep[i].data.fd;
                int bytes = read(connfd, buf, BUF_LEN);  // 从客户的读取
                if (bytes > 0) {
                    // ret = epoll_ctl(epfd, EPOLL_CTL_DEL, connfd, NULL);  // 从epfd中删除connfd;
                    for (int j = 0; j < bytes; j++) 
                    {
                        printf("%c", buf[j]);
                    }
                    printf("\n");
                }
            }
        }
    }
    
}

nio demo

package com.learn;

import java.io.IOException;

public class Application {

    public static void main(String[] args) throws IOException {
        ServerDemo serverDemo = new ServerDemo();
        serverDemo.start();
        ClientDemo clientDemo = new ClientDemo();
        clientDemo.start();
        ClientDemo clientDemo2 = new ClientDemo();
        clientDemo2.start();
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        serverDemo.interrupt();
        clientDemo.interrupt();
        clientDemo2.interrupt();
    }
}
package com.learn;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;

public class ServerDemo extends Thread {

    private Selector selector;

    public ServerDemo() throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        ServerSocket serverSocket = serverSocketChannel.socket();
        serverSocket.bind(new InetSocketAddress(8080));
        selector = Selector.open();   // epoll_create
        // 此channel接收ACCEPT事件
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);  // epoll_ctl
    }

    @Override
    public void run() {
        super.run();
        try {
            while (selector.select() > 0) {  // epoll_wait
                var iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    iterator.remove();
                    // 对应上面SelectionKey.OP_ACCEPT,当有客户端连接时为true
                    if (selectionKey.isAcceptable()) {
                        ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
                        SocketChannel socketChannel = server.accept();
                        if (socketChannel == null) {
                            return;
                        }
                        ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
                        socketChannel.configureBlocking(false);
                        // 此channel接收读写事件
                        socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                        writeBuffer.put("hello111".getBytes());
                        writeBuffer.flip();
                        socketChannel.write(writeBuffer);
                    }
                    // 当接收到数据时发生为true
                    if (selectionKey.isReadable()) {
                        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                        socketChannel.read(readBuffer);
                        readBuffer.flip();
                        System.out.println("server receive: " + Charset.defaultCharset().decode(readBuffer).toString());
                    }
                    if (selectionKey.isWritable()) {
                        // 向客户端写数据
//                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
//                        ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
//                        writeBuffer.clear();
//                        writeBuffer.put("hello 2222".getBytes());
//                        writeBuffer.flip();
//                        socketChannel.write(writeBuffer);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
package com.learn;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;

public class ClientDemo extends Thread {

    private Selector selector;

    public ClientDemo() throws IOException {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress(InetAddress.getLocalHost(), 8080));
        socketChannel.configureBlocking(false);
        selector = Selector.open();
        socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
    }

    @Override
    public void run() {
        super.run();
        try {
            while (selector.select() > 0) {
                var iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    iterator.remove();
                    if (selectionKey.isReadable()) {
                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                        readBuffer.clear();
                        socketChannel.read(readBuffer);
                        readBuffer.flip();
                        System.out.println("client receive: " + Charset.defaultCharset().decode(readBuffer).toString());
                        ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
                        writeBuffer.put("hello 333".getBytes());
                        writeBuffer.flip();
                        socketChannel.write(writeBuffer);
                    }
                    if (selectionKey.isWritable()) {
//                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
//                        ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
//                        writeBuffer.put("hello 2222".getBytes());
//                        writeBuffer.flip();
//                        socketChannel.write(writeBuffer);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

netty Server

package com.company;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class Application {
    public static void main(String[] args) {
        EventLoopGroup childGroup = new NioEventLoopGroup(4);
        EventLoopGroup parentGroup = new NioEventLoopGroup(8);
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.channel(NioServerSocketChannel.class)
            .group(childGroup, parentGroup)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            // 读
                            ByteBuf byteBuf = (ByteBuf) msg;
                            byte[] result = new byte[byteBuf.readableBytes()];
                            byteBuf.readBytes(result);
                            System.out.println(new String(result));
                            byteBuf.release();

                            // 写
                            ByteBuf writeBuf = ctx.alloc().buffer(1024);
                            writeBuf.writeBytes("hello".getBytes());
                            ctx.writeAndFlush(writeBuf);
                        }
                    });
                }
            });
        try {
            ChannelFuture f = serverBootstrap.bind(8080).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            childGroup.shutdownGracefully();
            parentGroup.shutdownGracefully();
        }
    }

}

xnio Server

package com.company;

import org.xnio.*;
import org.xnio.channels.AcceptingChannel;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;

public class Application {
    public static void main(String[] args) throws IOException {
        Xnio xnio = Xnio.getInstance();
        XnioWorker worker = xnio.createWorker(OptionMap.builder()
            .set(Options.WORKER_IO_THREADS, 4)
            .set(Options.CONNECTION_HIGH_WATER, 1000000)
            .set(Options.CONNECTION_LOW_WATER, 1000000)
            .set(Options.WORKER_TASK_CORE_THREADS, 20)
            .set(Options.WORKER_TASK_MAX_THREADS, 40)
            .set(Options.TCP_NODELAY, true)
            .set(Options.CORK, true)
            .getMap());

        ChannelListener<AcceptingChannel<StreamConnection>> acceptListener = channel -> {
            try {
                var connection = channel.accept();
                // 读
                var readChannel = connection.getSourceChannel();
                readChannel.setReadListener(rChannel -> {
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    try {
                        while (rChannel.read(readBuffer) > 0) {
                            readBuffer.flip();
                            System.out.println(Charset.defaultCharset().decode(readBuffer).toString());
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                });
                readChannel.resumeReads();

                // 写
                var writeChannel = connection.getSinkChannel();
                writeChannel.setWriteListener(wChannel -> {
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    buffer.put("ffffffffff".getBytes());
                    buffer.flip();
                    try {
                        wChannel.write(buffer);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                });
                writeChannel.resumeWrites();
            } catch (IOException e) {
                e.printStackTrace();
            }
        };
        AcceptingChannel<StreamConnection> server = worker.createStreamConnectionServer(new InetSocketAddress(8080), acceptListener, OptionMap.EMPTY);
        server.resumeAccepts();
    }

}

 

 类似资料: