awakeBird Back-end Dev Engineer

Java 网络 I/O 模型

2019-02-28

Unix 网络编程提供了五种 I/O 模型,分别是:

  • 阻塞 I/O
  • 非阻塞 I/O
  • I/O 多路复用
  • 信号驱动 I/O
  • 异步 I/O

其中后两者的区别在于信号驱动 I/O 是内核通知用户进程何时开始进行 I/O 操作,异步 I/O 是内核通知用户进程何时 I/O 操作已经完成。本文将介绍 Java 在此基础上实现的网络 I/O 模型。

BIO 同步阻塞 I/O

BIO 模型的服务器会采用一个 Acceptor 来负责监听客户端的连接,收到客户端的连接请求后会创建一个新的 handler 线程负责处理之后的读写操作,之后 handler 线程将相应返回给客户端。

JDK 自带的SocketServerSocket都只能进行同步阻塞 I/O。

这种模型最明显的弊端在于请求与线程的比例为 1:1,在请求量很高的情况下,系统会创建大量线程以相应客户端请求,可能导致线程堆栈逸出、创建线程失败等问题,最终发生宕机。

伪异步 I/O

伪异步 I/O 在 BIO 的基础上进行了优化,它会提前创建线程池,Acceptor 接收到客户端请求后会将请求封装为一个 Runable 的对象交给线程池处理,如果请求数量超过了线程池活跃线程的数量,超过部分的请求会被放在线程池的任务队列里,待线程处理完之前的请求后消费任务队列里的请求。请求数量可以远大于线程数。

这种模型虽然解决了 BIO 一请求一线程的弊端,但由于其线程处理的过程仍然是同步的,一但处理请求的过程中被阻塞,阻塞的这段时间线程是无法响应其他请求的。一但线程池里的线程都被阻塞,整个线程池的请求会越堆越多,最终超过队列大小,服务器拒绝请求引发连锁反应。

Rector 模式

在介绍剩下的 Java I/O 模型之前,先来了解一下 Reactor 模式的一些概念。Reactor 模式又称作 Dispatcher 模式,采用 I/O 多路复用技术来统一监听事件,收到相应的事件之后分配给某个对象去处理

Reactor 模式下有下面三种角色:

  • Reactor:监听事件并将对应时间分配给相应对象(如果是新连接会交给 Acceptor,如果是其他事件会交给 Acceptor 中创建的 Handler)
  • Acceptor:处理新连接,创建新的 Handler 去处理连接之后的动作
  • Handler:完成 read -> 业务处理 -> write 的业务流程。

根据 Reactor 和服务线程数量不同,Reactor 模式 可以分为下面几部分。

单 Reactor 单线程

这个模式下的处理流程如下:

  • Reactor 通过 select/epoll 监控事件,收到事件后进行分发(一般会实现dispatch()方法)
  • 如果是建立连接会交给 Acceptor 处理,Acceptor 会进行accept(),并创建 Handler 处理之后的事件
  • 如果不是建立连接,则会直接调用对应的 Handler 处理,完成read() -> 业务处理 -> write() 的操作

整个服务只存在一个线程,即支持 Reactor 进行事件调度,又支持 Handler 进行业务处理。

这个模式的缺点很明显,在 Hanlder 进行业务处理的过程中,整个服务器无法处理其他已连接的事件,很容易造成性能瓶颈。因此,这种模式只适用于业务处理很快的场景,如 redis。

单 Reactor 多线程

相对于单线程,多线程增加了线程池,它会将单线程中 Handler 里的业务处理的职能交给线程池里的 Processor 线程来做,Processor 会将业务处理的结果返回给 Handler,Handler 只进行 read 和 write。

这样,业务处理就不会阻塞服务响应其他请求了。但这种模式还有一个缺点,Reactor 承担所有时间的监听和响应,在高并发时容易成为性能瓶颈。

多 Reactor 多线程

相对于单 Reactor,多 Reactor 模式增加了多个(一般与CPU核数相等)的 SubReactor,带来的区别有:

  • MainReactor 只负责监听连接事件
  • Acceptor 在accept()之后会将 socket 注册到一个 SubReactor 的 select/epoll 上
  • SubReactor 继续监听已连接 socket 的其他事件,并交给 Handler 处理
  • Handler 依然可以用线程池来进行业务处理。

这样一来,事件监听的任务被分散到多个线程的 Reactor 上,带来很大的性能提升。netty 就是这种模式。

NIO 非阻塞 I/O

NIO 是同步非阻塞 I/O,在 JDK 1.4 中引入了 NIO 类库,引入了一些新的类:

  • Buffer:缓冲区,所有的读写操作都是在 Buffer 上进行的,提供了数据的结构化访问和读写位置的维护(limit)
  • Channel:全双工读写流,包括用于网络读写的SelectableChannel和用于文件操作的FileChannel,常见的SocketChannelServerSocketChannel都是SelectableChannel的子类
  • Selector:会不断轮训注册在其上的 Channel,一但 Channel 上有了新的事件,Selector 会把它轮训出来。通过 SelectionKey 可以获得就绪的 Channel 集合。底层用 epoll 实现。

下面是用 NIO 实现的简单的 TCPServer,可以发现 NIO 相当于简单的单 Reactor 单线程模式。

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;

public class SimpleNIOServer implements Runnable{

    private Selector selector;
    private ServerSocketChannel channel;
    private volatile Boolean stop = false;

    public SimpleNIOServer() {
        try {
            selector = Selector.open();
            channel = ServerSocketChannel.open();
            channel.configureBlocking(false);
            channel.socket().bind(new InetSocketAddress(5000), 1024);
            channel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("Server start ...");
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    @Override
    public void run() {
        while(!stop) {
            try {
                selector.select(1000);
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    try {
                        // dispatch
                        dispatch(key);
                        iterator.remove();
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null) key.channel().close();
                        }
                        e.printStackTrace();
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }

        }

        // 关闭 selector 之后其余注册在其上的组件将会自动解除注册并关闭
        if (selector != null) {
            try {
                selector.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private void dispatch(SelectionKey key) throws IOException {
        if (key.isValid()) {
            // accept
            if (key.isAcceptable()) {

                ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
                SocketChannel socketChannel = ssc.accept();
                System.out.println("Connected With " + socketChannel.getRemoteAddress().toString());
                socketChannel.configureBlocking(false);
                // 相当于创建 Handler
                socketChannel.register(selector, SelectionKey.OP_READ);
            }

            // read
            if (key.isReadable()) {
                SocketChannel socketChannel = (SocketChannel) key.channel();
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                // 由于设置了非阻塞,read 调用会立即返回
                int readBytes = socketChannel.read(readBuffer);
                if (readBytes>0) {
                    // 将 position 设置为初始位置,将 limit 设置为 position
                    readBuffer.flip();
                    // 获取 position 和 limit 之间的字节
                    byte[] bytes = new byte[readBuffer.remaining()];
                    String body = new String(bytes, StandardCharsets.UTF_8);
                    System.out.println("Server received: " + body);
                    response(socketChannel);
                }
                else if (readBytes < 0) {
                    key.cancel();
                    socketChannel.close();
                }
                else {
                    ;
                }

            }
        }
    }

    private void response(SocketChannel channel) throws IOException {
        byte[] bytes = "Hello World".getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
        writeBuffer.put(bytes);
        writeBuffer.flip();
        channel.write(writeBuffer);
    }

    public static void main(String[] args) {
        SimpleNIOServer server = new SimpleNIOServer();
        new Thread(server).start();
    }
}

AIO 异步 I/O

AIO 是真正意义上的异步 I/O,它采用回调机制,在进行异步 I/O 操作的时候传入一个 CompletionHandler 对象用于回调, 由 JDK 底层的线程池负责回调并驱动读写操作。

由 AIO 实现的 TCPServer:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;

public class SimpleAIOServer implements Runnable{

    private AsynchronousServerSocketChannel serverSocketChannel;

    public SimpleAIOServer() {
        try {
            serverSocketChannel = AsynchronousServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(5000), 1024);
            System.out.println("Start server ...");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        doAccept();

        try {
            Thread.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

    private void doAccept() {
        serverSocketChannel.accept(this, new CompletionHandler<AsynchronousSocketChannel, SimpleAIOServer>() {
            @Override
            public void completed(AsynchronousSocketChannel result, SimpleAIOServer attachment) {
                attachment.serverSocketChannel.accept(attachment, this);
                System.out.println("Connected ...");
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                result.read(readBuffer, readBuffer, new ReadCompletionHandler(result));
            }

            @Override
            public void failed(Throwable exc, SimpleAIOServer attachment) {

            }
        });
    }

    public static void main(String[] args) {
        SimpleAIOServer server = new SimpleAIOServer();
        server.run();
    }
}

class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {

    private AsynchronousSocketChannel channel;

    public ReadCompletionHandler(AsynchronousSocketChannel channel) {
        this.channel = channel;
    }

    @Override
    public void completed(Integer result, ByteBuffer attachment) {
        attachment.flip();
        byte[] body = new byte[attachment.remaining()];
        attachment.get(body);
        try {
            System.out.println("Received: " + new String(body, StandardCharsets.UTF_8));
            doWrite();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void doWrite() {
        byte[] res = "Hello World".getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(res.length);
        writeBuffer.put(res);
        writeBuffer.flip();
        channel.write(writeBuffer);
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {

    }
}

Netty

Netty 是标准的多 Reactor 多线程模型,封装了一些底层的细节,下面是用 Netty 实现的简单的 TCPServer,没有用线程池来处理业务请求。

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class SimpleNettyServer {

    public void bind() {

        // 两个线程组,分别处理 accept 和 read/write,对应 reactor 里的 MainReactor 和 SubReactor
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        ServerBootstrap boot = new ServerBootstrap();
        boot.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                // childHandler I/O事件的处理类,对应 reactor 中的 handler
                .childHandler(new ChildHandler());

        try {
            ChannelFuture future = boot.bind(5000).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }

    public static void main(String[] args) {
        SimpleNettyServer simpleNettyServer = new SimpleNettyServer();
        simpleNettyServer.bind();
    }
}

// 网络事件处理类
class ChildHandler extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new ServerHandler());
    }
}

// 对网络事件的读写类
class ServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // netty 的 ByteBuf 对象
        ByteBuf buf = (ByteBuf) msg;
        byte[] bytes = new byte[buf.readableBytes()];
        buf.readBytes(bytes);
        System.out.println("Received: " + new String(bytes));
        ByteBuf resp = Unpooled.copiedBuffer("Hello World!".getBytes());
        ctx.write(resp);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // netty 的 write 并没有直接写入 SocketChannel,而是放在缓冲数组里
        // 将消息发送队列里的消息写入到 SocketChannel 里发送给对方
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ctx.close();
    }
}

(End)

参考资料

  • https://juejin.im/post/5b4570cce51d451984695a9b
  • https://segmentfault.com/a/1190000016072310

Similar Posts

下一篇 Spring(二)AOP

Comments