Unix 网络编程提供了五种 I/O 模型,分别是:
- 阻塞 I/O
- 非阻塞 I/O
- I/O 多路复用
- 信号驱动 I/O
- 异步 I/O
其中后两者的区别在于信号驱动 I/O 是内核通知用户进程何时开始进行 I/O 操作,异步 I/O 是内核通知用户进程何时 I/O 操作已经完成。本文将介绍 Java 在此基础上实现的网络 I/O 模型。
BIO 同步阻塞 I/O
BIO 模型的服务器会采用一个 Acceptor 来负责监听客户端的连接,收到客户端的连接请求后会创建一个新的 handler 线程负责处理之后的读写操作,之后 handler 线程将相应返回给客户端。
JDK 自带的Socket
和ServerSocket
都只能进行同步阻塞 I/O。
这种模型最明显的弊端在于请求与线程的比例为 1:1,在请求量很高的情况下,系统会创建大量线程以相应客户端请求,可能导致线程堆栈逸出、创建线程失败等问题,最终发生宕机。
伪异步 I/O
伪异步 I/O 在 BIO 的基础上进行了优化,它会提前创建线程池,Acceptor 接收到客户端请求后会将请求封装为一个 Runable 的对象交给线程池处理,如果请求数量超过了线程池活跃线程的数量,超过部分的请求会被放在线程池的任务队列里,待线程处理完之前的请求后消费任务队列里的请求。请求数量可以远大于线程数。
这种模型虽然解决了 BIO 一请求一线程的弊端,但由于其线程处理的过程仍然是同步的,一但处理请求的过程中被阻塞,阻塞的这段时间线程是无法响应其他请求的。一但线程池里的线程都被阻塞,整个线程池的请求会越堆越多,最终超过队列大小,服务器拒绝请求引发连锁反应。
Rector 模式
在介绍剩下的 Java I/O 模型之前,先来了解一下 Reactor 模式的一些概念。Reactor 模式又称作 Dispatcher 模式,采用 I/O 多路复用技术来统一监听事件,收到相应的事件之后分配给某个对象去处理。
Reactor 模式下有下面三种角色:
- Reactor:监听事件并将对应时间分配给相应对象(如果是新连接会交给 Acceptor,如果是其他事件会交给 Acceptor 中创建的 Handler)
- Acceptor:处理新连接,创建新的 Handler 去处理连接之后的动作
- Handler:完成 read -> 业务处理 -> write 的业务流程。
根据 Reactor 和服务线程数量不同,Reactor 模式 可以分为下面几部分。
单 Reactor 单线程
这个模式下的处理流程如下:
- Reactor 通过 select/epoll 监控事件,收到事件后进行分发(一般会实现
dispatch()
方法) - 如果是建立连接会交给 Acceptor 处理,Acceptor 会进行
accept()
,并创建 Handler 处理之后的事件 - 如果不是建立连接,则会直接调用对应的 Handler 处理,完成
read()
-> 业务处理 ->write()
的操作
整个服务只存在一个线程,即支持 Reactor 进行事件调度,又支持 Handler 进行业务处理。
这个模式的缺点很明显,在 Hanlder 进行业务处理的过程中,整个服务器无法处理其他已连接的事件,很容易造成性能瓶颈。因此,这种模式只适用于业务处理很快的场景,如 redis。
单 Reactor 多线程
相对于单线程,多线程增加了线程池,它会将单线程中 Handler 里的业务处理的职能交给线程池里的 Processor 线程来做,Processor 会将业务处理的结果返回给 Handler,Handler 只进行 read 和 write。
这样,业务处理就不会阻塞服务响应其他请求了。但这种模式还有一个缺点,Reactor 承担所有时间的监听和响应,在高并发时容易成为性能瓶颈。
多 Reactor 多线程
相对于单 Reactor,多 Reactor 模式增加了多个(一般与CPU核数相等)的 SubReactor,带来的区别有:
- MainReactor 只负责监听连接事件
- Acceptor 在
accept()
之后会将 socket 注册到一个 SubReactor 的 select/epoll 上 - SubReactor 继续监听已连接 socket 的其他事件,并交给 Handler 处理
- Handler 依然可以用线程池来进行业务处理。
这样一来,事件监听的任务被分散到多个线程的 Reactor 上,带来很大的性能提升。netty 就是这种模式。
NIO 非阻塞 I/O
NIO 是同步非阻塞 I/O,在 JDK 1.4 中引入了 NIO 类库,引入了一些新的类:
- Buffer:缓冲区,所有的读写操作都是在 Buffer 上进行的,提供了数据的结构化访问和读写位置的维护(limit)
- Channel:全双工读写流,包括用于网络读写的
SelectableChannel
和用于文件操作的FileChannel
,常见的SocketChannel
和ServerSocketChannel
都是SelectableChannel
的子类 - Selector:会不断轮训注册在其上的 Channel,一但 Channel 上有了新的事件,Selector 会把它轮训出来。通过 SelectionKey 可以获得就绪的 Channel 集合。底层用 epoll 实现。
下面是用 NIO 实现的简单的 TCPServer,可以发现 NIO 相当于简单的单 Reactor 单线程模式。
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;
public class SimpleNIOServer implements Runnable{
private Selector selector;
private ServerSocketChannel channel;
private volatile Boolean stop = false;
public SimpleNIOServer() {
try {
selector = Selector.open();
channel = ServerSocketChannel.open();
channel.configureBlocking(false);
channel.socket().bind(new InetSocketAddress(5000), 1024);
channel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("Server start ...");
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
while(!stop) {
try {
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
try {
// dispatch
dispatch(key);
iterator.remove();
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null) key.channel().close();
}
e.printStackTrace();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
// 关闭 selector 之后其余注册在其上的组件将会自动解除注册并关闭
if (selector != null) {
try {
selector.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void dispatch(SelectionKey key) throws IOException {
if (key.isValid()) {
// accept
if (key.isAcceptable()) {
ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
SocketChannel socketChannel = ssc.accept();
System.out.println("Connected With " + socketChannel.getRemoteAddress().toString());
socketChannel.configureBlocking(false);
// 相当于创建 Handler
socketChannel.register(selector, SelectionKey.OP_READ);
}
// read
if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
// 由于设置了非阻塞,read 调用会立即返回
int readBytes = socketChannel.read(readBuffer);
if (readBytes>0) {
// 将 position 设置为初始位置,将 limit 设置为 position
readBuffer.flip();
// 获取 position 和 limit 之间的字节
byte[] bytes = new byte[readBuffer.remaining()];
String body = new String(bytes, StandardCharsets.UTF_8);
System.out.println("Server received: " + body);
response(socketChannel);
}
else if (readBytes < 0) {
key.cancel();
socketChannel.close();
}
else {
;
}
}
}
}
private void response(SocketChannel channel) throws IOException {
byte[] bytes = "Hello World".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer);
}
public static void main(String[] args) {
SimpleNIOServer server = new SimpleNIOServer();
new Thread(server).start();
}
}
AIO 异步 I/O
AIO 是真正意义上的异步 I/O,它采用回调机制,在进行异步 I/O 操作的时候传入一个 CompletionHandler 对象用于回调, 由 JDK 底层的线程池负责回调并驱动读写操作。
由 AIO 实现的 TCPServer:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
public class SimpleAIOServer implements Runnable{
private AsynchronousServerSocketChannel serverSocketChannel;
public SimpleAIOServer() {
try {
serverSocketChannel = AsynchronousServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(5000), 1024);
System.out.println("Start server ...");
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
doAccept();
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void doAccept() {
serverSocketChannel.accept(this, new CompletionHandler<AsynchronousSocketChannel, SimpleAIOServer>() {
@Override
public void completed(AsynchronousSocketChannel result, SimpleAIOServer attachment) {
attachment.serverSocketChannel.accept(attachment, this);
System.out.println("Connected ...");
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
result.read(readBuffer, readBuffer, new ReadCompletionHandler(result));
}
@Override
public void failed(Throwable exc, SimpleAIOServer attachment) {
}
});
}
public static void main(String[] args) {
SimpleAIOServer server = new SimpleAIOServer();
server.run();
}
}
class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel channel;
public ReadCompletionHandler(AsynchronousSocketChannel channel) {
this.channel = channel;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
byte[] body = new byte[attachment.remaining()];
attachment.get(body);
try {
System.out.println("Received: " + new String(body, StandardCharsets.UTF_8));
doWrite();
}
catch (Exception e) {
e.printStackTrace();
}
}
private void doWrite() {
byte[] res = "Hello World".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(res.length);
writeBuffer.put(res);
writeBuffer.flip();
channel.write(writeBuffer);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
}
}
Netty
Netty 是标准的多 Reactor 多线程模型,封装了一些底层的细节,下面是用 Netty 实现的简单的 TCPServer,没有用线程池来处理业务请求。
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class SimpleNettyServer {
public void bind() {
// 两个线程组,分别处理 accept 和 read/write,对应 reactor 里的 MainReactor 和 SubReactor
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap boot = new ServerBootstrap();
boot.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
// childHandler I/O事件的处理类,对应 reactor 中的 handler
.childHandler(new ChildHandler());
try {
ChannelFuture future = boot.bind(5000).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
SimpleNettyServer simpleNettyServer = new SimpleNettyServer();
simpleNettyServer.bind();
}
}
// 网络事件处理类
class ChildHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ServerHandler());
}
}
// 对网络事件的读写类
class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// netty 的 ByteBuf 对象
ByteBuf buf = (ByteBuf) msg;
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
System.out.println("Received: " + new String(bytes));
ByteBuf resp = Unpooled.copiedBuffer("Hello World!".getBytes());
ctx.write(resp);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// netty 的 write 并没有直接写入 SocketChannel,而是放在缓冲数组里
// 将消息发送队列里的消息写入到 SocketChannel 里发送给对方
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
}
(End)
参考资料
- https://juejin.im/post/5b4570cce51d451984695a9b
- https://segmentfault.com/a/1190000016072310