Java NIO & AIO编程

史飞尘
2023-12-01

NIO 编程

NIO : Non - Blocking I/O:非阻塞I/O

  1. 一个线程,可以管理多个通道(连接)。
  2. 避免同步阻塞I/O通讯效率差的缺点。

主要类:
Buffer:缓冲区;
Channel:全双工数据通道;
Selector:多路选择器。

实现过程:

  1. 通过多路选择器轮询,获得有事件操作的集合;
  2. 通过SocketChannel读取数据;
  3. 在Buffer中实现对数据的操作。

例子:

  1. 在服务端,创建Selector,并且配置服务器通道信息,注册Selector。
  2. 轮询获取有数据变化的通道。
  3. 对每个通道中的数据内容进行读写操作。
package NIO;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;

import java.util.Iterator;

/**
 * Minimal single-threaded echo server built on a non-blocking {@link Selector}.
 *
 * <p>One thread accepts connections and serves every client: each request is printed and
 * echoed back with the suffix {@code "666"}.
 */
public class NIOServer {

	/** TCP port the server listens on. */
	private static final int PORT = 8003;

	/** Charset used for both decoding requests and encoding responses. */
	private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

	/**
	 * Opens the selector and the listening channel, then dispatches ready keys forever.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		Selector selector;
		try {
			selector = Selector.open();
			ServerSocketChannel serverChannel = ServerSocketChannel.open();
			serverChannel.configureBlocking(false); // selectors only work with non-blocking channels
			serverChannel.socket().bind(new InetSocketAddress(PORT), 1024);
			serverChannel.register(selector, SelectionKey.OP_ACCEPT);
			// Report the port that is actually bound instead of a hard-coded number.
			System.out.println("服务器在端口" + PORT + "等待!");
		} catch (IOException e) {
			e.printStackTrace();
			// Without a working selector the event loop below could only fail forever.
			return;
		}

		while (true) {
			try {
				// Blocks for up to one second waiting for ready channels; no extra sleep is needed.
				selector.select(1000);
				Iterator<SelectionKey> it = selector.selectedKeys().iterator();
				while (it.hasNext()) {
					SelectionKey key = it.next();
					// The selected-key set is not cleared automatically.
					it.remove();
					try {
						handleInput(selector, key);
					} catch (Exception e) {
						// A failure on one connection must not stop the others.
						e.printStackTrace();
						closeQuietly(key);
					}
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}

	/**
	 * Handles one ready key: accepts a new connection, or reads a request and echoes it back.
	 *
	 * @param selector selector that newly accepted channels are registered with
	 * @param key      the ready key to process; ignored when no longer valid
	 * @throws IOException if accepting, reading, or writing fails
	 */
	public static void handleInput(Selector selector, SelectionKey key) throws IOException {
		if (!key.isValid()) {
			return;
		}
		// New incoming connection.
		if (key.isAcceptable()) {
			ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
			SocketChannel sc = ssc.accept();
			// In non-blocking mode accept() returns null when no connection is pending.
			if (sc != null) {
				sc.configureBlocking(false);
				sc.register(selector, SelectionKey.OP_READ);
			}
		}
		if (key.isReadable()) {
			SocketChannel sc = (SocketChannel) key.channel();
			ByteBuffer readBuffer = ByteBuffer.allocate(1024);
			int readBytes = sc.read(readBuffer);
			if (readBytes > 0) {
				// Switch the buffer from filling to draining: limit = position, position = 0.
				readBuffer.flip();
				byte[] bytes = new byte[readBuffer.remaining()];
				readBuffer.get(bytes);
				String request = new String(bytes, UTF_8);
				System.out.println("Client said:" + request);

				String response = request + "666";
				doWrite(sc, response);
			} else if (readBytes < 0) {
				// The peer closed the connection.
				key.cancel();
				sc.close();
			}
		}
	}

	/**
	 * Sends {@code response} to the client, encoded as UTF-8.
	 *
	 * @param sc       channel to write to; not touched when the response is null or blank
	 * @param response text to send; null or whitespace-only text is silently skipped
	 * @throws IOException if the write fails
	 */
	public static void doWrite(SocketChannel sc, String response) throws IOException {
		if (response == null || response.trim().isEmpty()) {
			return;
		}
		ByteBuffer writeBuffer = ByteBuffer.wrap(response.getBytes(UTF_8));
		// A non-blocking write may send only part of the buffer, so keep going until it is drained.
		while (writeBuffer.hasRemaining()) {
			sc.write(writeBuffer);
		}
	}

	/** Cancels the key and closes its channel, reporting (but not propagating) close failures. */
	private static void closeQuietly(SelectionKey key) {
		key.cancel();
		if (key.channel() != null) {
			try {
				key.channel().close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
}

客户端代码:
即使客户端只有一个通道,仍然可以使用多路选择器,只是路数只有一路。

  1. 创建Selector, SocketChannel。配置信息并注册。
  2. 轮询各个通道,对每个有数据的通道进行读写处理。
package NIO;


import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Set;
import java.util.UUID;
import java.util.Iterator;

/**
 * Non-blocking client that connects to the echo server on 127.0.0.1:8003.
 *
 * <p>It sends a random UUID, prints each reply, and sends a new UUID three seconds after every
 * reply until the server closes the connection.
 */
public class NIOClient {

	/** Charset used for both encoding requests and decoding replies. */
	private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

	/**
	 * Connects to the server and runs the selector loop until the channel is closed.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		String host = "127.0.0.1";
		int port = 8003;

		Selector selector;
		SocketChannel socketChannel;
		try {
			selector = Selector.open();
			socketChannel = SocketChannel.open();
			socketChannel.configureBlocking(false);

			if (socketChannel.connect(new InetSocketAddress(host, port))) {
				// Connected immediately (possible for local connections).
				socketChannel.register(selector, SelectionKey.OP_READ);
				doWrite(socketChannel);
			} else {
				// Connection still in progress; wait for the OP_CONNECT event.
				socketChannel.register(selector, SelectionKey.OP_CONNECT);
			}
		} catch (IOException e) {
			e.printStackTrace();
			// Without a selector and channel the loop below could only fail forever.
			return;
		}

		// Stop once the only channel has been closed instead of spinning on an empty selector.
		while (socketChannel.isOpen()) {
			try {
				selector.select(1000);
				Iterator<SelectionKey> it = selector.selectedKeys().iterator();
				while (it.hasNext()) {
					SelectionKey key = it.next();
					it.remove();
					try {
						handleKey(selector, key);
					} catch (Exception e) {
						// Report the failure (for example connection refused) and drop the channel.
						e.printStackTrace();
						key.cancel();
						if (key.channel() != null) {
							key.channel().close();
						}
					}
				}
			} catch (IOException e) {
				e.printStackTrace();
				break;
			} catch (Exception e) {
				e.printStackTrace();
			}
		}

		try {
			selector.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	/**
	 * Sends a freshly generated random UUID to the server.
	 *
	 * @param sc connected channel to write to
	 * @throws IOException if the write fails
	 */
	public static void doWrite(SocketChannel sc) throws IOException {
		ByteBuffer writeBuffer = ByteBuffer.wrap(UUID.randomUUID().toString().getBytes(UTF_8));
		// A non-blocking write may send only part of the buffer, so keep going until it is drained.
		while (writeBuffer.hasRemaining()) {
			sc.write(writeBuffer);
		}
	}

	/**
	 * Handles one ready key: completes the connection or prints the server's reply, then waits
	 * three seconds and sends the next message.
	 *
	 * <p>Nothing is sent when the connection is not yet established or the server has closed it.
	 *
	 * @param selector selector the channel is registered with
	 * @param key      the ready key to process; ignored when no longer valid
	 * @throws ClosedChannelException if the channel is closed while registering
	 * @throws IOException            if connecting, reading, or writing fails
	 */
	public static void handleKey(Selector selector, SelectionKey key) throws ClosedChannelException, IOException
	{
		if (!key.isValid()) {
			return;
		}
		SocketChannel sc = (SocketChannel) key.channel();
		if (key.isConnectable()) {
			if (!sc.finishConnect()) {
				// Still connecting: writing now would throw NotYetConnectedException.
				return;
			}
			sc.register(selector, SelectionKey.OP_READ);
		}

		if (key.isReadable()) {
			ByteBuffer readBuffer = ByteBuffer.allocate(1024);
			int readBytes = sc.read(readBuffer);
			if (readBytes > 0) {
				readBuffer.flip();
				byte[] bytes = new byte[readBuffer.remaining()];
				readBuffer.get(bytes);
				String body = new String(bytes, UTF_8);
				System.out.println("Server said: " + body);
			} else if (readBytes < 0) {
				// The server closed the connection; there is nobody left to write to.
				key.cancel();
				sc.close();
				return;
			}
		}

		// Pace the demo: wait three seconds before sending the next message.
		try {
			Thread.sleep(3000);
		} catch (InterruptedException e) {
			// Preserve the interrupt for the caller and skip the write.
			Thread.currentThread().interrupt();
			return;
		}
		doWrite(sc);
	}
}

AIO 编程

AIO:Asynchronous I/O。异步I/O。
只需要设置后续操作,当相关条件满足时,自动执行。
采用回调的方式进行数据读写处理。

主要类:
AsynchronousServerSocketChannel :服务器接收请求通道;
方法:
bind绑定端口,accept,接收客户端请求。
AsynchronousSocketChannel:通讯通道。
方法:
read:从通道中读数据,write:写数据到通道中。
CompletionHandler:异步处理类;
方法:
completed:操作完成时调用;failed:操作失败时调用。

例子:
服务端:

  1. 创建通道server,绑定端口8001;
  2. 当accept方法完成,调用异步处理类中的completed方法;失败则执行failed方法。
  3. 当channel.read方法完成,执行其中的completed方法。失败则执行failed方法。
package AIO;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;

/**
 * Asynchronous (AIO) echo server listening on localhost:8001.
 *
 * <p>For every connection it reads one request, prints it, replies with the request followed by
 * {@code " 666"}, and closes the connection once the reply has been written.
 */
public class AIOServer {

	/** Charset used for both decoding requests and encoding replies. */
	private static final Charset UTF_8 = Charset.forName("UTF-8");

	/**
	 * Binds the server channel, starts accepting connections, and keeps the main thread alive.
	 *
	 * @param args unused
	 * @throws IOException if the server channel cannot be opened or bound
	 */
	public static void main(String[] args) throws IOException {
		final AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
		server.bind(new InetSocketAddress("localhost", 8001));
		System.out.println("服务器在8001端口等待!");

		server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
			@Override
			public void completed(AsynchronousSocketChannel channel, Object attachment) {
				// Re-arm the accept first so further clients can connect.
				server.accept(null, this);
				readRequest(channel);
			}

			@Override
			public void failed(Throwable exc, Object attachment) {
				System.out.print("failed: " + exc.getMessage());
			}
		});

		// All work happens in completion handlers; the main thread only has to stay alive.
		while (true) {
			try {
				Thread.sleep(5000);
			} catch (InterruptedException e) {
				// Treat an interrupt as a shutdown request.
				Thread.currentThread().interrupt();
				server.close();
				return;
			}
		}
	}

	/** Starts an asynchronous read of one request and replies when it completes. */
	private static void readRequest(final AsynchronousSocketChannel channel) {
		ByteBuffer buffer = ByteBuffer.allocate(1024);
		channel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
			@Override
			public void completed(Integer result, ByteBuffer attachment) {
				if (result < 0) {
					// The client closed the connection without sending anything.
					closeQuietly(channel);
					return;
				}
				attachment.flip(); // switch the buffer from filling to draining
				String data = UTF_8.decode(attachment).toString();
				System.out.println("client said: " + data);
				writeReply(channel, data + " 666");
			}

			@Override
			public void failed(Throwable exc, ByteBuffer attachment) {
				System.out.println("read error " + exc.getMessage());
				closeQuietly(channel);
			}
		});
	}

	/**
	 * Writes the reply and closes the channel only after every byte has been sent.
	 * Closing right after issuing the asynchronous write could abort it.
	 */
	private static void writeReply(final AsynchronousSocketChannel channel, String reply) {
		ByteBuffer out = ByteBuffer.wrap(reply.getBytes(UTF_8));
		channel.write(out, out, new CompletionHandler<Integer, ByteBuffer>() {
			@Override
			public void completed(Integer result, ByteBuffer pending) {
				if (pending.hasRemaining()) {
					// Partial write: send the rest before closing.
					channel.write(pending, pending, this);
				} else {
					closeQuietly(channel);
				}
			}

			@Override
			public void failed(Throwable exc, ByteBuffer pending) {
				System.out.println("write error " + exc.getMessage());
				closeQuietly(channel);
			}
		});
	}

	/** Closes the channel, reporting (but not propagating) any failure. */
	private static void closeQuietly(AsynchronousSocketChannel channel) {
		try {
			channel.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}

客户端:

  1. 创建通讯通道,并连接服务器地址。
  2. 当连接成功,执行异步处理类中的completed方法,失败执行failed方法。
  3. 当channel.write: 向通道中写数据,成功,执行其中的completed方法,失败执行failed方法。
  4. 当channel.read:读取通道中的数据,成功,执行其中completed方法,失败执行对应的failed方法。
package AIO;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.UUID;

/**
 * Asynchronous (AIO) client for the echo server on localhost:8001.
 *
 * <p>It connects, sends a random UUID, prints the server's reply, and closes the channel.
 * The main thread waits for the exchange to finish, for at most ten seconds.
 */
public class AIOClient {

	/** Charset used for both encoding the request and decoding the reply. */
	private static final Charset UTF_8 = Charset.forName("UTF-8");

	/**
	 * Runs one request/reply exchange with the server.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		try {
			final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
			// Released when the exchange has finished, successfully or not.
			final java.util.concurrent.CountDownLatch done = new java.util.concurrent.CountDownLatch(1);

			channel.connect(new InetSocketAddress("localhost", 8001), null, new CompletionHandler<Void, Void>() {
				@Override
				public void completed(Void result, Void attachment) {
					sendRequest(channel, done);
				}

				@Override
				public void failed(Throwable exc, Void attachment) {
					System.out.println("error");
					exc.printStackTrace();
					finish(channel, done);
				}
			});

			try {
				// Wait for the handlers to finish instead of always sleeping the full ten seconds.
				done.await(10, java.util.concurrent.TimeUnit.SECONDS);
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
			}
			// Safe even when a handler already closed it; closing twice has no effect.
			channel.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	/** Writes a random UUID to the server and starts reading the reply once it is fully sent. */
	private static void sendRequest(final AsynchronousSocketChannel channel,
			final java.util.concurrent.CountDownLatch done) {
		String str = UUID.randomUUID().toString();
		ByteBuffer out = ByteBuffer.wrap(str.getBytes(UTF_8));
		channel.write(out, out, new CompletionHandler<Integer, ByteBuffer>() {
			@Override
			public void completed(Integer result, ByteBuffer pending) {
				if (pending.hasRemaining()) {
					// Partial write: send the rest before waiting for the reply.
					channel.write(pending, pending, this);
					return;
				}
				System.out.println("准备读取空间!");
				readReply(channel, done);
			}

			@Override
			public void failed(Throwable exc, ByteBuffer pending) {
				System.out.println("write error");
				exc.printStackTrace();
				finish(channel, done);
			}
		});
	}

	/**
	 * Reads the server's reply and prints it. The channel is closed only inside the handlers:
	 * closing it right after issuing the asynchronous read would abort the read.
	 */
	private static void readReply(final AsynchronousSocketChannel channel,
			final java.util.concurrent.CountDownLatch done) {
		ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
		channel.read(byteBuffer, byteBuffer, new CompletionHandler<Integer, ByteBuffer>() {
			@Override
			public void completed(Integer result, ByteBuffer attachment) {
				// A negative result means the server closed the connection without replying.
				if (result > 0) {
					attachment.flip(); // switch the buffer from filling to draining
					String data = UTF_8.decode(attachment).toString();
					System.out.println("server said: " + data);
				}
				finish(channel, done);
			}

			@Override
			public void failed(Throwable exc, ByteBuffer attachment) {
				System.out.println("read error " + exc.getMessage());
				finish(channel, done);
			}
		});
	}

	/** Closes the channel and lets the main thread continue. */
	private static void finish(AsynchronousSocketChannel channel, java.util.concurrent.CountDownLatch done) {
		try {
			channel.close();
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			done.countDown();
		}
	}
}
 类似资料: