NIO : Non - Blocking I/O:非阻塞I/O
主要类:
Buffer:缓冲区;
Channel:全双工数据通道;
Selector:多路选择器。
实现过程:
例子:
package NIO;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;
import java.util.Iterator;
public class NIOServer {
public static void main(String[] args) {
int port = 8003;
Selector selector = null;
ServerSocketChannel serverChannel = null;
try {
selector = Selector.open(); //选择器打开
serverChannel = ServerSocketChannel.open();//服务端口打开
serverChannel.configureBlocking(false);//设置为非阻塞模式
serverChannel.socket().bind(new InetSocketAddress(port), 1024);//绑定端口
serverChannel.register(selector, SelectionKey.OP_ACCEPT);//注册选择器
System.out.println("服务器在端口8001等待!");
}catch(IOException e) {
e.printStackTrace();
}
while(true) {
try {
selector.select(1000);
//获取有数据动向的通道,轮询。
Set<SelectionKey> selectionKey = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKey.iterator();
SelectionKey key = null;
while(it.hasNext()) {
key = it.next();
it.remove();
try {
//对每个通道对数据,进行专门处理。
handleInput(selector, key);
}catch(Exception e) {
if(key != null) {
key.cancel();
if(key.channel() != null)
key.channel().close();
}
}
}
}catch(Exception e){
e.printStackTrace();
}
try {
Thread.sleep(500);
}catch(InterruptedException e) {
e.printStackTrace();
}
}
}
public static void handleInput(Selector selector, SelectionKey key) throws IOException {
if(key.isValid()) {
//处理新接入的信息请求
if(key.isAcceptable()) {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
}
if(key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);//在通道上调用read方法,将数值放到readBuffer中
if(readBytes > 0) {
// 将缓存字节数组的指针设置为数组的开始序列即数组下标0。
// 这样就可以从buffer开头,对该buffer进行遍历(读取)了。
readBuffer.flip();//反转缓冲区
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String request = new String(bytes, "UTF-8");
System.out.println("Client said:" + request);
String response = request + "666";
doWrite(sc, response);
}
else if (readBytes < 0) {
// 对端链路关闭
key.cancel();
sc.close();
}
}
}
}
public static void doWrite(SocketChannel sc, String response) throws IOException {
if(response !=null && response.trim().length() > 0) {
byte[] bytes = response.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
sc.write(writeBuffer);
}
}
}
客户端代码:
即时客户端只有一个,还是可以设置一下。多路选择。路数就一路。
package NIO;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Set;
import java.util.UUID;
import java.util.Iterator;
public class NIOClient {
public static void main(String[] args) {
Selector selector = null;
SocketChannel socketChannel = null;
String host = "127.0.0.1";
int port = 8003;
try {
selector = Selector.open();
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
if(socketChannel.connect(new InetSocketAddress(host, port))) {
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel);
}
else
{
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
}catch(IOException e) {
e.printStackTrace();
}
while(true) {
try {
selector.select(1000);
Set<SelectionKey> selectionKey = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKey.iterator();
SelectionKey key;
while(it.hasNext()) {
key = it.next();
it.remove();
try {
//处理每一个key
handleKey(selector, key);
}catch(Exception e) {
if(key != null) {
key.cancel();
if(key.channel() != null){
key.channel().close();
}
}
}
}
}catch(Exception e) {
e.printStackTrace();
}
}
}
public static void doWrite(SocketChannel sc) throws IOException {
byte[] str = UUID.randomUUID().toString().getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(str.length);
writeBuffer.put(str);
writeBuffer.flip();
sc.write(writeBuffer);
}
public static void handleKey(Selector selector, SelectionKey key) throws ClosedChannelException, IOException
{
if(key.isValid()) {
SocketChannel sc = (SocketChannel) key.channel();
if(key.isConnectable()) {
if(sc.finishConnect())
sc.register(selector, SelectionKey.OP_READ);
}
if(key.isReadable()) {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if(readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, "UTF-8");
System.out.println("Server said: " + body);
}
else if(readBytes < 0) {
key.cancel();
sc.close();
}else;
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
doWrite(sc);
}
}
}
AIO:Asynchronous I/O。异步I/O。
只需要设置后续操作,当相关条件满足时,自动执行。
采用回调的方式进行数据读写处理。
主要类:
AsynchronousServerSocketChannel :服务器接收请求通道;
方法:
bind绑定端口,accept,接收客户端请求。
AsynchronousSocketChannel:通讯通道。
方法:
read:从通道中读数据,write:写数据到通道中。
CompletionHandler:异步处理类;
方法:
completed:操作完成调用;failed;操作失败调用。
例子:
服务端:
package AIO;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
public class AIOServer {
public static void main(String[] args) throws IOException {
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
server.bind(new InetSocketAddress("localhost", 8001));
System.out.println("服务器在8001端口等待!");
server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>(){
@Override
public void completed(AsynchronousSocketChannel channel, Object attachment) {
server.accept(null, this);//持续接受新的客户需求
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>(){
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip(); //反转此Buffer
CharBuffer charBuffer = CharBuffer.allocate(1024);
CharsetDecoder decoder = Charset.defaultCharset().newDecoder();
decoder.decode(attachment,charBuffer,false);
charBuffer.flip();
String data = new String(charBuffer.array(),0, charBuffer.limit());
System.out.println("client said: " + data);
channel.write(ByteBuffer.wrap((data + " 666").getBytes())); //返回结果给客户端
try{
channel.close();
}catch (Exception e){
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.out.println("read error "+exc.getMessage());
}
});
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.print("failed: " + exc.getMessage());
}
});
while(true) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
客户端:
package AIO;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.UUID;
public class AIOClient {
public static void main(String[] args) {
try {
AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
channel.connect(new InetSocketAddress("localhost", 8001), null, new CompletionHandler<Void, Void>(){
@Override
public void completed(Void result, Void attachment) {
String str = UUID.randomUUID().toString();
channel.write(ByteBuffer.wrap(str.getBytes()), null, new CompletionHandler<Integer, Object>(){
@Override
public void completed(Integer result, Object attachment) {
try {
System.out.println("准备读取空间!");
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
channel.read(byteBuffer, byteBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip(); //反转此Buffer
CharBuffer charBuffer = CharBuffer.allocate(1024);
CharsetDecoder decoder = Charset.defaultCharset().newDecoder();
decoder.decode(attachment,charBuffer,false);
charBuffer.flip();
String data = new String(charBuffer.array(),0, charBuffer.limit());
System.out.println("server said: " + data);
try{
channel.close();
}catch (Exception e){
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.out.println("read error " + exc.getMessage());
}
});
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("write error");
}
});
}
@Override
public void failed(Throwable exc, Void attachment) {
System.out.println("error");
}
});
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}