Linux

关注公众号 jb51net

关闭
首页 > 网站技巧 > 服务器 > Linux > Linux其他备选高级IO模型

Linux其他备选高级IO模型用法详解

作者:Truelon

文章介绍了Linux系统中同步与异步I/O模型,包括阻塞I/O、非阻塞I/O、IO多路复用(select/poll/epoll)、信号驱动I/O和异步I/O,重点解析了各模型工作原理、优缺点及适用场景,强调通过非阻塞、多路复用和事件驱动机制提升I/O效率,减少资源消耗,适用于高并发网络服务等场景

其他高级 I/O 模型

以上基本介绍的都是同步IO相关知识点,即在同步I/O模型中,程序发起I/O操作后会等待I/O操作完成,即程序会被阻塞,直到I/O完成。

整个I/O过程在同一个线程中进行,程序在等待期间不能执行其他任务。下面将会介绍除同步IO之外的其他常见IO模型。

什么是IO?什么是高效的IO?

**I/O(输入/输出)**是计算机与外部世界进行数据交换的过程。在Linux系统中,I/O操作通常指的是程序与硬盘、网络设备、终端等进行数据交换的操作。I/O性能直接影响到系统的响应速度和吞吐量,是很多应用系统优化的关键目标。

高效的IO 涉及优化I/O操作的延迟、吞吐量和资源消耗。在一个复杂的系统中,I/O效率通常通过以下方式得到提升:

高效的IO不仅可以提升程序的响应速度,还能减少系统的负载,提高并发处理能力。

IO模型分析方法

出处:Linux五种IO模型

分析IO模型需要了解2个问题:

问题1:发送IO请求,IO请求可以理解为用户空间和内核空间数据同步,根据发起者不同分为以下两种情况:

问题2:等待数据到来,等待数据到来的方式有以下几种:

内核空间和用户空间数据同步由谁发起是分析Linux IO模型最核心的问题

1.阻塞式I/O(Blocking I/O)

代码示例:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>

#define PORT 8080
#define BUFFER_SIZE 1024

int main() {
    int sockfd;
    struct sockaddr_in server_addr;
    char buffer[BUFFER_SIZE];

    // 创建一个TCP套接字
    if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        perror("套接字创建失败");
        return -1;
    }

    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(PORT);
    server_addr.sin_addr.s_addr = INADDR_ANY;

    // 绑定套接字
    if (bind(sockfd, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
        perror("绑定失败");
        close(sockfd);
        return -1;
    }

    // 监听端口
    if (listen(sockfd, 3) < 0) {
        perror("监听失败");
        close(sockfd);
        return -1;
    }

    printf("等待客户端连接...\n");

    int new_sock;
    struct sockaddr_in client_addr;
    socklen_t client_len = sizeof(client_addr);

    // 接受客户端连接
    if ((new_sock = accept(sockfd, (struct sockaddr*)&client_addr, &client_len)) < 0) {
        perror("接受连接失败");
        close(sockfd);
        return -1;
    }

    printf("客户端已连接。等待数据...\n");

    // 阻塞式recv:等待客户端发送数据
    while (1) {
        printf("发起 I/O 请求:调用 recv() 等待数据...\n");
        ssize_t bytes_received = recv(new_sock, buffer, BUFFER_SIZE - 1, 0); // BUFFER_SIZE -1 保证留出 '\0'

        if (bytes_received < 0) {
            perror("recv 失败");
            break;
        }

        if (bytes_received == 0) {
            printf("客户端已断开连接。\n");
            break;
        }

        buffer[bytes_received] = '\0';  // 确保字符串结束
        printf("收到数据:%s\n", buffer); // 数据到达并唤醒进程
    }

    close(new_sock);
    close(sockfd);
    return 0;
}

原理分析:

阶段 1: 用户程序调用 recv 发起 I/O 请求(同步 I/O)

背景: 在阻塞 I/O 模式下,用户程序发起 I/O 操作时(比如通过 recv 函数从套接字读取数据),如果内核空间的套接字缓冲区没有数据准备好,用户进程会被阻塞,直到数据可用。

步骤 1: 发起 I/O 请求

ssize_t bytes_received = recv(sockfd, buffer, BUFFER_SIZE, 0);

步骤 2: 进程切换状态并阻塞

重要的概念:

阶段 2: 网卡收到数据包并唤醒进程

背景: 数据包通过网络接口卡(NIC)到达时,内核通过硬件中断机制将数据拷贝到内核空间。内核会将这些数据存入相应的套接字缓冲区,进而唤醒之前等待的进程。

步骤 1: 网卡接收数据包

步骤 2: 数据包复制到套接字接收缓冲区

步骤 3: 唤醒等待的进程

步骤 4: 用户程序完成 I/O 操作

2.非阻塞IO(Non-blocking I/O)

首先我们先来认识一个新的函数fcntl():

2.1fcntl函数的基本用法

fcntl 函数主要用于对已打开的文件描述符进行控制操作,以改变文件描述符的属性或获取其状态。

fcntl函数通常在处理低级别的文件I/O时使用,例如复制文件描述符、获取和设置文件描述符标志、获取和设置文件状态标志、文件锁定(共享锁和排他锁)、设置文件所有者。

#include <fcntl.h>
int fcntl(int fd, int cmd, ... /* arg */ );

参数说明

返回值

fcntl的常见命令

复制文件描述符

示例代码:

​
#include <fcntl.h> // 包含 fcntl 相关的头文件
#include <unistd.h> // 包含 close 和 open 函数的头文件
#include <iostream> // 包含输入输出流的头文件

int main() {
    int old_fd = open("example.txt", O_RDONLY); // 打开文件用于只读
    if (old_fd == -1) {
        std::perror("打开文件失败:"); // 输出错误信息
        return -1;
    }

    int new_fd = fcntl(old_fd, F_DUPFD, 3); // 复制文件描述符,要求新的描述符至少为 3
    if (new_fd == -1) {
        std::perror("复制文件描述符失败:");
        close(old_fd);
        return -1;
    }

    std::cout << "新的文件描述符:" << new_fd << std::endl; // 输出新的文件描述符

    close(old_fd); // 关闭旧的文件描述符
    close(new_fd); // 关闭新的文件描述符

    return 0;
}

​

获取/设置文件描述符标志

标志

​
#include <fcntl.h> // 包含 fcntl 相关的头文件
#include <unistd.h> // 包含 close 和 open 函数的头文件
#include <iostream> // 包含输入输出流的头文件

int main() {
    int fd = open("example.txt", O_RDONLY); // 打开文件用于只读
    if (fd == -1) {
        std::perror("打开文件失败:"); // 输出错误信息
        return -1;
    }

    int flags = fcntl(fd, F_GETFD); // 获取文件描述符的标志
    if (flags == -1) {
        std::perror("获取文件描述符标志失败:");
        close(fd);
        return -1;
    }

    // 设置 FD_CLOEXEC 标志,使文件描述符在执行 exec 时关闭
    if (fcntl(fd, F_SETFD, flags | FD_CLOEXEC) == -1) {
        std::perror("获取文件描述符标志失败:");
        close(fd);
        return -1;
    }

    std::cout << "设置 FD_CLOEXEC 前的标志:" << flags << std::endl; // 输出设置前的标志
    std::cout << "设置 FD_CLOEXEC 后的标志:" << new_flags << std::endl; // 输出设置后的标志

    close(fd); // 关闭文件描述符
    return 0;
}

​

获取/设置文件状态标志

​
#include <fcntl.h> // 包含 fcntl 相关的头文件
#include <unistd.h> // 包含 close 和 open 函数的头文件
#include <iostream> // 包含输入输出流的头文件

int main() {
    int fd = open("example.txt", O_RDONLY); // 打开文件用于只读
    if (fd == -1) {
        std::perror("打开文件失败:"); // 输出错误信息
        return -1;
    }

    int flags = fcntl(fd, F_GETFL); // 获取文件状态标志
    if (flags == -1) {
        std::perror("获取文件状态标志失败:");
        close(fd);
        return -1;
    }

    // 设置 O_NONBLOCK 标志,使文件描述符处于非阻塞模式
    fcntl(fd, F_SETFL, flags | O_NONBLOCK);

    int new_flags = fcntl(fd, F_GETFL); // 再次获取文件状态标志
    if (new_flags == -1) {
        std::perror("获取文件状态标志失败:");
        close(fd);
        return -1;
    }

    std::cout << "设置 O_NONBLOCK 前的标志:" << flags << std::endl; // 输出设置前的标志
    std::cout << "设置 O_NONBLOCK 后的标志:" << new_flags << std::endl; // 输出设置后的标志

    close(fd); // 关闭文件描述符

    return 0;
}

​

2.2 阻塞模式 vs 非阻塞模式

阻塞模式:(上面已有介绍)

非阻塞模式

非阻塞I/O模型下,I/O请求立即返回,不会阻塞程序。如果数据没有准备好,程序会收到错误或“没有数据”的通知,之后可以重新尝试。

代码示例:

#include <iostream>
#include <unistd.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#include <cstring>

using namespace std;
#define PORT 8080
#define BUFFER_SIZE 1024

// 设置套接字为非阻塞模式
int setNonBlocking(int sockfd) {
    int flags = fcntl(sockfd, F_GETFL, 0);
    if (flags == -1) {
        cerr << "获取文件状态标志失败: " << strerror(errno) << endl;
        return -1;
    }
    if (fcntl(sockfd, F_SETFL, flags | O_NONBLOCK) == -1) {
        cerr << "设置非阻塞模式失败: " << strerror(errno) << endl;
        return -1;
    }
    return 0;
}

int main() {
    // 创建套接字
    int server_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (server_fd == -1) {
        cerr << "创建套接字失败: " << strerror(errno) << endl;
        return -1;
    }

    // 设置套接字为非阻塞
    if (setNonBlocking(server_fd) == -1) {
        close(server_fd);
        return -1;
    }

    // 绑定地址和端口
    struct sockaddr_in address;
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
        cerr << "绑定地址和端口失败: " << strerror(errno) << endl;
        close(server_fd);
        return -1;
    }

    // 监听连接
    if (listen(server_fd, 3) < 0) {
        cerr << "监听连接失败: " << strerror(errno) << endl;
        close(server_fd);
        return -1;
    }

    cout << "服务器正在监听端口 " << PORT << endl;

    // 接受客户端连接
    struct sockaddr_in client_address;
    socklen_t addr_len = sizeof(client_address);
    int client_fd = -1;

    while (client_fd == -1) {
        client_fd = accept(server_fd, (struct sockaddr *)&client_address, &addr_len);

        if (client_fd == -1) {
            // 如果没有连接,errno 会被设置为 EWOULDBLOCK 或 EAGAIN
            if (errno == EWOULDBLOCK || errno == EAGAIN) {
                cout << "没有可用连接,继续等待..." << endl;
                sleep(1);  // 延时 1s 后重试
            } else {
                cerr << "接受客户端连接失败: " << strerror(errno) << endl;
                close(server_fd);
                return -1;
            }
        }
    }

    cout << "接受到客户端连接" << endl;

    // 设置客户端套接字为非阻塞
    if (setNonBlocking(client_fd) == -1) {
        close(server_fd);
        close(client_fd);
        return -1;
    }

    char buffer[BUFFER_SIZE];
    int bytes_read;

    while (true) {
        // 尝试读取客户端数据
        bytes_read = recv(client_fd, buffer, sizeof(buffer), 0);

        if (bytes_read == -1) {
            // 如果没有数据,errno 会被设置为 EWOULDBLOCK 或 EAGAIN
            if (errno == EWOULDBLOCK || errno == EAGAIN) {
                cout << "没有可用数据,执行其他任务..." << endl;
                usleep(100000);  // 假设其他任务的延时(100ms)
            } else {
                cerr << "接收数据失败: " << strerror(errno) << endl;
                break;
            }
        } else if (bytes_read == 0) {
            // 客户端关闭连接
            cout << "客户端已断开连接" << endl;
            break;
        } else {
            // 成功接收到数据
            buffer[bytes_read] = '\0';  // 确保接收到的字符串以 '\0' 结尾
            cout << "收到数据: " << buffer << endl;
        }
    }

    // 关闭连接
    close(client_fd);
    close(server_fd);

    return 0;
}

原理分析:

非阻塞 I/O/阶段1:用户程序调用 recv 发起 I/O 请求

非阻塞 I/O/阶段2:数据到达并唤醒进程

拓展知识Ctrl+CCtrl+D:

组合键作用退出状态码典型场景
Ctrl+C发送 SIGINT 信号给前台进程,请求程序终止通常为 130(128 + 2)用于中断正在运行的程序
Ctrl+D发送 EOF 信号,表示输入结束通常为 0用于结束输入或退出交互式 shell

3.IO多路复用(Multiplexing)

IO多路复用是一种高效的IO处理方式,它可以让一个进程同时监控多个文件描述符,当其中任意一个文件描述符就绪时,就可以进行相应的IO操作。

相比于传统的阻塞IO和非阻塞IO,IO复用可以打打提高IO效率,减少CPU资源的浪费。

在Linux中,常用的IO复用模型有selectpollepoll等。

IO复用模型请求由用户程序发起,所以IO复用模型为同步IO。

3.1 IO复用select模型

3.1.1 认识select函数

系统调用select()会一直阻塞,直到一个或多个文件描述符集合成为就绪态。

它通过传递三个文件描述符集合(读集合、写集合和异常集合)给内核,内核会在这些集合中等待任意一个文件描述符就绪。

#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

参数:

该参数可指定为NULL,此时select()会一直阻塞。又或者指向一个timeval结构体。

如果结构体timeval的两个域都为0的话,此时select()不会阻塞。它只是简单低轮询指定的文件描述符集合,查看其中是否有就绪的文件描述符并立即返回。

否则,timeval的两个域都不为0的话,timeval将为select()指定一个等待时间上限值。代表多久之后返回,即超时时间。

返回值:

timeval结构体原型

struct timeval
{
__time_t tv_sec;		/* 秒.  */
__suseconds_t tv_usec;	/* 微秒.  */
};

timeout设为NULL,或其指向的结构体字段非零时,select()将阻塞直到下列事件发生:

关于 fd_setfd_set 是一个位图结构,用于标识一组文件描述符(通常是网络套接字)。fd_set 的定义通常是由底层实现细节决定的,但通常它是一个足够大的位字段,能够容纳系统中可能的最大文件描述符值。fd_set 的主要用途包括:

3.1.2fd_set类型和宏

通常数据类型fd_set以位掩码的形式来实现。但是,我们并不需要知道这些细节,因为所有关于文件描述符集合的操作都是通过四个宏完成的:

#include <sys/select.h>
void FD_ZERO(fd_set *set);			/* 将fdset所指向的集合初始化为空 */
void FD_SET(int fd, fd_set *set);	/* 将文件描述符fd添加到由fdset所指向的集合中 */
void FD_CLR(int fd, fd_set *set);	/* 将文件描述符fd从fdset所指向的集合中移除 */
int FD_ISSET(int fd, fd_set *set);	/* 检查已连接的套接字是否有数据可读 */

参数readfdswritefdsexceptfds所指向的结构体都是保存结果值的地方。

在调用select()之前,这些指向的结构体必须初始化(通过FD_ZERO()FD_SET()),以包含我们感兴趣的文件描述符集合。

之后select()调用会修改这些结构体,当select()返回时,它们包含的就是已处于就绪态的文件描述符集合了(由于这些结构体会在调用中被修改,如果在循环中重复调用select(),我们必须保证每次都要重新初始化它们)。

之后这些结构体可以通过FD_ISSET()来检查是否有数据可读。

3.1.3 select特点

文件描述符上限:

维护合法的文件描述符:

输入输出型参数:

第一个参数是最大 fd+1

位图的使用:

代码示例:

#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <cstring>
#include <fcntl.h>
#include <sys/select.h>

#define SERVER_PORT 8080
#define MAX_CLIENTS 10
#define BUF_SIZE 1024

void set_socket_non_blocking(int sockfd) {
    int flags = fcntl(sockfd, F_GETFL, 0);
    fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);
}

int main() {
    int server_fd, client_fd, max_fd, new_socket;
    struct sockaddr_in server_addr;
    char buffer[BUF_SIZE];
    fd_set read_fds, master_fds;
    struct timeval timeout;

    // 创建 TCP socket
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
        perror("创建套接字失败");
        return -1;
    }

    // 设置服务器地址
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY;
    server_addr.sin_port = htons(SERVER_PORT);

    // 绑定套接字
    if (bind(server_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
        perror("绑定失败");
        return -1;
    }

    // 开始监听客户端连接
    if (listen(server_fd, MAX_CLIENTS) == -1) {
        perror("监听失败");
        return -1;
    }

    // 设置 server_fd 为非阻塞
    set_socket_non_blocking(server_fd);

    // 初始化 fd_set
    FD_ZERO(&master_fds);
    FD_SET(server_fd, &master_fds);
    max_fd = server_fd;

    while (true) {
        // 将 master_fds 赋值给 read_fds,因为 select() 会修改 read_fds
        read_fds = master_fds;

        // 设置超时,阻塞 5 秒
        timeout.tv_sec = 5;
        timeout.tv_usec = 0;

        // 调用 select() 进行 I/O 多路复用
        int activity = select(max_fd + 1, &read_fds, nullptr, nullptr, &timeout);
        if (activity == -1) {
            perror("select 错误");
            break;
        } else if (activity == 0) {
            std::cout << "没有活动,继续等待...\n";
            continue;
        }

        // 遍历所有文件描述符
        for (int fd = 0; fd <= max_fd; ++fd) {
            // 如果 fd 是活动的文件描述符
            if (FD_ISSET(fd, &read_fds)) {
                if (fd == server_fd) {
                    // 处理新的客户端连接
                    if ((new_socket = accept(server_fd, nullptr, nullptr)) == -1) {
                        perror("接受连接失败,继续...");
                        continue;
                    }

                    std::cout << "新连接,套接字 fd: " << new_socket << "\n";

                    // 设置新的套接字为非阻塞
                    set_socket_non_blocking(new_socket);

                    // 将新的客户端套接字加入 fd_set
                    FD_SET(new_socket, &master_fds);
                    if (new_socket > max_fd) {
                        max_fd = new_socket;
                    }
                } else {
                    // 处理已连接客户端的 I/O 操作
                    int bytes_read = read(fd, buffer, sizeof(buffer) - 1);
                    if (bytes_read == 0) {
                        // 客户端关闭连接
                        std::cout << "客户端断开连接,套接字 fd: " << fd << "\n";
                        close(fd);
                        FD_CLR(fd, &master_fds);
                    } else if (bytes_read > 0) {
                        // 处理收到的数据
                        buffer[bytes_read] = '\0';
                        std::cout << "收到来自客户端 " << fd << " 的数据: " << buffer << "\n";

                        // 回送数据给客户端
                        send(fd, buffer, bytes_read, 0);
                    } else {
                        perror("读取错误");
                        close(fd);
                        FD_CLR(fd, &master_fds);
                    }
                }
            }
        }
    }

    close(server_fd);
    return 0;
}

原理分析:

3.2 IO复用poll模型

poll是一种改进的 IO 多路复用模型,它解决了select模型中的一些局限性,尤其是在处理大量文件描述符和提高性能方面。

pollselect类似,但它通过使用pollfd结构体数组来管理文件描述符,而不是像select那样依赖位图(bitmask)。这使得poll在某些方面更加灵活和高效。

3.2.1poll的基本使用方法

#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);

参数说明

pollfd 结构体:

struct pollfd {
    int   fd;         // 要监视的文件描述符
    short events;     // 监视的事件
    short revents;    // 实际发生的事件
};

事件类型常用值:

3.2.2poll解决select存在的问题

poll相较于select在多个方面进行了改进,解决了select模型中的一些关键限制。以下是poll解决select存在问题的主要方面:

  1. 无需重新设定参数:与select每次调用前都需要重新初始化监视的文件描述符集合不同,poll使用一个独立的结构体数组(pollfd),该数组在函数调用后保持不变,只需更新事件结果。这使得代码更简洁、易于维护。
  2. 消除文件描述符数量的上限select受限于系统定义的最大文件描述符数(如FD_SETSIZE),而poll通过动态管理文件描述符数组,仅受系统资源和内核能力的限制,适合处理大量并发连接。
  3. 更高效的事件通知机制:尽管两者都可能需要扫描所有文件描述符,poll得益于其灵活的结构和操作系统的优化,在实际应用中通常性能更优。
  4. 支持更多事件类型:相较于select仅支持基本事件类型,poll支持更多种类的事件,如高优先级数据和挂断事件,提供更细致的监控能力。
  5. 更好的跨平台兼容性poll在多种现代操作系统中的实现更为一致,简化了跨平台应用的开发难度。

代码示例:

#include <iostream>
#include <vector>
#include <poll.h>
#include <unistd.h>
#include <cstring>
#include <arpa/inet.h>
#include <fcntl.h>

#define PORT 8080
#define MAX_EVENTS 1024
#define BUFFER_SIZE 1024

int set_non_blocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1)
        return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

int main() {
    int server_fd, new_socket;
    struct sockaddr_in address;
    int opt = 1;
    int addrlen = sizeof(address);
    
    // 创建监听套接字
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }
    
    // 设置套接字选项
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt))) {
        perror("setsockopt");
        close(server_fd);
        exit(EXIT_FAILURE);
    }
    
    // 设置地址结构
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);
    
    // 绑定套接字
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
        perror("bind failed");
        close(server_fd);
        exit(EXIT_FAILURE);
    }
    
    // 开始监听
    if (listen(server_fd, 10) < 0) {
        perror("listen");
        close(server_fd);
        exit(EXIT_FAILURE);
    }
    
    // 设置服务器套接字为非阻塞
    if (set_non_blocking(server_fd) < 0) {
        perror("set_non_blocking");
        close(server_fd);
        exit(EXIT_FAILURE);
    }
    
    // 初始化pollfd数组
    std::vector<struct pollfd> fds;
    struct pollfd server_pollfd;
    server_pollfd.fd = server_fd;
    server_pollfd.events = POLLIN;
    server_pollfd.revents = 0;
    fds.push_back(server_pollfd);
    
    std::cout << "服务器正在端口 " << PORT << " 上监听" << std::endl;
    
    while (true) {
        int activity = poll(fds.data(), fds.size(), -1);
        
        if (activity < 0) {
            perror("poll error");
            break;
        }
        
        for (size_t i = 0; i < fds.size(); ++i) {
            // 检测是否有事件发生
            if (fds[i].revents & POLLIN) {
                if (fds[i].fd == server_fd) {
                    // 有新的连接请求
                    if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen)) < 0) {
                        perror("accept");
                        continue;
                    }
                    
                    // 设置新套接字为非阻塞
                    if (set_non_blocking(new_socket) < 0) {
                        perror("set_non_blocking");
                        close(new_socket);
                        continue;
                    }
                    
                    // 添加新套接字到pollfd数组
                    struct pollfd client_pollfd;
                    client_pollfd.fd = new_socket;
                    client_pollfd.events = POLLIN;
                    client_pollfd.revents = 0;
                    fds.push_back(client_pollfd);
                    
                    std::cout << "新连接,socket fd: " << new_socket << std::endl;
                } else {
                    // 处理已连接的客户端数据
                    char buffer[BUFFER_SIZE];
                    int valread = read(fds[i].fd, buffer, BUFFER_SIZE);
                    
                    if (valread <= 0) {
                        // 客户端关闭连接或发生错误
                        close(fds[i].fd);
                        std::cout << "连接关闭,socket fd: " << fds[i].fd << std::endl;
                        fds.erase(fds.begin() + i);
                        --i;
                        continue;
                    }
                    
                    buffer[valread] = '\0';
                    std::cout << "收到: " << buffer << " 来自socket fd: " << fds[i].fd << std::endl;
                    
                    // 回显数据给客户端
                    send(fds[i].fd, buffer, valread, 0);
                }
            }
        }
    }
    
    // 关闭所有套接字
    for (auto &pfd : fds) {
        close(pfd.fd);
    }
    
    return 0;
}

原理分析:

poll模型和select非常相似,主要区别为poll模型把位图改为链表,poll通过链表实现IO复用,将 socket 注册 poll_list 链表,通过poll系统调用轮询链表,获取 socket 事件。

成功获取到 socket 事件后,poll成功返回,此时可以通过接收函数读取 socket 缓冲区数据。

3.3 IO复用epoll模型

epoll是Linux下高效的IO多路复用机制,相较于selectpollepoll在处理大量并发连接时具有更好的性能和扩展性。

epoll的工作机制主要包括下面几个步骤:

3.3.1 epoll的基本使用方法

使用 epoll_createepoll_create1 系统调用创建一个 epoll 实例,该调用返回一个 epoll 文件描述符(epoll_fd)。这个文件描述符用于后续的事件注册和事件等待。

#include <sys/epoll.h>
int epoll_create1(int flags);

参数:

注册感兴趣的事件

#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll_ctlepoll API 的核心组成部分之一,它用于控制 epoll 实例中的文件描述符集合。通过这个函数,可以向 epoll 实例中添加、修改或删除感兴趣的文件描述符及其对应的事件类型。

使用 epoll_ctl 系统调用向 epoll 实例中注册需要监控的文件描述符及其感兴趣的事件类型(如可读、可写等)

参数

op: 操作类型,可以是以下三种之一:

返回值:成功时返回 0;失败时返回 -1 并设置相应的 errno 错误码。

事件结构体 epoll_event

struct epoll_event {
    uint32_t events;    /* Epoll events */
    epoll_data_t data;  /* User data variable */
};

typedef union epoll_data {
    void *ptr;
    int fd;
    uint32_t u32;
    uint64_t u64;
} epoll_data_t;

在使用 epoll_ctl 之前,我们需要定义一个 epoll_event 结构体来描述要监控的事件和关联的数据。这个结构体有两个主要成员:

events:这是一个位掩码,指定了我们对文件描述符感兴趣的事件类型。

常见的事件类型包括:

data:这是一个联合体,通常用来存储与该文件描述符相关的用户数据。最常见的做法是使用 data.fd 来存储文件描述符本身,以便在事件发生时能够快速识别出哪个描述符触发了事件。

示例代码

struct epoll_event event;
event.events = EPOLLIN | EPOLLET; 	// 监听可读事件,采用边缘触发
event.data.fd = sock_fd;			// 将sock_fd绑定到event.data.fd
// 使用EPOLL_CTL_ADD操作将sock_fd加入epoll实例
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock_fd, &event) == -1) {
    perror("epoll_ctl: add");
    exit(EXIT_FAILURE);
}

等待事件的发生

使用 epoll_wait 系统调用阻塞等待注册的事件发生。当有事件就绪时,epoll_wait 返回就绪事件的数量,并提供epoll_event数组中填充准备好的事件信息。

#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

参数

返回值:成功时返回已就绪的文件描述符的数量;如果没有事件发生且超时到期,则返回 0;出错时返回 -1 并设置 errno

struct epoll_event events[MAX_EVENTS];
int n = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
if (n == -1) {
    perror("epoll_wait");
    exit(EXIT_FAILURE);
}
for (int i = 0; i < n; i++) {
    if (events[i].events & EPOLLIN) {
        // 处理可读事件
    }
    // 处理其他事件类型
}

处理事件

根据就绪事件的类型,执行相应的 I/O 操作,如读取数据、写入数据或关闭连接等。在边缘触发模式下,必须一次性将所有可读或可写的数据处理完毕,否则可能会错过后续的事件通知。

关闭 epoll 实例

当不再需要 epoll 实例时,使用 close 关闭 epoll 文件描述符,释放资源。

close(epoll_fd);

3.3.2 epoll原理

3.3.3 水平触发(LT)模式 vs 边缘触发(ET)模式

epoll提供了两种主要的事件触发模式:水平触发(LT) vs 边缘触发(ET)。它们决定了在文件描述符的状态发生变化时,内核如何通知应用程序。理解这两种触发的特点及其使用场景,对高效使用epoll至关重要。

工作原理

水平触发和边缘触发的比较

特性水平触发(LT)边缘触发(ET)
通知次数只要事件未被处理完,内核会持续通知。只会在文件描述符的状态发生变化时通知一次。
对阻塞 I/O 的要求不要求,I/O 可以是阻塞的。必须使用非阻塞 I/O,防止错过后续事件。
复杂度实现简单,事件处理较为直观。需要更复杂的事件处理逻辑,必须一次性处理所有数据。
资源消耗可能会造成不必要的重复通知,增加 CPU 占用。更加高效,减少了重复的事件通知。
适用场景普通的 I/O 密集型应用,如文件处理、轻量级的网络服务等。高并发、高性能的网络应用,如 Web 服务器、实时流处理等。

边缘触发(ET)模式的优化技巧

由于 ET 模式要求事件处理更为高效,一些优化策略可以确保高并发下的稳定运行:

非阻塞 I/O

必须将所有受 epoll 监控的文件描述符设置为非阻塞模式。否则,在数据未及时读取或写入的情况下,ET 模式可能错过后续事件通知,导致程序不响应新事件。

int flags = fcntl(sock_fd, F_GETFL, 0);
fcntl(sock_fd, F_SETFL, flags | O_NONBLOCK);

一次性处理所有数据

在 ET 模式下,应用程序必须确保在收到事件通知时一次性处理所有可读或可写的数据。如果不处理完所有数据,下一次数据到达时 epoll 可能不会再次通知,从而导致数据丢失。

例如,对于一个可读事件的套接字,应该不断调用 read()recv() 来读取所有数据,直到 read() 返回 EAGAINEWOULDBLOCK,表示数据已读取完毕。

ssize_t bytes_read;
while ((bytes_read = read(sock_fd, buffer, sizeof(buffer))) > 0) {
    // 处理数据
}
if (bytes_read < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
    perror("read failed");
    // 错误处理
}

处理数据时避免阻塞

ET 模式通常与非阻塞 I/O 一起使用,因此处理事件时要特别小心。确保每次操作不会阻塞整个进程,避免应用程序挂起。

合理选择超时值

如果你的应用程序不需要实时响应,也可以考虑使用超时等待(epoll_wait 的第四个参数设置为适当的超时值),这样可以避免应用程序因过于频繁的调用 epoll_wait 而浪费 CPU 时间。

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#define MAX_EVENTS 10
#define PORT 8080
#define BUFFER_SIZE 1024

// 设置文件描述符为非阻塞
int set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1) {
        perror("fcntl(F_GETFL)");
        return -1;
    }
    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
        perror("fcntl(F_SETFL)");
        return -1;
    }
    return 0;
}

// 处理客户端数据的函数
void handle_client_data(int client_fd) {
    char buffer[BUFFER_SIZE];
    ssize_t bytes_read = read(client_fd, buffer, sizeof(buffer));
    if (bytes_read == -1) {
        if (errno != EAGAIN && errno != EWOULDBLOCK) {
            perror("read failed");
            close(client_fd);
        }
    } else if (bytes_read == 0) {
        // 客户端关闭连接
        printf("Client disconnected\n");
        close(client_fd);
    } else {
        // 处理读取的数据
        buffer[bytes_read] = '\0';
        printf("Received data: %s\n", buffer);

        // 发送数据到客户端
        ssize_t bytes_written = write(client_fd, buffer, bytes_read);
        if (bytes_written == -1) {
            perror("write failed");
            close(client_fd);
        }
    }
}

int main() {
    int server_fd, epoll_fd;
    struct sockaddr_in server_addr;
    struct epoll_event ev, events[MAX_EVENTS];

    // 创建监听套接字
    server_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (server_fd == -1) {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }

    // 设置服务器地址
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY;
    server_addr.sin_port = htons(PORT);

    // 绑定套接字
    if (bind(server_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {
        perror("bind failed");
        close(server_fd);
        exit(EXIT_FAILURE);
    }

    // 监听连接
    if (listen(server_fd, 10) == -1) {
        perror("listen failed");
        close(server_fd);
        exit(EXIT_FAILURE);
    }

    // 创建 epoll 实例
    epoll_fd = epoll_create1(0);
    if (epoll_fd == -1) {
        perror("epoll_create1 failed");
        close(server_fd);
        exit(EXIT_FAILURE);
    }

    // 将服务器套接字加入 epoll 事件监听
    ev.events = EPOLLIN | EPOLLET;  // 监听可读事件,采用边缘触发模式
    ev.data.fd = server_fd;
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev) == -1) {
        perror("epoll_ctl failed");
        close(server_fd);
        close(epoll_fd);
        exit(EXIT_FAILURE);
    }

    // 主事件循环
    while (1) {
        int num_events = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
        if (num_events == -1) {
            perror("epoll_wait failed");
            break;
        }

        // 处理就绪事件
        for (int i = 0; i < num_events; i++) {
            if (events[i].data.fd == server_fd) {
                // 服务器套接字有连接请求
                int client_fd = accept(server_fd, NULL, NULL);
                if (client_fd == -1) {
                    perror("accept failed");
                    continue;
                }

                // 设置客户端套接字为非阻塞
                if (set_nonblocking(client_fd) == -1) {
                    close(client_fd);
                    continue;
                }

                // 将客户端套接字添加到 epoll 中
                ev.events = EPOLLIN | EPOLLET;  // 监听客户端的可读事件,采用边缘触发模式
                ev.data.fd = client_fd;
                if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &ev) == -1) {
                    perror("epoll_ctl: add client failed");
                    close(client_fd);
                    continue;
                }

                printf("New client connected\n");

            } else if (events[i].events & EPOLLIN) {
                // 客户端有数据可读
                handle_client_data(events[i].data.fd);
            }
        }
    }

    // 清理资源
    close(server_fd);
    close(epoll_fd);
    return 0;
}

原理分析:

epoll 是一种高效的 I/O 事件通知机制,它通过以下方式提高效率:

  1. 红黑树管理文件描述符:使用 epoll_ctl 系统调用将感兴趣的 socket(或其他文件描述符)注册到 epoll 实例中。这些描述符被存储在一个内部的红黑树结构中,这使得添加、删除和查找操作都非常高效。
  2. 就绪队列记录活动事件:当某个 socket 上有数据到达时,内核会自动将该 socket 标记为就绪,并将其加入到一个就绪队列中。这里并没有使用回调函数,而是依赖于内核的通知机制。
  3. 事件通知而非轮询:与 selectpoll 不同,epoll 使用的是事件驱动的通知机制。这意味着只有当有实际事件发生时(如可读或可写),epoll_wait 才会返回并告知应用程序哪些 socket 已准备好进行 I/O 操作。这样避免了对所有文件描述符的重复检查,提高了效率。
  4. epoll_wait 获取事件:应用程序调用 epoll_wait 来等待事件的发生。一旦有事件发生,epoll_wait 就会返回一个包含所有就绪事件的列表,供应用程序处理。

4.信号驱动式IO

信号驱动式IO是Linux提供的一种IO模型,通过信号通知应用程序某个文件描述符的状态发生变化,适用于需要非阻塞IO操作的场景。它利用信号机制将内核的事件通知传递给用户空间,使得程序无需主动伦旭即可对IO事件做出响应。

4.1 工作原理

  1. 信号和信号处理:
  1. 非阻塞模式:
  1. 信号处理函数:

4.2 为什么使用信号驱动IO

  1. **避免轮询:**传统的非阻塞I/O通常需要通过 selectpollepoll 等方式轮询文件描述符,检查是否有数据可以读取。信号驱动I/O消除了这种轮询机制,避免了CPU时间的浪费。
  2. **减少阻塞:**通过信号驱动I/O,应用程序不再需要阻塞等待I/O事件发生,而是通过信号触发事件,从而使得程序可以在等待I/O的同时执行其他任务。
  3. **资源高效:**信号驱动I/O能够实现资源的高效利用,因为它允许应用程序在I/O事件到达时立即处理,而不需要检查文件描述符状态。
  4. **适用于实时应用:**对于一些需要及时响应的实时应用,信号驱动I/O提供了一种快速响应数据的方式。

4.3 实现流程

信号驱动式I/O的核心思想是预先告知内核当某个描述符准备发生某件事情(如数据到达)时发送一个信号(SIGIO)给进程。这使得进程可以在等待数据的过程中不被阻塞,只有在接收到SIGIO信号后才去处理I/O事件。程序需要按照如下步骤执行:

fcntl(fd, F_SETOWN , pid);
flags = fcntl(fd, F_GETFL);  // get current flags
fcntl(fd, F_SETFL, flags | O_ASYNC | O_NONBLOCK);

示例代码及图解:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>

#define PORT 8080
#define MAXLINE 1024

static int sockfd;  // 监听套接字
static struct sockaddr_in cli_addr;
static socklen_t clilen = sizeof(cli_addr);

// 信号处理函数
void do_sometime(int signal) {
    char buffer[MAXLINE] = {0};
    int len = recvfrom(sockfd, buffer, MAXLINE, 0, (struct sockaddr *)&cli_addr, (socklen_t*)&clilen);
    if (len > 0) {
        printf("收到客户端消息: %s\n", buffer);
        strcat(buffer, "→[Msg]");
        sendto(sockfd, buffer, sizeof(buffer), 0, (struct sockaddr *)&cli_addr, clilen);  // 回显消息
    } else {
        printf("没有收到数据或出现错误\n");
    }
}

int main(int argc, char const *argv[]) {
    // 创建套接字
    if ((sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
        perror("socket creation failed");
        exit(EXIT_FAILURE);
    }
    
    // 注册信号处理函数
    struct sigaction act;
    act.sa_handler = do_sometime;
    sigemptyset(&act.sa_mask);
    act.sa_flags = SA_RESTART;  // 重新启动被信号中断的系统调用
    sigaction(SIGIO, &act, NULL);

    // 创建并初始化地址结构
    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(PORT);
    servaddr.sin_addr.s_addr = INADDR_ANY;
    
    // 设置文件描述符的拥有者为当前进程
    fcntl(sockfd, F_SETOWN, getpid());
    int flags = fcntl(sockfd, F_GETFL, 0);
    // 启用信号驱动模式 | 设置文件描述符为非阻塞模式
    fcntl(sockfd, F_SETFL, flags | O_ASYNC | O_NONBLOCK);
    
    // 绑定地址
    if (bind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
        perror("bind failed");
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    while (1)
        sleep(1);   // 等待信号

    close(sockfd);
    return 0;
}

异步IO通知的工作流程

  1. 注册异步通知

在用户空间,应用程序通过 fcntl 系统调用来设置文件描述符的异步通知机制:

  1. 内核空间处理

在内核空间,驱动程序需要支持异步通知机制。具体步骤如下:

  1. 触发事件

当驱动程序检测到某个事件(如数据到达)时,它会调用 kill_fasync 函数来通知所有注册了异步通知的进程。kill_fasync 函数会遍历 fasync_struct 队列,并向每个进程发送 SIGIO 信号。

  1. 处理信号

用户空间的应用程序收到 SIGIO 信号后,可以在其信号处理函数中执行相应的操作。例如,处理接收到的数据或进行其他必要的操作。

5.异步IO

异步IO(Asynchronous IO,AIO)是一种处理输入/输出操作的方式,它允许程序在发起IO操作后立即返回,而不是等待操作完成。

这种方式可以显著提高应用程序的并发性和吞吐量,特别是在IO密集型的应用场景中。下面将从多个角度深入探讨异步IO的概念,实现机制及其应用场景。

5.1 异步I/O的基本概念

5.1.1 同步 vs 异步

5.1.2 阻塞 vs 非阻塞

虽然“异步”和“非阻塞”听起来相似,但它们实际上是不同的概念。异步I/O指的是整个操作由内核而非用户进程来完成,并且在完成后通知用户进程;而非阻塞I/O则是指用户进程可以在没有数据可读/写时不被阻塞,但仍需主动轮询检查状态

5.1.3 异步IO的工作原理

异步 I/O 的实现通常依赖于以下几个关键组件:

事件通知机制:内核提供一种机制,能够在 I/O 操作完成时通知应用程序。常见的事件通知机制有:

文件描述符和 I/O 操作:文件描述符是 I/O 操作的基础。内核会根据文件描述符的状态来决定 I/O 操作是否可以完成,并在完成时通知程序。

非阻塞模式:异步 I/O 需要文件描述符处于非阻塞模式。通过设置 O_NONBLOCK 或其他标志,程序可以立即返回,而不是等待 I/O 完成。

POSIX AIO 示例:

POSIX AIO 是 Linux 提供的一套接口,用于执行异步 I/O 操作。

在异步 I/O 中,程序可以发起 I/O 操作(如 aio_readaio_write),然后继续执行其他任务,而不是等待 I/O 操作完成。

当 I/O 操作完成时,程序可以通过轮询、信号或回调来获取结果。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <aio.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <errno.h>

#define PORT 8080
#define BUFFER_SIZE 1024

int main() {
    // 创建 TCP 套接字
    int server_fd = socket(AF_INET, SOCK_STREAM, 0);

    // 设置服务器地址结构
    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY;
    server_addr.sin_port = htons(PORT);

    // 绑定地址
    bind(server_fd, (struct sockaddr *)&server_addr, sizeof(server_addr));

    // 监听连接
    listen(server_fd, 5);

    printf("Server listening on port %d...\n", PORT);

    // 接受客户端连接
    struct sockaddr_in client_addr;
    socklen_t client_len = sizeof(client_addr);
    int client_fd = accept(server_fd, (struct sockaddr *)&client_addr, &client_len);

    // 设置异步读操作
    char buffer[BUFFER_SIZE];
    struct aiocb aio_read_cb;
    memset(&aio_read_cb, 0, sizeof(struct aiocb));
    aio_read_cb.aio_fildes = client_fd;
    aio_read_cb.aio_buf = buffer;
    aio_read_cb.aio_nbytes = sizeof(buffer);
    aio_read_cb.aio_offset = 0;

    // 发起异步读操作
    aio_read(&aio_read_cb);

    // 等待读操作完成
    while (aio_error(&aio_read_cb) == EINPROGRESS) {
        // 可以执行其他操作
        usleep(10000);  // 等待10ms
    }

    // 读取完成,获取结果
    int bytes_read = aio_return(&aio_read_cb);

    printf("Received message: %s\n", buffer);

    // 发送数据到客户端
    send(client_fd, buffer, bytes_read, 0);

    close(client_fd);
    close(server_fd);
    return 0;
}

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文