java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java 网络IO模型

通过Java带你了解网络IO模型

作者:nssnail

这篇文章将通过Java带大家了解网络IO模型,包括BIO,NoBlockingIO,NIO(NewIO),AIO等做了详细得介绍,感兴趣的小伙伴可以参考阅读本文

1.BIO

1.1 简述

BIO是同步阻塞IO,所有连接都是同步执行的,在上一个连接未处理完的时候是无法接收下一个连接

1.2 代码示例

在上述代码中,如果启动一个客户端起连接服务端时如果没有发送数据,那么下一个连接将永远无法进来

public static void main(String[] args) {
    try {
        // 监听端口
        ServerSocket serverSocket = new ServerSocket(8080);
        // 等待客户端的连接过来,如果没有连接过来,就会阻塞
        while (true) {
            // 阻塞IO中一个线程只能处理一个连接
            Socket socket = serverSocket.accept();
            System.out.println("客户端建立连接:"+socket.getPort());
            String line = null;
            try {
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                line = bufferedReader.readLine();
                System.out.println("客户端的数据:" + line);
                BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
                bufferedWriter.write("ok\n");
                bufferedWriter.flush();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

1.3优点和缺点

优点:

缺点:

1.4 思考

问:既然每个连接进来都会阻塞,那么是否可以使用多线程的方式接收处理?

答:当然可以,但是这样如果有1w个连接那么就要启动1w个线程去处理吗,线程是非常宝贵的资源,频繁使用线程对系统的开销是非常大的

2. NoBlockingIO

2.1 简述

NoBlockingIO是同步非阻塞IO,相对比阻塞IO,他在接收数据的时候是非阻塞的,会一直轮询去问内核是否准备好数据,直到有数据返回

ps: NoBlockingIO并不是真正意义上的NIO

2.2 代码示例

在下述代码中,将BIO中的ServerSocket修改为ServerSocketChannel,然后configureBlocking为false则为非阻塞,从而数据都是在channel的buffer(缓冲区)中获取,不理解没关系,就当作是设置非阻塞IO的方式就好

此时在accept中是非阻塞的,不断的等待客户端进来

注意

public static List<SocketChannel> channelList = new ArrayList<>();
public static void main(String[] args) {
    try {
        // 相当于serverSocket
        // 1.支持非阻塞  2.数据总是写入buffer,读取也是从buffer中去读  3.可以同时读写
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // 设置非阻塞
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.socket().bind(new InetSocketAddress(8080));
        while (true){
            // 这里将不再阻塞
            SocketChannel socketChannel = serverSocketChannel.accept();
            if(socketChannel != null){
                socketChannel.configureBlocking(false);
                channelList.add(socketChannel);
            }else {
                System.out.println("没有请求过来!!!");
            }
            for (SocketChannel client : channelList){
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                // 也不阻塞
                int num = client.read(byteBuffer);
                if(num>0){
                    System.out.println("客户端端口:"+ client.socket().getPort()+",客户端收据:"+new String(byteBuffer.array()));
                }else {
                    System.out.println("等待客户端写数据");
                }
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

2.3 优点和缺点

优点:

缺点:

2.4 思考

问 :既然一直轮询会产生很多的无效轮询,并浪费系统资源,那么有没有更好的办法呢

答: 通过事件注册的方式(多路复用器)

3. NIO(NewIO)

3.1 简述

NewIO才是真正意义上的NIO,NoBlockingIO只能算是NIO的前身,因为NewIO在NoBlockingIO上加上了多路复用器,使得NIO更加完美

在下图中,channel不再是直接循环调用内核,而是将连接,接收,读取,写入等事件注册到多路复用器中,如果没有事件到来将会阻塞等待

NIO三件套(记):

3.2 代码示例

从代码示例可以看到,在没有连接的时候会在selector.select()中阻塞,然后等待客户端连接或者写入数据,不同的监听事件会有不同的处理方法

具体流程:

服务端代码

public class NewIOServer {
    static Selector selector;
    public static void main(String[] args) {
        try {
            selector = Selector.open();
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.socket().bind(new InetSocketAddress(8080));
            // 需要把serverSocketChannel注册到多路复用器上
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            while (true) {
                // 阻塞
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    if (key.isAcceptable()) {
                        handlerAccept(key);
                    } else if (key.isReadable()) {
                        handlerRead(key);
                    }else if(key.isWritable()){
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    private static void handlerRead(SelectionKey key) {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer allocate = ByteBuffer.allocate(1024);
        try {
            socketChannel.read(allocate);
            System.out.println("server msg:" + new String(allocate.array()));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    private static void handlerAccept(SelectionKey key) {
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
        // 不阻塞
        try {
            SocketChannel socketChannel = serverSocketChannel.accept();
            socketChannel.configureBlocking(false);
            socketChannel.write(ByteBuffer.wrap("It‘s server msg".getBytes()));
            // 读取客户端的数据
            socketChannel.register(selector, SelectionKey.OP_READ);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

客户端代码

public class NewIOClient {
    static Selector selector;
    public static void main(String[] args) {
        try {
            selector = Selector.open();
            SocketChannel socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            socketChannel.connect(new InetSocketAddress("localhost", 8080));
            // 需要把socketChannel注册到多路复用器上
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
            while (true) {
                // 阻塞
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    if (key.isConnectable()) {
                        handlerConnect(key);
                    } else if (key.isReadable()) {
                        handlerRead(key);
                    } else if (key.isWritable()) {
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    private static void handlerRead(SelectionKey key) {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer allocate = ByteBuffer.allocate(1024);
        try {
            socketChannel.read(allocate);
            System.out.println("client msg:" + new String(allocate.array()));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    private static void handlerConnect(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        if (socketChannel.isConnectionPending()) {
            socketChannel.finishConnect();
        }
        socketChannel.configureBlocking(false);
        socketChannel.write(ByteBuffer.wrap("it‘s client msg".getBytes()));
        socketChannel.register(selector,SelectionKey.OP_READ);
    }
}

3.3 优点和缺点

优点:

缺点:

3.4 思考

问:select方法不是也阻塞吗,那跟BIO有什么区别?

答:虽然他是在select阻塞,但是他通过事件注册的方式,可以将多个selectKey同时加载到selectionKeys集合中,通过for循环处理不同的事件,而BIO只能由一个连接处理完才能处理下一个连接

问:什么是多路复用?

答:

多路:是指多个连接的管道,通道

复用:复用一个系统调用,原本多次系统调用变成一次

4. 扩展select/poll、epoll

4.1 简述

由第三部分的NIO可知,多路复用把for循环的系统调用变成了一次调用,那么他具体是怎么实现的?

其实我们仔细思考一下就能知道,他主要实现就是在selector.select(),由他去阻塞和触发动作。然而在实现这些功能的时候,就用到了三种模型,select、poll、epoll。因为select和poll很相似,所以大家都会把他们归为一类。

4.2 select/poll

我们先来说说什么是select?

实现过程

具体如下图所示:

产生问题

什么是poll?

因为fd是个数组,所以容量会达到上限,而poll则将这个数据结构改成了链表,所以解决了select模型中上限的问题,但是遍历socket的问题还是存在

select和poll的本质区别就是一个是用数组存放socket,一个是用链表存放,其他地方没有任何区别

4.3 epoll

epoll和select/poll相比,采用了事件回调的机制,并且使用红黑树去维护注册的socket,如下图所示

实现过程

这样通过红黑树来维护连接和通过就绪列表来处理数据就可以保证可以存放最大限度的socket数量,并且在唤醒线程处理去处理就绪列表的时候肯定都是需要处理并且已就绪的socket。完美的解决了select/poll中的问题

总结

epoll相较于select/poll的优势:

4.4 扩展话题

对于epoll的一些扩展,有兴趣的可以了解下,不感兴趣可以略过

4.4.1 什么是ET和LT?

ET和LT是epoll工作模式中的两种触发方式,分别表示边缘触发(Edge Triggered)和水平触发(Level Triggered)。

在ET模式下,当一个文件描述符上出现事件时,epoll_wait函数只会通知一次,即只有在文件描述符状态发生变化时才会返回。如果应用程序没有处理完这个事件,那么下一次调用epoll_wait函数时,它不会再返回这个事件,直到下一次状态变化。

ET模式下的事件处理更为高效,因为它只会在必要的时候通知应用程序,避免了重复通知的问题。但是,由于ET模式只在状态变化时通知一次,因此应用程序需要及时处理事件,否则可能会错过某些事件。

在LT模式下,当一个文件描述符上出现事件时,epoll_wait函数会重复通知应用程序,直到该文件描述符上的事件被处理完毕为止。如果应用程序没有处理完这个事件,那么下一次调用epoll_wait函数时,它会再次返回这个事件,直到应用程序处理完为止。

LT模式下的事件处理比较简单,因为它会重复通知应用程序,直到应用程序处理完为止。但是,由于重复通知的问题,LT模式下可能会导致一些性能问题。同时,在LT模式下,应用程序需要及时处理事件,否则可能会导致文件描述符上的事件积压,影响系统的性能。

4.4.2 什么是惊群?

epoll的惊群(Thundering Herd)指的是多个线程或进程同时等待同一个epoll文件描述符上的事件,

当文件描述符上出现事件时,内核会通知所有等待的线程或进程,但只有一个线程或进程能够真正处理该事件,其他线程或进程会被唤醒但不能处理该事件,从而造成资源浪费和性能降低的问题。

惊群问题是由于内核通知等待线程或进程的方式引起的。在epoll中,当文件描述符上出现事件时,内核会通知所有等待的线程或进程,而不是通知一个线程或进程。因此,如果有多个线程或进程等待同一个文件描述符,那么当该文件描述符上出现事件时,内核会通知所有等待的线程或进程,导致惊群问题。

为了解决惊群问题,可以采用以下两种方式:

5. AIO

5.1简述

在上面将的BIO,NIO中都是同步IO,BIO叫做同步阻塞,NIO叫做同步非阻塞,那么AIO则是异步IO,全名(Asynchronous I/O)

5.2 代码示例

从代码示例可以看到,以下代码都是基于回调机制实现的,并不会像BIO和NIO一样使用轮询的方式,他不需要像同步IO一样需要查找就绪socket,只要客户端有数据写入就会回调给服务端,既然是异步的所以就不会存在阻塞

服务端代码

public class AIOServer {
    public static void main(String[] args) throws Exception {
        // 创建一个SocketChannel并绑定了8080端口
        final AsynchronousServerSocketChannel serverChannel =
                AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(8080));
        serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
            @Override
            public void completed(AsynchronousSocketChannel socketChannel, Object attachment) {
                try {
                    // 打印线程的名字
                    System.out.println("2--"+Thread.currentThread().getName());
                    System.out.println(socketChannel.getRemoteAddress());
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    // socketChannel异步的读取数据到buffer中
                    socketChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                        @Override
                        public void completed(Integer result, ByteBuffer buffer) {
                            // 打印线程的名字
                            System.out.println("3--"+Thread.currentThread().getName());
                            buffer.flip();
                            System.out.println(new String(buffer.array(), 0, result));
                            socketChannel.write(ByteBuffer.wrap("HelloClient".getBytes()));
                        }
                        @Override
                        public void failed(Throwable exc, ByteBuffer buffer) {
                            exc.printStackTrace();
                        }
                    });
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            @Override
            public void failed(Throwable exc, Object attachment) {
                exc.printStackTrace();
            }
        });
        System.out.println("1--"+Thread.currentThread().getName());
        Thread.sleep(Integer.MAX_VALUE);
    }
}

客户端代码

public class AIOClient {
    private final AsynchronousSocketChannel client;
    public AIOClient() throws IOException {
        client = AsynchronousSocketChannel.open();
    }
    public static void main(String[] args) throws Exception {
        new AIOClient().connect("localhost",8080);
    }
    public void connect(String host, int port) throws Exception {
        // 客户端向服务端发起连接
        client.connect(new InetSocketAddress(host, port), null, new CompletionHandler<Void, Object>() {
            @Override
            public void completed(Void result, Object attachment) {
                try {
                    client.write(ByteBuffer.wrap("这是一条测试数据".getBytes())).get();
                    System.out.println("已发送到服务端");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            @Override
            public void failed(Throwable exc, Object attachment) {
                exc.printStackTrace();
            }
        });
        final ByteBuffer bb = ByteBuffer.allocate(1024);
        // 客户端接收服务端的数据,获取的数据写入到bb中
        client.read(bb, null, new CompletionHandler<Integer, Object>() {
            @Override
            public void completed(Integer result, Object attachment) {
                // 服务端返回数据的长度result
                System.out.println("I/O操作完成:" + result);
                System.out.println("获取反馈结果:" + new String(bb.array()));
            }
            @Override
            public void failed(Throwable exc, Object attachment) {
                exc.printStackTrace();
            }
        });
        try {
            Thread.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

5.3 优点和缺点

优势:

缺点:

ps: 说白了AIO很好用,但是太复杂

以上就是通过Java带你了解网络IO模型的详细内容,更多关于Java 网络IO模型的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文