# 简介

BIO 是阻塞的,如果没有多线程, BIO 就需要一直占用 CPU ,而 NIO 则是非阻塞 IONIO 在获取连接或者请求时,即使没有取得连接和数据,也不会阻塞程序。NIO 的服务器实现模式为一个线程可以处理多个请求。

I/ONIO 最重要的区别是数据打包和传输的方式, I/O 以流的方式处理数据,而 NIO 以块的方式处理数据。

面向流的 I/O 一次处理一个字节数据:一个输入流产生一个字节数据,一个输出流消费一个字节数据。为流式数据创建过滤器非常容易,链接几个过滤器,以便每个过滤器只负责复杂处理机制的一部分。不利的一面是,面向流的 I/O 通常相当慢。

# 相关知识

# 通道

通道( Channel 接口)是对流的模拟,既可以用来进行读操作,又可以用来进行写操作。

通道与流的不同之处在于,流只能在一个方向上移动 (一个流必须是 InputStream 或者 OutputStream 的子类),而通道是双向的,可以用于读、写或者同时用于读写。

NIO 中常用的 Channel 有:

  • FileChannel: 从文件中读写数据;

  • DatagramChannel: 通过 UDP 读写网络中数据;

  • SocketChannel: 通过 TCP 读写网络中数据;

  • ServerSocketChannel: 可以监听新进来的 TCP 连接,对每一个新进来的连接都会创建一个 SocketChannel。

我们先看一下 Channel 内部代码

public interface Channel extends Closeable {
    // 通道是否处于开启状态
    public boolean isOpen();
    // 因为通道开启也需要关闭,所以实现了 Closeable 接口,所以这个方法懂的都懂
    public void close() throws IOException;
}

通道接口结构

最后整合为 ByteChannel 接口

public interface ByteChannel extends ReadableByteChannel, WritableByteChannel{
}

我们对比一下不使用通道和使用通道

// 不使用通道
public void fun1() {
    byte[] data = new byte[10];
    // 将 System.in 作为输入流
    InputStream in = System.in;
    while(true) {
        int len;
        while((len = in.read(data)) >= 0) {
            System.out.println("data = " + new String(data,0,1en));
        }
    }
}
// 使用通道
public void fun2() {
    ByteBuffer buffer = ByteBuffer.allocate(10);
    ReadableByteChannel readChannel = Channels.newChannel(Syetem.in);
    
    while(true) {
        // 将通道中的数据写到缓冲区,缓冲区最多一次装 10 个
        readChannel.read(buffer);
        
        buffer.flip();// 翻转缓冲区
        
        System.out.println(new String(buffer.array(),0,buffer.remaining()))
    }
}

# FileChannel

通道一个重要的特点就是能够双向传输数据。但是 FileInputStream 得到的通道只能输入, FileOutputStream 得到的通道只能输出。

FileInputStream in = new FileInputStream();
FileChannel channel = in.getChannel();

我们可以通过 RandomAccessFile 创建通道

public static void main(String[] args) throws IOException {
    /*
      通过 RandomAccessFile 进行创建,注意后面的 mode 有几种:
      r        以只读的方式使用
      rw   读操作和写操作都可以
      rws  每当进行写操作,同步的刷新到磁盘,刷新内容和元数据
      rwd  每当进行写操作,同步的刷新到磁盘,刷新内容
     */
    // 这里设定为支持读写,这样创建的通道才能具有这些功能
    try(RandomAccessFile f = new RandomAccessFile("test.txt", "rw");  
        FileChannel channel = f.getChannel()){   // 通过 RandomAccessFile 创建一个通道
        channel.write(ByteBuffer.wrap("伞兵二号马飞飞准备就绪!".getBytes()));
		
        // 注意读取也是从现在的位置开始
        System.out.println("写操作完成之后文件访问位置:"+channel.position());  
        channel.position(0);  // 需要将位置变回到最前面,这样下面才能从文件的最开始进行读取
        ByteBuffer buffer = ByteBuffer.allocate(128);
        channel.read(buffer);
        buffer.flip();
        System.out.println(new String(buffer.array(), 0, buffer.remaining()));
    }
}

除了基本的读写操作,也可以对文件进行截断: channel.truncate(20) 。就只会保留前 20 个字节

示例:文件拷贝

// 在缓冲区也有文件复制的案例,但是此处我们直接使用写好的方法
public void f() {
    try(FileInputStream in = new FileInputStream("test.txt");
       FileOutputStream out = new FileOutputStream("test.txt")) {
        
        FileChannel inChannel = in.getChannel();
        inChannel.transfer(0,inChannel.size(),out.getChannel());
        
        // 也可以反向操作
        out.getChannel().transfer(inChannel,0,inChannel.size());
    }
}

# 文件锁 FileLock

我们可以创建一个跨进程文件锁来防止多个进程之间的文件争抢操作(注意这里是进程,不是线程)FileLock 是文件锁,它能保证同一时间只有一个进程(程序)能够修改它,或者都只可以读,这样就解决了多进程间的同步文件,保证了安全性。但是需要注意的是,它进程级别的,不是线程级别的,他可以解决多个进程并发访问同一个文件的问题,但是它不适用于控制同一个进程中多个线程对一个文件的访问。

public void fun() {
    RandomAccessFile f = new RandomAccessFile("test.txt","rw");
    
    FileChannel Channel = f.getChannel();
    
    // 上锁:对 0~6 个字节上锁,false 表示独占锁,其他进程连读都不可以
    FileLock lock = channel.getLock(0,6,false);
    
    // 也可以:channel.getLock (0,channel.size (),false);
    
    // ...
    
    // 释放锁
    lock.release();
}

为了快速讲完这部分内容,笔者简化了一些细节

独占锁如果出现了交叉部分,也会被阻塞(人之常情嘛)。共享锁就是将参数设置为 true。还有一个 trylock 的方法,它会尝试去获得锁,如果失败就返回 null (这让我想起 AQS 里面的 tryAccquire 方法)。

# 缓冲区

发送给通道的所有数据都必须首先放到缓冲区中,从通道读取的任何数据都要先读到缓冲区。不会直接对通道进行读写数据,而是要先经过缓冲区。

缓冲区实质上是一个数组,但它不仅仅是一个数组。缓冲区提供了对数据的结构化访问,而且还可以跟踪系统的读 / 写进程。

缓冲区状态变量:

  • capacity : 最大容量;
  • position : 当前已经读写的字节数;
  • limit : 还可以读写的字节数。

常见的缓冲区有(都是 Buffer 抽象类的子类):

  • ByteBuffer
  • CharBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer

# 相关方法

# 获取对象

这里以 IntBuffer 为例,我们想要获得一个 IntBuffer 的实例化对象,需要调用 allocate 方法或者 wrap 方法

public static void main(String[] args) {
    // 1. 申请容量为 10 的 int 缓冲区
    IntBuffer buffer1 = IntBuffer.allocate(10);
    
    // 2. 将现有数组直接转换为缓冲区
    int[] arr = new int[]{1,2,3,4,5};
    IntBuffer buffer2 = IntBuffer,wrap(arr);
}
// 看一下内部两个方法的相关实现
public static IntBuffer allocate(int capacity) {
    if (capacity < 0)  
        throw new IllegalArgumentException();
    return new HeapIntBuffer(capacity, capacity);   
    // 这里也能说明 HeapIntBuffer 是 IntBuffer 子类
  	//HeapIntBuffer 是在堆内存中存放数据,本质上就数组,一会我们可以在深入看一下
}
public static IntBuffer wrap(int[] array) {
    return wrap(array, 0, array.length);   
}
public static IntBuffer wrap(int[] array, int offset, int length) {
    try {
      	// 创建了一个新的 HeapIntBuffer 对象,并且给了初始数组以及截取的起始位置和长度
        return new HeapIntBuffer(array, offset, length);
    } catch (IllegalArgumentException x) {
        throw new IndexOutOfBoundsException();
    }
}

这里涉及到一个新的类,我们先看一下总体的类结构

类结构

我们看一下两个类的构造方法

// IntBuffer 构造方法
final int[] hb;                  // 只有在堆缓冲区实现时才会使用
final int offset;
boolean isReadOnly;                 // 只有在堆缓冲区实现时才会使用
IntBuffer(int mark, int pos, int lim, int cap, int[] hb, int offset)
{
    super(mark, pos, lim, cap);  // 调用 Buffer 类的构造方法
    this.hb = hb;    //hb 就是真正我们要存放数据的数组,堆缓冲区底层其实就是这么一个数组
    this.offset = offset;   // 起始偏移位置
}
// HeapIntBuffer 构造方法
HeapIntBuffer(int[] buf, int off, int len) { 
    super(-1, off, off + len, buf.length, buf, 0);  
  	//mark 是标记,off 是当前起始下标位置,off+len 是最大下标位置,
    //buf.length 是底层维护的数组真正长度,buf 就是数组,最后一个 0 是起始偏移位置
}

# 写操作

Buffer 中所有的 put 函数。

public abstract IntBuffer put(int i);
// 在指定位置插入数据
public abstract IntBuffer put(int index, int i);
// 直接将 src 全部插入缓冲区,但是不能超过缓冲区大小
public final IntBuffer put(int[] src);
// 直接存放数组中的内容,同上,但是可以指定存放一段范围
public IntBuffer put(int[] src, int offset, int length);
// 直接存放另一个缓冲区中的内容
public IntBuffer put(IntBuffer src);

我们看做简单的 put 源码

public IntBuffer put(int x) {
    hb[ix(nextPutIndex())] = x;
    return this;
}
// 将 i 的值加上我们之前设定的 offset 偏移量值,但是默认是 0(非 0 的情况后面会介绍)
protected int ix(int i) {
    return i + offset;   
}
final int nextPutIndex() {
    int p = position;    // 获取 Buffer 类中的 position 位置(一开始也是 0)
    if (p >= limit)    // 位置肯定不能超过底层数组最大长度,否则越界
        throw new BufferOverflowException();
    position = p + 1;   // 获取之后会使得 Buffer 类中的 position+1
    return p;   // 返回当前的位置
}

如果将 offset 看为 0,那么整个函数就是在数组 hbposition 位置加入元素

示例:快速复制文件

public static void fastCopy(String src, String dist) throws IOException {
    /* 获得源文件的输入字节流 */
    FileInputStream fin = new FileInputStream(src);
    /* 获取输入字节流的文件通道 */
    FileChannel fcin = fin.getChannel();
    /* 获取目标文件的输出字节流 */
    FileOutputStream fout = new FileOutputStream(dist);
    /* 获取输出字节流的通道 */
    FileChannel fcout = fout.getChannel();
    /* 为缓冲区分配 1024 个字节 */
    ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
    while (true) {
        /* 从输入通道中读取数据到缓冲区中 */
        int r = fcin.read(buffer);
        /* read () 返回 -1 表示 EOF */
        if (r == -1) {
            break;
        }
        /* 切换读写 */
        buffer.flip();
        /* 把缓冲区的内容写入输出文件中 */
        fcout.write(buffer);
        
        /* 清空缓冲区 */
        buffer.clear();
    }
}

我们可以看一下整个 buffer 的状态变化,此处我们假设大小为 8 字节

①新建一个大小为 8 个字节的缓冲区,此时 position 为 0,而 limit = capacity = 8。capacity 变量不会改变,下面的讨论会忽略它。

buffer变化1

② 从输入通道中读取 5 个字节数据写入缓冲区中,此时 position 移动设置为 5,limit 保持不变。

buffer变化2

③ 在将缓冲区的数据写到输出通道之前,需要先调用 flip () 方法,这个方法将 limit 设置为当前 position,并将 position 设置为 0。

buffer变化3

④ 从缓冲区中取 4 个字节到输出缓冲中,此时 position 设为 4。

buffer变化4

⑤ 最后需要调用 clear () 方法来清空缓冲区,此时 position 和 limit 都被设置为最初位置。

buffer变化5

为什么缓冲区在写之后读需要使用 flip 函数呢?我们从图中看出 flip 之后会将 Position 置为 0。从读到写不需要翻转(如果是覆盖写的话)

public final Buffer flip() {
    // 修改 limit 值,当前写到哪里,下次读的最终位置就是这里
    limit = position;    
    position = 0;    //position 归零
    mark = -1;    // 标记还原为 - 1,但是现在我们还没用到
    return this;
}

我们通过 put 方法来深入理解一下 flip

public IntBuffer put(IntBuffer src) {
    if(src == this)
        throw new IllegalArgumentException();
    if(isReadOnly())
        throw new ReadOnlyBufferException();
    // 非常重要,在这里就是确定 src 的元素个数
    // 你现在是读 src,你应该将 src 转换为读状态
    int n = src.remaining();
    
    if(n > remaining())
		throw new BufferOverflowException();
    for(int i = 0;i < n;i++)
        put(src.get());
    return this;
}

remaining() 在读缓冲区时的结果期望是缓冲区数据个数,在写缓冲区时的结果期望是缓冲区的剩余容量。

为什么我要强调期望呢?因为如果不恰当使用 flip ,得到的结果就不是期望值。

public final int remaining() {  // 计算并获取当前缓冲区的剩余空间
    int rem = limit - position;   // 最大容量减去当前位置,就是剩余空间
    return rem > 0 ? rem : 0;  // 没容量就返回 0
}

如果是写缓冲区, limit-position 就是剩余容量;使用 flip 之后, limit-position 就是缓冲区包含的数据个数。所以, flip 的本质就是通过改变 limitposition 的值来改变二者之差的含义。・

# 读操作

为什么我们需要翻转操作,因为我们读写数据都是依托于 position 定位的。写入 / 读取( get() 方法)都会使 position 加一。如果超出范围抛出异常。

当然也可以用 get(i) 来指定读取第 i 个元素。

看一下有哪些重载的 get 方法

public abstract int get();
public abstract int get(int index);// 只有这个方法不会改变 position
public IntBuffer int get(int[] dst);// 实际是调用下面那个方法
public IntBuffer get(int[] dst,int offset,int length);

我们也可以使用 array() 方法直接返回 hb 数组。但是这直接将 hb 底层数组暴露在外面,所有修改都会生效在缓冲区。

public final int[] array() {
    if (hb == null)   // 为空那说明底层不是数组实现的,肯定就没法转换了
        throw new UnsupportedOperationException();
    if (isReadOnly)   // 只读也是不让直接取出的,因为一旦取出去岂不是就能被修改了
        throw new ReadOnlyBufferException();
    return hb;   // 直接返回 hb
}

最后,缓冲区还有一个 mark 操作,类似原理在缓冲流已经讲过了,此处不加赘述。当然缓冲区还有其他一些操作,这里笔者觉得不宜写太多,建议就是遇到了不懂就去浏览器搜。

只读缓冲区: HeapIntBufferRHeapIntBuffer 的子类,初始化为

IntBuffer buffer = IntBuffer.wrap(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 0});
IntBuffer readBuffer = buffer.asReadOnlyBuffer();
public IntBuffer asReadOnlyBuffer() {
    // 注意这里并不是直接创建了 HeapIntBuffer,而是 HeapIntBufferR,并且直接复制的 hb 数组
    return new HeapIntBufferR(hb,    
                                 this.markValue(),
                                 this.position(),
                                 this.limit(),
                                 this.capacity(),
                                 offset);
}

# 选择器

这个其实就是 Unix IO 模型讲到的多路复用, NIO 实现了 IO 多路复用中的 Reactor 模型,一个线程使用一个选择器 Selector 通过轮询的方式去监听多个通道 Channel 上的事件,从而让一个线程就可以处理多个事件。

通过配置监听的通道 Channel 为非阻塞,那么当 Channel 上的 IO 事件未到达时,就不会进入阻塞状态一直等待,而是继续轮询其它 Channel,找到 IO 事件已经到达的 Channel 执行。

只有套接字 Channel 才能配置非阻塞,而 FileChannel 不能(FileChannel 配置非阻塞也没有意义)。

选择器

# 创建选择器

Selector selector = Selector.open();

# 通道注册到选择器

ServerSocketChannel ssChannel = ServerSocketChannel.open();
ssChannel.configureBlocking(false);
ssChannel.register(selector,SelectionKey.OP_ACCEPT);

通道配置必须为非阻塞模式,最后一行代码需要指定注册的具体事件

// 在 SelectionKey 中定义
public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << 2;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;

# 监听事件

int num = selector.select();

使用 select() 来监听到达的事件,它会一直阻塞直到有至少一个事件到达。

# 获取到达的事件

Set<SelectionKey> leys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = keys.iterator();
while(keyIterator.hasNext()) {
    SelectionKey key = keyIterator.next();
    if(key.isAcceptable()) {
        //...
    } else if(key.isReadable()) {
        //...
    }
    keyIterator.remove();
}

# 事件循环

一次 select() 调用不能处理所有事件,并且服务器端有可能需要一直监听事件,因此服务器端处理事件的代码一般会放在一个死循环内。

while (true) {
    int num = selector.select();
    Set<SelectionKey> keys = selector.selectedKeys();
    Iterator<SelectionKey> keyIterator = keys.iterator();
    while (keyIterator.hasNext()) {
        SelectionKey key = keyIterator.next();
        if (key.isAcceptable()) {
            // ...
        } else if (key.isReadable()) {
            // ...
        }
        keyIterator.remove();
    }
}

# 套接字 NIO 实例

public class NIOServer {
    public static void main(String[] args) throws IOException {
        // 创建选择器
        Selector selector = Selector.open();
        // 将通道注册到选择器上(设置非阻塞)
        ServerSocketChannel ssChannel = ServerSocketChannel.open();
        ssChannel.configureBlocking(false);
        ssChannel.register(selector,SelectionKey.OP_ACCEPT);
        ServerSocket serverSocket = ssChannel.socket();
        InetSocketAddress address = new InetSocketAddress("127.0.0.1",8888);
        serverSocket.bind(address);
        while(true) {
            selector.select();
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = keys.iterator();
            while(keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                if(key.isAcceptable()) {
                    ServerSocketChannel ssChannel1 = (ServerSocketChannel) key.channel();
                    // 雾浮起为每个新连接创建一个 SocketChannel
                    SocketChannel sChannel = ssChannel1.accept();
                    sChannel.configureBlocking(false);
                    // 该连接主要用于从客户端读取数据
                    sChannel.register(selector,SelectionKey.OP_READ);
                } else if(key.isReadable()) {
                    SocketChannel sChannel = (SocketChannel) key.channel();
                    System.out.println(readDataFromSocketChannel(sChannel));
                    sChannel.close();
                }
                keyIterator.remove();
            }
        }
    }
    private static String readDataFromSocketChannel(SocketChannel sChannel) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        StringBuilder data = new StringBuilder();
        while(true) {
            buffer.clear();
            int n = sChannel.read(buffer);
            if(n == -1) break;
            buffer.flip();
            int limit = buffer.limit();
            char[] dst = new char[limit];
            for(int i = 0;i < limit;i++) {
                dst[i] = (char)buffer.get(i);
            }
            data.append(dst);
            buffer.clear();
        }
        return data.toString();
    }
}

客户端

public class NIOClient {
    public static void main(String[] args) throws IOException {
        Socket socket = new Socket("127.0.0.1",8888);
        OutputStream out = socket.getOutputStream();
        String s = "hello world";
        out.write(s.getBytes());
        out.close();
    }
}

# 参考文章

https://pdai.tech/md/java/io/java-io-nio.html

语雀(青空の霞光):https://www.yuque.com/qingkongxiaguang/javase/kl68ty#e4f52ebc

Java NIO Tutorial (opens new window)

Java NIO 浅析 (opens new window)

IBM: NIO 入门 (opens new window)

Eckel B, 埃克尔,昊鹏,等. Java 编程思想 [M]. 机械工业出版社,2002.

IBM: NIO 入门 (opens new window)

IBM: 深入分析 Java I/O 的工作机制 (opens new window)

IBM: 深入分析 Java 中的中文编码问题 (opens new window)

IBM: Java 序列化的高级认识 (opens new window)

NIO 与传统 IO 的区别 (opens new window)

Decorator Design Pattern (opens new window)

Socket Multicast

https://juejin.cn/post/6974191624331100174