# 简介

关于多路复用(windows 下使用 select)的简单介绍已经在 Unix IO 模型 NIO 基础详解中大致提到,并给出了代码实现。本文会对相关方法和模型进行详细讲解。

多路复用 IO 技术最适用的是 “高并发” 场景,所谓高并发是指 1 毫秒内至少同时有上千个连接请求准备好

try(ServerSocket server = new ServerSocket(8888)) {
    Socket s = server.accept();

    System.out.println("服务端收到连接");

    BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream()));

    System.out.println("服务端读取数据: " + reader.readLine());

    OutputStreamWriter write = new OutputStreamWriter(s.getOutputStream());

    write.write("服务端已收到数据");
    write.flush();
}
try(Socket s = new Socket("localhost",8888)) {
    OutputStreamWriter write = new OutputStreamWriter(s.getOutputStream());
	
    write.write("客户端发送数据:xyz\n");
    write.flush();
    System.out.println("客户端发送数据");
    BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream()));
    System.out.println("客户端收到数据: "+reader.readLine());
}

# 模型

# 传统 IO 模型

一个 Server 对接 N 个客户端,在客户端连接之后,为每个客户端都分配一个执行线程。

传统IO模型

从图中可以看出其特点:

  • 每个客户端连接到达之后,服务端会分配一个线程给该客户端,该线程会处理包括读取数据,解码,业务计算,编码,以及发送数据整个过程;
  • 同一时刻,服务端的吞吐量与服务器所提供的线程数量是呈线性关系的。

但是这也有很大问题:

  • 服务器并发量严重依赖于服务端能创建的线程数。

  • 服务端在获取客户端连接,读取数据,以及写入数据的过程都是阻塞类型的,在网络状况不好的情况下,这将极大的降低服务器每个线程的利用率,从而降低服务器吞吐量。

如果每一个客户端都维持一个与登陆服务器的连接。那么服务器将维护多个和客户端的连接以出来和客户端的 contnect 、read、write ,特别是对于长链接的服务,有多少个 c 端,就需要在 s 端维护同等的 IO 连接。这对服务器来说是一个很大的开销。

代码实现

// 服务端 -- 其实这里应该写死循环的
public class Server {
    public static void main(String[] args) throws IOException {
        try(ServerSocket server = new ServerSocket(8888)) {
            Socket s = server.accept();
            System.out.println("服务端收到连接");
            BufferedReader reader = new BufferedReader(
                new InputStreamReader(s.getInputStream()));
            System.out.println("服务端读取数据: " + reader.readLine());
            OutputStreamWriter write = new OutputStreamWriter(s.getOutputStream());
            // 两边接收数据都是 readLine,所以必须加换行符
            write.write("服务端已收到数据\n");
            write.flush();
        }
    }
}
// 客户端
public class NIOClient {
    public static void main(String[] args) throws IOException {
        try(Socket s = new Socket("localhost",8888)) {
            OutputStreamWriter write = new OutputStreamWriter(s.getOutputStream());
            // 两边接收数据都是 readLine,所以必须加换行符
            write.write("客户端发送数据:xyz\n");
            write.flush();
            System.out.println("客户端发送数据");
            BufferedReader reader = new BufferedReader(
                new InputStreamReader(s.getInputStream()));
            System.out.println("客户端收到数据: "+reader.readLine());
        }
    }
}

你也可以试着使用通道和缓冲区实现,参考链接

# I/O 多路复用

之前讲了 I/O 多路复用的模型介绍大概,这里我们给出源码实现

public static void main(String[] args) {
    try (ServerSocketChannel serverChannel = ServerSocketChannel.open();
         Selector selector = Selector.open()){   // 开启一个新的 Selector,这玩意也是要关闭释放资源的
        serverChannel.bind(new InetSocketAddress(8080));
        // 要使用选择器进行操作,必须使用非阻塞的方式,这样才不会像阻塞 IO 那样卡在 accept (),
        // 而是直接通过,让选择器去进行下一步操作
        serverChannel.configureBlocking(false);
        
        // 将 ServerChannel 注册到选择器上,此时只有选择器只会监听这一个通道
        // 因为是 ServerSocketChannel 这里我们就监听 accept 就可以了,等待客户端连接
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        
        while (true) {   // 无限循环等待新的用户网络操作
            // 每次选择都可能会选出多个已经就绪的网络操作,没有操作时会暂时阻塞
            int count = selector.select();
            System.out.println("监听到 "+count+" 个事件");
            // 如果是第一次循环,拿到的事件 SelectionKey 绑定的通道都是 ServerChannel
            
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                // 根据不同的事件类型,执行不同的操作即可
                
                // 该事件绑定的通道就是 ServerChannel
                if(key.isAcceptable()) {  // 如果 ServerSocketChannel 已经做好准备处理 Accept
                    
                    // 我们现在知道了有一个客户端需要连接(之后可能会发送数据给服务端)
                    // 所以我们将客户端的通道初始化
                    SocketChannel channel = serverChannel.accept();
                    System.out.println("客户端已连接,IP地址为:"+channel.getRemoteAddress());
                    // 现在连接就建立好了,接着我们需要将连接也注册选择器
                    channel.configureBlocking(false);
                    channel.register(selector, SelectionKey.OP_READ);
                    // 这样就在连接建立时完成了注册
                } else if(key.isReadable()) {    // 如果当前连接有可读的数据并且可以写,那么就开始处理
                    SocketChannel channel = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(128);
                    channel.read(buffer);
                    buffer.flip();
                    System.out.println("接收到客户端数据:"+new String(buffer.array(), 0, buffer.remaining()));
                    // 直接向通道中写入数据就行
                    channel.write(ByteBuffer.wrap("已收到!".getBytes()));
                    // 别关,说不定用户还要继续通信呢
                }
                // 处理完成后,一定记得移出迭代器,不然下次还有
                iterator.remove();
            }
        }
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

上面是关于服务器的代码,最主要的就是 SelectionKey ,它包含了监听事件注册时的通道,比如一开始我们将通道 ServerSocketChannel 注册到选择器,然后在 if 选择语句中拿到的 key 就包含 ServerSocket

SelectionKey 有一个 channel() 方法,使用时需要强转一下。因为第一个 if 已经有了 ServerSocketChannel 的引用,就没必要调用 cahnel()

之后的客户端代码就比较简单了

public static void main(String[] args) {
    // 创建一个新的 SocketChannel,通过通道进行通信
    try (SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", 8080));
         Scanner scanner = new Scanner(System.in)){
        System.out.println("已连接到服务端!");
        while (true) {   // 咱给它套个无限循环,这样就能一直发消息了
            System.out.println("请输入要发送给服务端的内容:");
            String text = scanner.nextLine();
            // 直接向通道中写入数据,真舒服
            channel.write(ByteBuffer.wrap(text.getBytes()));
            System.out.println("已发送!");
            ByteBuffer buffer = ByteBuffer.allocate(128);
            channel.read(buffer);   // 直接从通道中读取数据
            buffer.flip();
            System.out.println("收到服务器返回:"+new String(buffer.array(), 0, buffer.remaining()));
        }
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

请读者将这部分代码弄清楚,不清楚就去查,下面给大家几篇写的比较好的博客

田守枝 Java 技术博客(讲解细致,建议收藏):http://www.tianshouzhi.com/api/tutorials/netty/318

并发编程网(很短,推荐看一下):http://ifeve.com/server-socket-channel/

CSDN(这篇博客我只看了前面几句话,提到了我忽略的点):https://blog.csdn.net/cold___play/article/details/106663776

但是这个也存在问题:比如一个线程忙不过来(真实),所以之后的 Reactor 模型会使用到线程池。

# 单线程 Reactor 模型

Reactor 模型主要是对服务器进行优化,首先我们抽象出两个组件

  • Reactor 线程:负责响应 IO 事件,并分发到 Handler 处理器。新的事件包含连接建立就绪,读就绪,写就绪等。
  • Handler 处理器:执行非阻塞的操作(因为 Selector 只有监听到了真正要执行的事件,才会将该事件交给 Handler 执行,所以不存在阻塞操作)。

图片源自青空の霞光

JDK1.4 提供了一套非阻塞 IOAPI ,本质是以事件驱动来处理网络事件的, Reactor 是基于该 API 提出的一套 IO 模型。

该模型主要分为四个部分:客户端连接,Reactor,Acceptor,Handler。

Reactor 模型是以事件进行驱动的(应该已经很了解事件驱动了吧),其能够将接收客户端连接,+ 网络读和网络写,以及业务计算进行拆分,从而极大的提升处理效率;Reactor 模型是异步非阻塞模型,工作线程在没有网络事件时可以处理其他的任务,而不用像传统 IO 那样必须阻塞等待。

Acceptor 处理客户端新连接,并分派请求到处理器链( Reactor

下图是比较正经(难懂)的图。

Reactor模型

我们先看一下 Handler 代码

// 执行非阻塞操作,请记住,read 在这里也是非阻塞
public class Handler implements Runnable{
    private final SocketChannel channel;
    public Handler(SocketChannel channel) {
        this.channel = channel;
    }
    @Override
    public void run() {
        try {
            ByteBuffer buffer = ByteBuffer.allocate(128);
            channel.read(buffer);
            buffer.flip();
            System.out.println("接收到客户端数据:"+
                               new String(buffer.array(), 0, buffer.remaining()));
            channel.write(ByteBuffer.wrap("已收到!".getBytes()));
        }catch (IOException e){
            e.printStackTrace();
        }
    }
}

接着是 Acceptor ,非常快就能看完。

public class Acceptor implements Runnable {
    private final ServerSocketChannel serverChannel;
    private final Selector selector;
    
    public Acceptor(ServerSocketChannel serverChannel,Selector selector) {
        this.serverChannel = serverChannel;
        this.selector = selector;
    }
    
    @Override
    public void run() {
        try {
            SocketChannel channel = serverChannel.accept();
            System.out.println("客户端已连接,IP地址为:" + 
                              channel.getRemoteAddress());
            channel.configureBlocking(false);
            
            // 注册时,创建好对应的 Handler,这样在 Reactor 中分发的时候就可以直接调用 Handler
            channel.register(selector,Selector.OP_READ,new Handler(channel));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

最后看一下 Reactor 的实现,这里的 dispatch 使用了 attachment 方法

选择键支持将单个任意对象附加到某个键的操作。可通过 attach() 方法附加对象,然后通过 attachment() 方法获取该对象。我们在 Acceptor 和 Handler 都没有使用 attach 方法,而是在注册时绑定 SelectionKey key=channel.register(selector,SelectionKey.OP_READ,theObject);

public class Reactor implements Closeable,Runnable {
    
    private final ServerSocketChannel serverChannel;
    private final Selector selector;
    
    public Reactor() throws IOException {
        serverChannel = ServerSocketChannel.open();
        selector = Selector.open();
    }
    
    @Override
    Public void run() {
        try {
            serverChannel.bind(new InetSocketAddress(8888));
            serverChannel.configureBlocking(false);
            
            // 从这里可以看到,Reactor 并不处理连接,而是将需要连接的
            // 事件交给 acceptor 处理
            serverChannel.register(selector,SelectionKey.OP_ACCEP,
                                  new Acceptor(serverChannel,selector));
            
            while(true) {
                int count = selector.select();
                System.out.println("监听到 " + count + " 个事件");
                Set<SelectionKey> set = selector.selectionKeys();
                Iterator<SelectionKey> iterator = set.iterator();
                
                while(iterator.hasnext()) {
                    // 通过 dispatch 方法分发出去
                    this.dispatch(iterator.next());
                    iterator.remove();
                }
            }
        }
    }
    
    private void dispatch(selectionKey key) {
        Object att  = key.attachment();
        if(att instanceof Runnable) {
            // Handler 和 Acceptor 都实现自 Runnable 接口,这里就统一调用一下
            ((Runnable) att).run();
        }
    }
    
    @Override
    public void close() throws IOException {
        serverChannel.close();
        selector.close();
    }
     
}

需要注意的是,这是单线程模式,也就是说,Reactor 和 Hander 都是处于一条线程执行。由于是单线程,只要有一个 handler 出现阻塞,其他的 Client 都会被阻塞,并且不能充分利用多核资源。所以单线程模型仅仅适用于 handler 中业务处理组件能快速完成的场景。

所以整个流程图也可以这么画(其实这个图画的不准确, client 应该指向 Reactor ,而且此处也省略了 dispatch )。

Reactor单线程模型

# 多线程 Reactor 模型

先看一下青空の霞光给的图

相关改进为:

  • Handler 处理器的执行放入线程池,多线程进行业务处理。此时 handler 处理的是非阻塞任务。所以这个模型又叫做:业务处理与 IO 分离
  • 对于 Reactor 而言,仍然是单线程。

Reactor多线程模型

特点:

  • 有一个专门的 NIO 线程用于监听服务器,接收客户端的 TCP 连接请求以及网络读写事件的处理(可以看到 readsend 仍然是 Reactor 在处理)。

  • 接收到连接之后,将该链接交给线程池,这些 NIO 线程负责消息的读取,解码,编码和发送。

在绝大多数场景下,Reactor 多线程模型可以满足性能需求,但是仅仅使用一个 NIO 线程负责监听和处理所有客户端连接同时还要负责处理网络读写可能会存在性能问题,比如百万客户端并发连接,或则服务端需要多客户端的握手信息进行安全认证,认证本身非常消耗性能

代码:我们只需要修改一下 Handler 即可

public class Handler implements Runnable{
	// 把线程池给安排了,10 个线程
    // 真正项目开发,线程池一般是自定义的。
    private static final ExecutorService POOL = Executors.newFixedThreadPool(10);
    private final SocketChannel channel;
    public Handler(SocketChannel channel) {
        this.channel = channel;
    }
    @Override
    public void run() {
        try {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            channel.read(buffer);
            buffer.flip();
            POOL.submit(() -> {
                try {
                    System.out.println("接收到客户端数据:"+new String(buffer.array(), 0, buffer.remaining()));
                    channel.write(ByteBuffer.wrap("已收到!".getBytes()));
                }catch (IOException e){
                    e.printStackTrace();
                }
            });
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

# 主从 Reactor 多线程模型

先看一下简化图

之前的模型既要处理客户端连接,又要处理网络读写,但是网络读写在高并发情况下会成为系统的一个瓶颈。所以将 Reactor 拆分为 mainReactorsubReactor 。即使用线程池进行网络读写,只是用一个线程专门接收客户端连接。

  • mainReactor 主要进行客户端连接的处理,处理完成之后将该链接交给 subReactor 处理客户端的网络读写。
  • subReactor 使用一个线程池来支撑,而其他业务操作也是用一个线程池。

主从Reactor模型

通过这种方式,服务器的性能将会大大提升,在可见情况下,其基本上可以支持百万连接。如果想看具体的函数调用,可以看这篇:深入理解主从 Reactor 多线程模型

这次改动比较多,我们首先设计一下从 Reactor

//SubReactor 作为从 Reactor
public class SubReactor implements Runnable, Closeable {
	// 每个从 Reactor 也有一个 Selector
    private final Selector selector;
	
  	// 创建一个 4 线程的线程池,也就是四个从 Reactor 工作
    private static final ExecutorService POOL = Executors.newFixedThreadPool(4);
    private static final SubReactor[] reactors = new SubReactor[4];
    private static int selectedIndex = 0;  // 采用轮询机制,每接受一个新的连接,就轮询分配给四个从 Reactor
    static {   // 在一开始的时候就让 4 个从 Reactor 跑起来
        for (int i = 0; i < 4; i++) {
            try {
                reactors[i] = new SubReactor();
                // 一开就将从 Reactor 放到线程池中运行
                POOL.submit(reactors[i]);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
	
    // 轮询获取下一个 Selector(Acceptor 用)
    public static Selector nextSelector(){
        Selector selector = reactors[selectedIndex].selector;
        selectedIndex = (selectedIndex + 1) % 4;
        return selector;
    }
    private SubReactor() throws IOException {
        selector = Selector.open();
    }
    @Override
    public void run() {
        try {   // 启动后直接等待 selector 监听到对应的事件即可,其他的操作逻辑和 Reactor 一致
            while (true) {
                int count = selector.select();
                System.out.println(Thread.currentThread().getName()+" >> 监听到 "+count+" 个事件");
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    this.dispatch(iterator.next());
                    iterator.remove();
                }
            }
        }catch (IOException e) {
            e.printStackTrace();
        }
    }
    private void dispatch(SelectionKey key){
        Object att = key.attachment();
        if(att instanceof Runnable) {
            ((Runnable) att).run();
        }
    }
    @Override
    public void close() throws IOException {
        selector.close();
    }
}

我们设计的 SubReactor 没有 ServerSocketChannel ,而是每个从 Reactor 都有一个 Selector ,在主 Reactor 中通过 nextSelector 静态方法轮流获得不同从 ReactorSelector

最后修改一下 Acceptor

public class Acceptor implements Runnable{
    private final ServerSocketChannel serverChannel;   // 只需要一个 ServerSocketChannel 就行了
    public Acceptor(ServerSocketChannel serverChannel) {
        this.serverChannel = serverChannel;
    }
    @Override
    public void run() {
        try{
            SocketChannel channel = serverChannel.accept();   // 还是正常进行 Accept 操作,得到 SocketChannel
            System.out.println(Thread.currentThread().getName()+" >> 客户端已连接,IP地址为:"+channel.getRemoteAddress());
            channel.configureBlocking(false);
            Selector selector = SubReactor.nextSelector();   // 选取下一个从 Reactor 的 Selector
            selector.wakeup();    // 在注册之前唤醒一下防止卡死
            channel.register(selector, SelectionKey.OP_READ, new Handler(channel));  // 注意现在注册的是从 Reactor 的 Selector
        }catch (IOException e){
            e.printStackTrace();
        }
    }
}

需要注意,不同的人设计在代码实现方面可能不同,重点是 Reactor 的思想,主 Reactor 处理连接(也就是调用 Acceptor )将 SocketChannel 注册到选择器上(因为之后就要进行读写操作),此时这个连接之后的读写请求都会发送给从 Reactor 的选择器,这也就规避了主 Reactor 处理读写请求。但是最终去完成这个请求的还是 Handler

# 参考

理解 Reactor 模型(这篇文章关于多线程 Reactor 的图有些不准确)

https://pdai.tech/md/java/io/java-io-nio-select-epoll.html

https://blog.csdn.net/Jack__iT/article/details/107010486

青空の霞光:https://www.yuque.com/qingkongxiaguang/javase/kl68ty