Netty为什么选择NIO
前言:总所周知,Netty是基于Nio的,本篇将会从代码的角度来说明Netty为什么选择Nio。
一、前言
下文将会以一个实际例子——回文服务器(实时返回客户端发送的消息),来详细比较Bio、Nio、Aio之间的性能关系,这里对于它们之间的关系以及具体原理就不再多做过多的介绍了,读者可自行查阅相关资料。
先上客户端代码,如下
// 客户端是通用的
public class Client {
private void initClient(int port) throws IOException, InterruptedException {
Socket socket = new Socket(InetAddress.getLocalHost(), port);
PrintWriter pw = new PrintWriter(socket.getOutputStream());
BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String req = "hello";
pw.print(req);
pw.flush();
String temp;
while ((temp = br.readLine()) != null) {
System.out.println("收到服务端回声消息:" + temp);
}
pw.close();
}
public void beginTest(int port) throws InterruptedException {
int number = 3000;
AtomicInteger atomicInteger = new AtomicInteger(0);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(number, number + 10, 10, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(number), new ThreadFactory() {
@Override
public Thread newThread( Runnable r) {
return new Thread(r, "线程" + (number - atomicInteger.incrementAndGet()));
}
});
int preparedThread = threadPoolExecutor.prestartAllCoreThreads();
System.out.println("线程成功预热数:" + preparedThread);
long beginTime = System.currentTimeMillis();
CyclicBarrier barrier = new CyclicBarrier(number);
CountDownLatch countDownLatch = new CountDownLatch(number);
for (int i = 0; i < number; i++) {
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
try {
barrier.await();
initClient(port);
} catch (IOException | InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
}
});
}
countDownLatch.await();
System.out.println(number + "线程连接服务器,总耗时:" + (System.currentTimeMillis() - beginTime) + "ms");
System.exit(0);
}
}
二、BIO
BIO(Block input output),同步阻塞IO模式,我们所熟悉的java.io包下面都是指的是BIO。IO线程阻塞于外部输入,这个过程会一直占着CPU资源不放,这种用法目前只限于应用内部的单线程逻辑,面对互联网上的高并发请求明显是不可取的。
回文服务器示例代码如下
public class BIOServer {
public void initServer () throws IOException {
ServerSocket serverSocket = new ServerSocket(2222);
System.out.println("服务器主线程等待连接....");
AtomicInteger i = new AtomicInteger(0);
Socket client;
while ((client = serverSocket.accept()) != null) {
i.incrementAndGet();
Socket finalClient = client;
new Thread(new Runnable() {
@Override
public void run() {
try {
byte[] bytes = new byte["hello".getBytes().length];
while (finalClient.getInputStream().read(bytes) != -1) {
finalClient.getOutputStream().write(bytes);
finalClient.close();
return;
}
} catch (IOException e) {
e.printStackTrace();
}
}
}, "服务器线程" + i.get()).start();
}
}
}
public class BIO {
public static void main(String[] args) throws IOException, InterruptedException {
new Thread(new Runnable() {
@Override
public void run() {
try {
new BIOServer().initServer();
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
Thread.sleep(1000);
new Client().beginTest(2222);
}
}
三、NIO
1、概述
NIO(Non-block input output),非阻塞模式IO。NIO主要有三大核心部分:Channel(通道),Buffer(缓冲区), Selector(选择器)。
传统IO基于流进行操作的,而NIO基于Channel和Buffer进行操作,是面向缓存的,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择区)用于监听多个通道的事件(比如:连接打开,数据到达),当监听事件发生时,才去唤醒线程处理请求,而不必阻塞于外部的输入。
下面是BIO的处理流程,可以看到每个线程都阻塞在socket的读写,而不能去处理其他事情,白白浪费了CPU资源。

下面是NIO的处理流程,NIO区别于BIO的关键步骤在于有一个专门的Selector线程轮询socket发来的有效数据,然后将其交给相应的线程进行处理,这里少了一个等待客户端发生数据的时间。

不做深究,点到为止,这里给一篇链接,算是讲得挺好的。
2、示例代码
public class NIOServer {
private Selector selector;
public void initServer(int port) throws IOException{
// 打开ServerSocket通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(port));
// 获取一个选择器
this.selector = Selector.open();
// 将通道管理器与该通道进行绑定,并为该通道注册SelectionKey.OP_ACCEPT事件
// 注册事件后,当该事件触发时会使selector.select()返回,
// 否则selector.select()一直阻塞
serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
listen();
}
public void listen() throws IOException{
System.out.println("启动服务器!");
while (true) {
// select()方法一直阻塞直到有注册的通道准备好了才会返回
selector.select();
Iterator<?> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = (SelectionKey) iterator.next();
// 删除已选的key,防止重复处理
iterator.remove();
handler(key);
}
}
}
public void handler(SelectionKey key)throws IOException{
if (key.isAcceptable()) {
handlerAccept(key);
}else if (key.isReadable()){
handlerRead(key);
}else if (key.isWritable()){
System.out.println("can write!");
}else if (key.isConnectable()){
System.out.println("is connectable");
}
}
public void handlerAccept(SelectionKey key) throws IOException{
// 从SelectionKey中获取ServerSocketChannel
ServerSocketChannel server = (ServerSocketChannel) key.channel();
// 获取SocketChannel
SocketChannel socketChannel = server.accept();
// 设置成非阻塞
socketChannel.configureBlocking(false);
// 为socketChannel通道建立 OP_READ 读操作,使客户端发送的内容可以被读到
socketChannel.register(selector, SelectionKey.OP_READ);
}
public void handlerRead(SelectionKey key)throws IOException{
SocketChannel socketChannel = (SocketChannel) key.channel();
// 创建读取缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(512);
// 从通道读取可读取的字节数
try {
int readCount = socketChannel.read(byteBuffer);
if (readCount > 0) {
byte[] data = byteBuffer.array();
ByteBuffer outBuffer = ByteBuffer.wrap(data);
socketChannel.write(outBuffer);
socketChannel.close();
} else {
System.out.println("客户端异常退出");
}
} catch (IOException e) {
key.cancel();
}
}
}
public class NIO {
public static void main(String[] args) throws IOException, InterruptedException {
new Thread(new Runnable() {
@Override
public void run() {
try {
new NIOServer().initServer(4444);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
Thread.sleep(1000);
new Client().beginTest(4444);
}
}
四、AIO
AIO(Asynchronous Input Output),异步非阻塞IO模式。AIO的核心机制类似Future(CompletableFuture)机制,不同的是方法回调是由操作系统把控的,当操作系统完成读/写IO操作时,会回调Java线程。
示例代码如下
public class AIOServer {
private static final ExecutorService executorService = Executors.newFixedThreadPool(200);
public void init() throws Exception {
AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(executorService);
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group);
server.bind(new InetSocketAddress(3333));
server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override
public void completed(AsynchronousSocketChannel client, Object attachment) {
server.accept(null, this);
// try {
// System.out.println(Thread.currentThread().getName() + ":服务器与客户端" + client.getRemoteAddress() + "建立连接");
// } catch (IOException e) {
// e.printStackTrace();
// }
ByteBuffer buffer = ByteBuffer.allocate("hello".getBytes().length);
client.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer index, ByteBuffer buffer) {
try {
buffer.flip();
client.write(buffer).get();//这个是异步的,一定要用get 确保执行结束 才能clear
client.close();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.out.println(exc.getMessage());
}
});
}
@Override
public void failed(Throwable exc, Object attachment) {
throw new RuntimeException(exc.getMessage());
}
});
}
}
public class AIO {
public static void main(String[] args) throws Exception {
new Thread(new Runnable() {
@Override
public void run() {
try {
new AIOServer().init();
} catch (Exception e) {
e.printStackTrace();
return;
}
}
}, "服务端线程").start();
Thread.sleep(1000);
if (args.length != 0) {
new Client(Integer.parseInt(args[0])).beginTest(3333);
} else {
new Client().beginTest(3333);
}
}
}
五、性能比较
1、bash脚本测试
| 10线程 | 50线程 | 100线程 | 500线程 | 1000线程 | |
|---|---|---|---|---|---|
| BIO | 10 | 23、19、26、25、14、20、16、20、23、27 | 21.3ms | ||
| NIO | 10 | 16、20、25、21、25、21、19、22、21、15 | 20.5ms | ||
| AIO | 10 | ||||
| BIO | 100 | ||||
| NIO | 100 | ||||
| AIO | 100 | ||||
| BIO | 500 | ||||
| NIO | 500 | ||||
| AIO | 500 | ||||
| BIO | 1000 | ||||
| NIO | 1000 | ||||
| AIO | 1000 | ||||
| BIO | 2000 | ||||
| NIO | 2000 | ||||
| AIO | 2000 | ||||
| BIO | 3000 | ||||
| NIO | 3000 | ||||
| AIO | 3000 | ||||
| BIO | 5000 | ||||
| NIO | 5000 | ||||
| AIO | 5000 |
一般般帅