NIO

3/22/2022 NIO

# NIO

# Buffer-缓冲区

在 Java NIO 中的通道(Channel)就相当于操作系统的内核空间(kernel space)的缓冲区,而缓冲区(Buffer)对应的相当于操作系统的用户空间(user space)中的用户缓冲区(user buffer)。

img

  1. 尽管通道用于读写数据,但是我们并不直接操作通道进行读写,而是通过缓冲区完成。缓冲区实际上是一个容器对象。发送给通道的所有对象都必须先放到缓冲区中,同时从通道中读取的任何数据都需要先放到缓冲区
  2. 缓冲区体现了NIO和IO的一个重要区别。在BIO中,读写可以直接操作流对象。简单地说,缓冲区是一个字节数组,也可以使用其他类型的数组。但是缓冲区又不仅仅是一个数组,它提供了对数据的结构化访问,而且还可以跟踪系统的读写进程。

# 非直接缓冲区

img

HeapByteBufer所创建的字节缓冲区就是在JVM堆中的,即JVM内部所维护的字节数组。

public static ByteBuffer allocate(int capacity) {
    if (capacity < 0)
        throw new IllegalArgumentException();
    return new HeapByteBuffer(capacity, capacity);
}

# 直接缓冲区

img

  1. DirectByteBuffer是直接操作操作系统本地代码创建的内存缓冲数组
  2. DirectByteBuffer的使用场景:
    • java程序与本地磁盘、socket传输数据
    • 大文件对象,可以使用。不会受到堆内存大小的限制。
    • 不需要频繁创建,生命周期较长的情况,能重复使用的情况。
public static ByteBuffer allocateDirect(int capacity) {
    return new DirectByteBuffer(capacity);
}

# 使用直接内存的原因

  • 对垃圾回收停顿的改善。因为full gc时,垃圾收集器会对所有分配的堆内内存进行扫描,垃圾收集对Java应用造成的影响,跟堆的大小是成正比的。过大的堆会影响Java应用的性能。如果使用堆外内存的话,堆外内存是直接受操作系统管理。这样做的结果就是能保持一个较小的JVM堆内存,以减少垃圾收集对应用的影响。特别是对于IO密集型的应用,会产生大量的IO,导致Java堆空间频繁GC(full gc时会触发堆外空闲内存的回收)

  • 减少了数据从JVM拷贝到native堆的次数,在某些场景下可以提升程序I/O的性能。

  • 可以突破JVM内存限制,操作更多的物理内存。

  • 当直接内存不足时会触发full gc,排查full gc的时候,一定要考虑。

# 使用直接内存的问题

  • 堆外内存难以控制,如果内存泄漏,那么很难排查(VisualVM可以通过安装插件来监控堆外内存)。

  • 堆外内存只能通过序列化和反序列化来存储,保存对象速度比堆内存慢,不适合存储很复杂的对象。一般简单的对象或者扁平化的比较适合。

  • 直接内存的访问速度(读写方面)会快于堆内存。在申请内存空间时,堆内存速度高于直接内存。

  • 直接内存适合申请次数少,访问频繁的场合。如果内存空间需要频繁申请,则不适合直接内存。

# 直接内存映射

  1. Linux提供的mmap系统调用, 它可以将一段用户空间内存映射到内核空间, 当映射成功后, 用户对这段内存区域的修改可以直接反映到内核空间;同样地, 内核空间对这段区域的修改也直接反映用户空间。正因为有这样的映射关系, 就不需要在用户态(User-space)与内核态(Kernel-space) 之间拷贝数据, 提高了数据传输的效率,这就是内存直接映射技术

  2. NIO中一个重要的类:MappedByteBuffer——java nio引入的文件内存映射方案,读写性能极高。MappedByteBuffer将文件直接映射到内存。可以映射整个文件,如果文件比较大的话可以考虑分段进行映射,只要指定文件的感兴趣部分就可以。

  3. 由于MappedByteBuffer申请的是直接内存,因此不受Minor GC控制,只能在发生Full GC时才能被回收,因此Java提供了DirectByteBuffer类来改善这一情况。它是MappedByteBuffer类的子类,同时它实现了DirectBuffer接口,维护一个Cleaner对象来完成内存回收。因此它既可以通过Full GC来回收内存,也可以调用clean()方法来进行回收。

# Channel-通道

在 Java NIO 中的通道(Channel)就相当于操作系统的内核空间(kernel space)的缓冲区,而缓冲区(Buffer)对应的相当于操作系统的用户空间(user space)中的用户缓冲区(user buffer)。

img

  1. 通道是对BIO中流的模拟,到任何目的地(或来自任何地方)的所有数据都必须通过一个通道对象。

  2. 通道与流的不同之处在于通道是双向的。流只是在一个方向上移动(要么用于读,要么用于写),而通道可以用于读、写或者同时用于读写。因为通道是双向的,所以它可以比流更好地反映底层操作系统的真实情况。

  3. 通道(Channel)是对 BIO 中的流的模拟,可以通过它读写数据。

  4. Channel,类似在 Linux 之类操作系统上看到的文件描述符,是 NIO 中被用来支持批量式 IO 操作的一种抽象。File 或者 Socket,通常被认为是比较高层次的抽象,而 Channel 则是更加操作系统底层的一种抽象,这也使得 NIO 得以充分利用现代操作系统底层机制,获得特定场景的性能优化,例如,DMA(Direct Memory Access)等。不同层次的抽象是相互关联的,我们可以通过 Socket 获取 Channel,反之亦然。

  5. 通道与流的不同之处在于:

    • 流是单向的 - 一个流只能单纯的负责读或写。
    • 通道是双向的 - 一个通道可以同时用于读写。
  6. 通道包括以下类型:

    • FileChannel:从文件中读写数据;
    • DatagramChannel:通过 UDP 读写网络中数据;
    • SocketChannel:通过 TCP 读写网络中数据;
    • ServerSocketChannel:可以监听新进来的 TCP 连接,对每一个新进来的连接都会创建一个 SocketChannel。

# 通道间数据传输-零拷贝思想

  1. transferTo()方法:将源通道的数据传输到目的通道。

  2. NIO的零拷贝由transferTo()方法实现。transferTo()方法将数据从FileChannel对象传送到可写的字节通道(如Socket Channel等)。在内部实现中,由native方法transferTo0()来实现,它依赖底层操作系统的支持。在UNIX和Linux系统中,调用这个方法将会引起sendfile()系统调用

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class test1 {
    public static void main(String[] args) throws Exception {
        // 获取输入通道
        File file = new File("1.txt");
        FileInputStream fileInputStream = new FileInputStream(file);
        FileChannel fileInputStreamChannelchannel = fileInputStream.getChannel();

        // 获取输出通道
        FileOutputStream fileOutputStream = new FileOutputStream(new File("2.txt"));
        FileChannel fileOutputStreamChannel = fileOutputStream.getChannel();

        // 获取缓冲
        ByteBuffer allocate = ByteBuffer.allocate((int) file.length());

        // 将输入流通道的数据读取到输出流的通道
        fileInputStreamChannelchannel.transferTo(0, allocate.limit(), fileOutputStreamChannel);
        // 关闭流
        fileInputStreamChannelchannel.close();
        fileOutputStreamChannel.close();
        fileInputStream.close();
        fileOutputStream.close();

    }
}

# 文件NIO实例

public static void fastCopy(String src, String dist) throws IOException {

    /* 获得源文件的输入字节流 */
    FileInputStream fin = new FileInputStream(src);

    /* 获取输入字节流的文件通道 */
    FileChannel fcin = fin.getChannel();

    /* 获取目标文件的输出字节流 */
    FileOutputStream fout = new FileOutputStream(dist);

    /* 获取输出字节流的通道 */
    FileChannel fcout = fout.getChannel();

    /* 为缓冲区分配 1024 个字节 */
    ByteBuffer buffer = ByteBuffer.allocateDirect(1024);

    while (true) {

        /* 从输入通道中读取数据到缓冲区中 */
        int r = fcin.read(buffer);

        /* read() 返回 -1 表示 EOF */
        if (r == -1) {
            break;
        }

        /* 切换读写 */
        buffer.flip();

        /* 把缓冲区的内容写入输出文件中 */
        fcout.write(buffer);
        
        /* 清空缓冲区 */
        buffer.clear();
    }
}

# Selector-多路复用器

  1. 选择器组件用于同时检测多个通道的事件以实现异步IO。可以把感兴趣的事件注册到Selector上,当事件发生时可以通过Selector获得事件发生的通道,并进行相关操作。

  2. 异步IO的一个优势在于,它允许同时根据大量的输入输出执行IO操作。同步IO一般要借助于轮询,或者创建许许多多的线程以处理大量的链接。使用异步IO你可以监听任意数量的通道事件,不必轮询,也不必启动额外的线程。

  3. 其实NIO的主要用途是网络IO,再NIO之前Java要使用网络编程就只有Socket。而Socket是阻塞的,显然对于高并发的场景是不适用的。所以NIO的出现就是解决这个痛点。主要思想是把Channel通道注册到Selector中,通过Selecotr去监听Channel中的事件状态,这样就不需要阻塞等待客户端的连接,从主动等待客户端的连接,变成 了通过事件驱动。没有监听的事件,服务器可以做自己的事情。

  4. NIO 实现了 IO 多路复用中的 Reactor 模型,一个线程 Thread 使用一个选择器 Selector 通过轮询的方式去监听多个通道 Channel 上的事件,从而让一个线程就可以处理多个事件。通过配置监听的通道 Channel 为非阻塞,那么当 Channel 上的 IO 事件还未到达时,就不会进入阻塞状态一直等待,而是继续轮询其它 Channel,找到 IO 事件已经到达的 Channel 执行

  5. 因为创建和切换线程的开销很大,因此使用一个线程来处理多个事件而不是一个线程处理一个事件具有更好的性能。

  6. 应该注意的是,只有套接字 Channel 才能配置为非阻塞,而 FileChannel 不能,为 FileChannel 配置非阻塞也没有意义

img

# NIO的多人聊天室

服务端代码

package Nio.Group;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

public class GroupChatServer {
    private Selector selector;

    private ServerSocketChannel serverSocketChannel;

    public static final int Port = 9999;
    public GroupChatServer() throws Exception {
        this.selector = Selector.open();
        this.serverSocketChannel = ServerSocketChannel.open();
        // 绑定端口
        this.serverSocketChannel.bind(new InetSocketAddress("localhost", Port));
        // 设置为非阻塞
        serverSocketChannel.configureBlocking(false);
        // 把通道注册到选择器中
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    public void listen() throws Exception {
        while (true) {
            // 获取事件的总数
            int count = selector.select(2000);
            if (count > 0) {
                // 获取事件的集合
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                // 获取迭代器,感觉这个和分散读很像,呸,感觉就是分散读
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    // 一个单独的事件
                    SelectionKey key = iterator.next();
                    // 是否可以获取连接
                    if (key.isAcceptable()) {
                        SocketChannel socketChannel = serverSocketChannel.accept();
                        // 设置为非阻塞
                        socketChannel.configureBlocking(false);
                        // 注册到选择器之中
                        socketChannel.register(selector, SelectionKey.OP_READ);
                        System.out.println("连接成功");
                        System.out.println(socketChannel.getRemoteAddress() + "上线了~");
                    }

                    if (key.isReadable()) {
                        readData(key);
                    }
                    iterator.remove();
                }
            } else {
                System.out.println("等着上线........");
            }
        }
    }

    private void readData(SelectionKey selectionKey) {
        SocketChannel socketChannel = null;
        try {
            // 从selectionKey中获取channel
            socketChannel = (SocketChannel) selectionKey.channel();
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            int count = socketChannel.read(byteBuffer);
            if (count > 0) {
                String msg = new String(byteBuffer.array());
                System.out.println(socketChannel.getRemoteAddress() + "from 客户端: " + msg);
                notifyAllClient(msg, socketChannel);
            }
        } catch (IOException e) {
            try {
                System.out.println(socketChannel.getRemoteAddress() + "离线了......");
                // 取消注册
                selectionKey.cancel();
                // 关闭流
                socketChannel.close();
            } catch (IOException ioException) {
                ioException.printStackTrace();
            }
            e.printStackTrace();
        }
    }

    private void notifyAllClient(String msg, SocketChannel noNotifyChannel) throws IOException {
        System.out.println("服务器转发信息~");
        // 这里不能用selector.selectedKeys(), 这里是要给无论在线还是离线的人都发送消息。
        // 用这个就是只是给无论离线还是在线的人都发送
        for (SelectionKey selectionKey : selector.keys()) {
            Channel channel = selectionKey.channel();
            if (channel instanceof SocketChannel && channel != noNotifyChannel) {
                // 转成SocketChannel类型
                SocketChannel socketChannel = (SocketChannel) channel;
                // 创建一个缓冲区
                ByteBuffer wrap = ByteBuffer.wrap(msg.getBytes());
                // 将这个缓冲区的内容发送的所有的客户机
                socketChannel.write(wrap);
            }
        }

    }

    public static void main(String[] args) throws Exception {
        GroupChatServer groupChatServer = new GroupChatServer();
        // 启动服务器进行监听
        groupChatServer.listen();
    }
}

客户端代码:

package Nio.Group;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Scanner;

public class GroupChatClient {
    private Selector selector;

    private SocketChannel socketChannel;

    private String userName;

    public GroupChatClient() throws IOException {
        this.selector = Selector.open();
        socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9999));
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
        userName = socketChannel.getLocalAddress().toString().substring(1);
        System.out.println(userName + "is ok~");
    }
    // 发送消息
    private void sendMsg(String msg) throws IOException {
        msg = userName + "说:" + msg;
        socketChannel.write(ByteBuffer.wrap(msg.getBytes()));
    }

    private void readMsg() throws IOException {

        int count = selector.select();
        if (count > 0) {
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                if (selectionKey.isReadable()) {
                    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    socketChannel.read(byteBuffer);
                    System.out.println(new String(byteBuffer.array()));
                }
                iterator.remove();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        GroupChatClient chatClient = new GroupChatClient();
        new Thread(() -> {
            while (true) {
                try {
                    chatClient.readMsg();
                    Thread.sleep(3000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNextLine()) {
            String msg = scanner.nextLine();
            chatClient.sendMsg(msg);
        }
    }

}

# 套接字NIO实例

public class NIOServer {

    public static void main(String[] args) throws IOException {

        Selector selector = Selector.open();

        ServerSocketChannel ssChannel = ServerSocketChannel.open();
        ssChannel.configureBlocking(false);
        ssChannel.register(selector, SelectionKey.OP_ACCEPT);

        ServerSocket serverSocket = ssChannel.socket();
        InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8888);
        serverSocket.bind(address);

        while (true) {

            selector.select();
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = keys.iterator();

            while (keyIterator.hasNext()) {

                SelectionKey key = keyIterator.next();

                if (key.isAcceptable()) {

                    ServerSocketChannel ssChannel1 = (ServerSocketChannel) key.channel();

                    // 服务器会为每个新连接创建一个 SocketChannel
                    SocketChannel sChannel = ssChannel1.accept();
                    sChannel.configureBlocking(false);

                    // 这个新连接主要用于从客户端读取数据
                    sChannel.register(selector, SelectionKey.OP_READ);

                } else if (key.isReadable()) {

                    SocketChannel sChannel = (SocketChannel) key.channel();
                    System.out.println(readDataFromSocketChannel(sChannel));
                    sChannel.close();
                }

                keyIterator.remove();
            }
        }
    }

    private static String readDataFromSocketChannel(SocketChannel sChannel) throws IOException {

        ByteBuffer buffer = ByteBuffer.allocate(1024);
        StringBuilder data = new StringBuilder();

        while (true) {

            buffer.clear();
            int n = sChannel.read(buffer);
            if (n == -1) {
                break;
            }
            buffer.flip();
            int limit = buffer.limit();
            char[] dst = new char[limit];
            for (int i = 0; i < limit; i++) {
                dst[i] = (char) buffer.get(i);
            }
            data.append(dst);
            buffer.clear();
        }
        return data.toString();
    }
}
public class NIOClient {

    public static void main(String[] args) throws IOException {
        Socket socket = new Socket("127.0.0.1", 8888);
        OutputStream out = socket.getOutputStream();
        String s = "hello world";
        out.write(s.getBytes());
        out.close();
    }
}

# NIO和IO的主要区别

  1. Stream Oriented vs. Buffer Oriented

    • Java NIO和IO之间的第一个重要区别是IO是面向流的,其中NIO是面向缓冲区的。

    • 面向流的Java IO意味着您可以从流中一次读取一个或多个字节。你对读取的字节做什么取决于你。它们不会缓存在任何地方。此外,您无法在流中的数据中前后移动。如果需要在从流中读取的数据中前后移动,则需要先将其缓存在缓冲区中。

    • Java NIO的面向缓冲区的方法略有不同。数据被读入缓冲区,稍后处理该缓冲区。你可以根据需要在缓冲区中前后移动。这使你在处理过程中具有更大的灵活性。但是,你还需要检查缓冲区是否包含完整处理所需的所有数据。并且,你需要确保在将更多数据读入缓冲区时,不要覆盖尚未处理的缓冲区中的数据。

  2. Blocking vs. Non-blocking IO

    • Java IO的各种流都是blocking的。这意味着,当线程调用read()或write()时,该线程将被阻塞,直到有一些数据要读取,或者数据被完全写入,在此期间,该线程无法执行任何其他操作。

    • Java NIO的非阻塞模式允许线程请求从通道读取数据,并且只获取当前可用的内容,或者根本没有数据,如果当前没有数据可用。线程可以继续使用其他内容,而不是在数据可供读取之前保持阻塞状态。非阻塞写入也是如此,线程可以请求将某些数据写入通道,但不要等待它完全写入。然后线程可以继续并在同一时间做其他事情。

    • 线程在IO调用中没有阻塞时花费空闲时间,通常在此期间在其他通道上执行IO。也就是说,单个线程现在可以管理多个输入和输出通道。

# 参考

Java NIO - 零拷贝实现 | Java 全栈知识体系 (opens new window)

Last Updated: 3/28/2022, 9:29:49 PM