服务器其它

关注公众号 jb51net

关闭
首页 > 网站技巧 > 服务器 > 服务器其它 > epoll Reactor服务器

基于epoll实现 Reactor服务器的详细过程

作者:云的小站

在我们调用epoll_create的时候会创建出epoll模型,这个模型也是利用文件描述类似文件系统的方式控制该结构,这篇文章主要介绍了基于epoll实现 Reactor服务器的详细过程,需要的朋友可以参考下

了解epoll底层逻辑

在我们调用epoll_create的时候会创建出epoll模型,这个模型也是利用文件描述类似文件系统的方式控制该结构。

在我们调用epoll_create的时候,就会在内核管理中创建一个epoll模型,并且建管理模块地址给file结构体,file结构体也是连接在管理所有file结构体的数据结构中

所以epoll也会给进程管理的files返回一个file地址保存在file_array中,并且将该地址在array中的下标值返回给上层。

这样以同一的方式管理epoll模型。所以这就是epoll模型的好处,这和select和poll的方式不同,这两种并不使用文件描述符

select还需要自己维护一个关心事件的fd的数组,然后再select结束以后,遍历该数组中的fd和输入输出型参数fd_set做查询关系(FD_ISSET),这其实是非常不方便的,在发生事件我们都需要遍历全部关心的事件,查看事件是否发生。并且因为是输入输出型(fd_set)参数,在响应后,之前设置的监视事件失效,所以每次监视事件前,都需要重新输入所有需要监听的事件这是非常不方便的事情

poll在select上做了升级,不再需要额外的数组保存而是使用pollfd结构体保存fd和关心事件,但是在响应后我们依旧需要遍历所有关心的事件,假设1w个被监控的事件只有1个得到了响应,我们却需要遍历1w个事件一个一个检查是否响应,这是低效的算法。

并且在操作系统中poll和epoll搭建的服务器关心的事件会被一直遍历查询是否被响应,哪怕1w个关心事件只有一个响应是第一个,剩下的9999个事件我们也得查看其是否被响应。

我们不应该在响应得到后遍历所有的事件,操作系统也应该轮询的检查所有监控事件被响应,这是低效的2个做法,这就是epoll出现的意义,他的出现解决了这些繁杂的问题,并且在接口使用上做了极大的优化。他利用红黑树来管理需要监视程序员需要关心的事件和利用准备队列构建另一个结构,该结构保存了本次等待得到的所有有响应的事件。

epoll模型介绍

创建epoll模型:调用epoll_create,在文件描述符表添加一个描述符,生成对应的文件结构体结构体保存对应生成eventpoll结构体的地址,该结构中有rbr(监视事件红黑树),rdllist(就绪事件队列)等等。        

添加一个fd到epoll中:调用epoll_ctl,通过epollfd在进程文件描述符表中找到对应的file,然后在对应的文件结构体中的标识符将特定指针强转为eventpoll,访问rbr,增加新结点在树中,并且添加对应的回调函数到对应fd的文件结构体中。

接收并读取报文:网卡设备得到数据,发送设备中断给cpu,cpu根据接收到的中断号,在中断向量表中查找设备驱动提供的接口回调,将数据从网卡中读取到OS层的file文件结构体中,然后经过部分协议解析到TCP解析后,根据端口找到对应的进程,在进程中依靠五元组和fd的映射关系找到对应的file结构体,然后将网卡file的数据拷贝到对应服务器链接的file下的缓冲区中,并且调用其传入的callback函数传入fd通知epoll模型,有数据来临。这个时候我们的epoll在自己的rb树中依靠fd找到对应结点,并且其是否是自己所关心的事件,找到并且是我们的事件,就会取出其rb中的fd和响应的事件做拼接(一个结点监视一个fd的多个事件,发生响应并不是发生全部响应,一般都是一个响应,这个时候就需要将响应的事件和fd做结合,而不是全部事件和fd做结合)构建ready结点反应给上层。

诚然在我们放入事件和拿出响应事件的过程中并不是原子的查找,比如访问ready结点操作系统可能在构建,而我们在拿出,这里就会造成执行流混乱的局面,所以这里是需要进程锁的,保证执行流正常。

庆幸的是,我们的设计者大佬们已将帮我们锁好了,我们用就好了。

LT和ET的区别

LT的工作模式:

ET的工作模式:

二者对比

ps:使用 ET 模式的 epoll, 需要将文件描述设置为非阻塞. 这个不是接口上的要求, 而是 "工程实践" 上的要求,毕竟我们需要一次性读取全部数据,在最后一次不能读取的时候会阻塞在接口处。

插件组合

创建多个类:Epoll类、Sock类、Connection类、Log类

Epoll类

用来为我们保存并管理epoll模型。

static const unsigned int epoll_event_size_default = 64;
class Epoll
{
public:
    Epoll(unsigned int epoll_event_size = epoll_event_size_default)
        : _epoll_event_size(epoll_event_size)
    {
        _epoll_fd = epoll_create(254);
        if (_epoll_fd == -1)
        {
            Log()(Fatal, "epoll_create fail:");
            exit(-1);
        }
        _epoll_event = new epoll_event[_epoll_event_size];
    }
    struct epoll_event *bind_ready_ptr()
    {
        return _epoll_event;
    }
    int EpollCtl(int op, int fd, int event)
    {
        struct epoll_event ev;
        ev.data.fd = fd;
        ev.events = event;
        int status = epoll_ctl(_epoll_fd, op, fd, &ev);
        return status == 0;
    }
    int EpollWait(int timeout)
    {
        int n = epoll_wait(_epoll_fd, _epoll_event, _epoll_event_size, timeout);
        return n;
    }
    int fds_numb()
    {
        return _epoll_event_size;
    }
private:
    int _epoll_fd;
    struct epoll_event *_epoll_event;
    unsigned int _epoll_event_size;
};

该类管理着,epoll模型文件描述符,_epoll_event第一个就绪结点地址、最大可以接收的 _epoll_event_size.

注意这里的_epoll_event,并不是实际在epoll模型中的自由结点,而是该自由结点将重要信息拷贝到我们传入的这个空间中。

传入的event_size是告诉epoll模型我最多只能拷贝这么多个结点信息,还有就下次再说了,返回值是本次拷贝数量n。

Sock类

替我们来链接新链接的类

class Sock
{
public:
    Sock(int gblock = 5)
        : _listen_socket(socket(AF_INET, SOCK_STREAM, 0)), _gblock(gblock)
    {
        int opt = 1;
        setsockopt(_listen_socket, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof opt);
    }
    int get_listen_sock()
    {
        return _listen_socket;
    }
    void Sock_bind(const std::string &ip = "0.0.0.0", uint16_t port = 8080)
    {
        sockaddr_in self;
        bzero(&self, sizeof(self));
        self.sin_family = AF_INET;
        self.sin_addr.s_addr = inet_addr(ip.c_str());
        self.sin_port = htons(port);
        if (0 > bind(_listen_socket, (sockaddr *)&self, sizeof(self)))
        {
            log(Fatal, "bind 致命错误[%d]", __TIME__);
            exit(1);
        }
    }
    void Sock_connect(const char *ip, const char *port)
    {
        struct sigaction s;
        sockaddr_in server;
        bzero(&server, sizeof(server));
        server.sin_family = AF_INET;
        inet_aton(ip, &server.sin_addr);
        server.sin_port = htons(atoi(port));
        connect(_listen_socket, (sockaddr *)&server, sizeof(server));
    }
    void Sock_listen()
    {
        if (listen(_listen_socket, _gblock) > 0)
        {
            log(Fatal, "listen 致命错误[%d]", __TIME__);
            exit(2);
        }
    }
    int Sock_accept(std::string *ip, uint16_t *port)
    {
        sockaddr_in src;
        bzero(&src, sizeof(src));
        socklen_t srclen = sizeof(src);
        int worksocket = accept(_listen_socket, (sockaddr *)&src, &srclen);
        if (worksocket < 0)
        {
            log(Fatal, "link erron 链接失败");
            return -1;
        }
        *ip = inet_ntoa(src.sin_addr);
        *port = ntohs(src.sin_port);
        return worksocket;
    }
    ~Sock()
    {
        if (_listen_socket >= 0)
            close(_listen_socket);
    }
private:
    int _listen_socket;
    const int _gblock;
};

围绕着_listen_socket来操作的类

Log类

就是个日志没啥

class Log
{
public:
    Log()
    {
        std::cout<<"create log...\n"<<std::endl;
        printMethod = Screen;
        path = "./log/";
    }
    void Enable(int method)
    {
        printMethod = method;
    }
    std::string levelToString(int level)
    {
        switch (level)
        {
        case Info:
            return "Info";
        case Debug:
            return "Debug";
        case Warning:
            return "Warning";
        case Error:
            return "Error";
        case Fatal:
            return "Fatal";
        default:
            return "None";
        }
    }
    void printLog(int level, const std::string &logtxt)
    {
        switch (printMethod)
        {
        case Screen:
            std::cout << logtxt << std::endl;
            break;
        case Onefile:
            printOneFile(LogFile, logtxt);
            break;
        case Classfile:
            printClassFile(level, logtxt);
            break;
        default:
            break;
        }
    }
    void printOneFile(const std::string &logname, const std::string &logtxt)
    {
        std::string _logname = path + logname;
        std::cout<<_logname<<std::endl;
        int fd = open(_logname.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0666); // "log.txt"
        if (fd < 0)
        {
            perror("fail:");
            return;
        }
        write(fd, logtxt.c_str(), logtxt.size());
        close(fd);
    }
    void printClassFile(int level, const std::string &logtxt)
    {
        std::string filename = LogFile;
        filename += ".";
        filename += levelToString(level); // "log.txt.Debug/Warning/Fatal"
        printOneFile(filename, logtxt);
    }
    ~Log()
    {
    }
    void operator()(int level, const char *format, ...)
    {
        time_t t = time(nullptr);
        struct tm *ctime = localtime(&t);
        char leftbuffer[SIZE];
        snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%d-%d-%d %d:%d:%d]", levelToString(level).c_str(),
                 ctime->tm_year + 1900, ctime->tm_mon + 1, ctime->tm_mday,
                 ctime->tm_hour, ctime->tm_min, ctime->tm_sec);
        va_list s;
        va_start(s, format);
        char rightbuffer[SIZE];
        vsnprintf(rightbuffer, sizeof(rightbuffer), format, s);
        va_end(s);
        // 格式:默认部分+自定义部分
        char logtxt[SIZE * 2];
        snprintf(logtxt, sizeof(logtxt), "%s %s\n", leftbuffer, rightbuffer);
        // printf("%s", logtxt); // 暂时打印
        printLog(level, logtxt);
    }
private:
    int printMethod;
    std::string path;
};

Connection类

using func_t = std::function<void(Connection *)>;
class Connection
{
public:
    Connection(int sock, void *tsvr = nullptr) : _fd(sock), _tsvr(tsvr)
    {
        time_t _lasttime = (time_t)time(0);
    }
    bool SetCallBack(func_t recv_cb, func_t send_cb, func_t except_cb)
    {
        _recv_cb = recv_cb;
        _send_cb = send_cb;
        _except_cb = except_cb;
    }
    int _fd;
    int _events;
    // 三个回调方法,表征的就是对_sock进行特定读写对应的方法
    func_t _recv_cb;
    func_t _send_cb;
    func_t _except_cb;
    // 接收缓冲区&&发送缓冲区
    std::string _inbuffer; // 暂时没有办法处理二进制流,文本是可以的
    std::string _outbuffer;
    int _lasttime = 0;
    std::string _client_ip;
    uint16_t _client_port;
    // 设置对epTcpServer的回值指针
    void *_tsvr;
};

管理任何链接描述符(包括listen)的链接类,保存某个链接监视的读写异常事件,并且保存这些事件发生后对应的调用方法,并且每个事件设置读写应用层缓冲区,并且采用回值指针(在写入数据后采用该指针通知上层下次该链接修改采用监视事件条件。

服务器代码

#pragma once
#include "Log.hpp"
#include "sock.hpp"
#include "Epoll.hpp"
#include "Protocol.hpp"
#include <unordered_map>
#include <cassert>
#include <vector>
static const std::uint16_t server_port_defaut = 8080;
static const std::string server_ip_defaut = "0.0.0.0";
static const int READONE = 1024;
#define CLIENTDATA conn->_client_ip.c_str(),conn->_client_port
using callback_t = std::function<void(Connection *, std::string &)>;
class epTcpServer
{
    static const std::uint16_t default_port = 8080;
    static const std::uint16_t default_revs_num = 128;
public:
    epTcpServer(int port = default_port, int revs_num = default_revs_num)
        : _port(port), _epoll(default_revs_num), _revs_num(revs_num)
    {
        _sock.Sock_bind();
        _sock.Sock_listen();
        _listen = _sock.get_listen_sock();
        AddConnection(_listen, std::bind(&epTcpServer::Accept, this, std::placeholders::_1), nullptr, nullptr);
        _revs = _epoll.bind_ready_ptr();
        cout << "debug 1" << endl;
    }
    void Dispather(callback_t cb)
    {
        _cb = cb;
        while (true)
        {
            LoopOnce();
        }
    }
    void EnableReadWrite(Connection *conn, bool readable, bool writeable)
    {
        uint32_t events = ((readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0));
        bool res = _epoll.EpollCtl(EPOLL_CTL_MOD, conn->_fd, events);
        assert(res); // 更改成if
    }
private:
    void LoopOnce()
    {
        int n = _epoll.EpollWait(-1);
        log(Info,"The number of links in this response :%d",n);
        for (int i = 0; i < n; i++)
        {
            int sock = _revs[i].data.fd;
            uint32_t revents = _revs[i].events;
            log(Info, "Accessible fd:%d", sock);
            bool status = IsConnectionExists(sock);
            if (!status)
            {
                log(Error, "There is no such data in the hash sock:%d", sock);
                continue;
            }
            if (revents & EPOLLIN)
            {
                if (_Connection_hash[sock]->_recv_cb != nullptr)
                {
                    _Connection_hash[sock]->_recv_cb(_Connection_hash[sock]);
                }
            }
            status = IsConnectionExists(sock);
            if (revents & EPOLLOUT)
            {
                if (!status)
                {
                    log(Warning, "in read closs sock:%d", sock);
                    continue;
                }
                if (_Connection_hash[sock]->_send_cb != nullptr)
                    _Connection_hash[sock]->_send_cb(_Connection_hash[sock]);
            }
        }
    }
    bool IsConnectionExists(int sock)
    {
        auto iter = _Connection_hash.find(sock);
        if (iter == _Connection_hash.end())
            return false;
        else
            return true;
    }
    void Accept(Connection *conn)
    {
        while (1)
        {
            std::string ip;
            uint16_t port;
            int work = _sock.Sock_accept(&ip, &port);
            if (work < 0)
            {
                if (errno == EAGAIN || errno == EWOULDBLOCK)
                    break;
                else if (errno == EINTR) // 信号中断
                    continue;            // 概率非常低
                else
                {
                    // accept失败
                    log(Warning, "accept error, %d : %s", errno, strerror(errno));
                    break;
                }
            }
            Connection *ret = AddConnection(work, std::bind(&epTcpServer::Read, this, std::placeholders::_1),
                                            std::bind(&epTcpServer::Write, this, std::placeholders::_1),
                                            std::bind(&epTcpServer::Except, this, std::placeholders::_1));
            ret->_client_ip = ip;
            ret->_client_port = port;
            log(Info, "accept success && TcpServer success clinet[%s|%d]", ret->_client_ip.c_str(), ret->_client_port);
        }
    }
    void Read(Connection *conn)
    {
        int cnt = 0;
        while (1)
        {
            char buffer[READONE] = {0};
            int n = recv(conn->_fd, buffer, sizeof(buffer) - 1, 0);
            if (n < 0)
            {
                if (errno == EAGAIN || errno == EWOULDBLOCK)
                    break; // 正常的
                else if (errno == EINTR)
                    continue;
                else
                {
                    log(Error, "recv error, %d : %s", errno, strerror(errno));
                    conn->_except_cb(conn);
                    return;
                }
            }
            else if (n == 0)
            {
                log(Debug, "client[%s|%d] quit, server close [%d]", CLIENTDATA, conn->_fd);
                conn->_except_cb(conn);
                return;
            }
            else
            {
                buffer[n] = 0;
                conn->_inbuffer += buffer;
            }
        }
        log(Info,"The data obtained from the client[%s|%d] is:%s",CLIENTDATA,conn->_inbuffer.c_str());
        std::vector<std::string> messages;
        SpliteMessage(conn->_inbuffer, &messages);
        for (auto &msg : messages)
            _cb(conn, msg);
    }
    void Write(Connection *conn)
    {
        printf("write back to client[%s|%d]:%s", conn->_client_ip.c_str(), conn->_client_port, conn->_outbuffer.c_str());
        while (true)
        {
            ssize_t n = send(conn->_fd, conn->_outbuffer.c_str(), conn->_outbuffer.size(), 0);
            if (n > 0)
            {
                conn->_outbuffer.erase(0, n);
                if (conn->_outbuffer.empty())
                    break;
            }
            else
            {
                if (errno == EAGAIN || errno == EWOULDBLOCK)
                    break;
                else if (errno == EINTR)
                    continue;
                else
                {
                    log(Error, "send error, %d : %s", errno, strerror(errno));
                    conn->_except_cb(conn);
                    break;
                }
            }
        }
        if (conn->_outbuffer.empty())
            EnableReadWrite(conn, true, false);
        else
            EnableReadWrite(conn, true, true);
    }
    void Except(Connection *conn)
    {
        if (!IsConnectionExists(conn->_fd))
            return;
        // 1. 从epoll中移除
        bool res = _epoll.EpollCtl(EPOLL_CTL_DEL, conn->_fd, 0);
        assert(res); // 要判断
        // 2. 从我们的unorder_map中移除
        _Connection_hash.erase(conn->_fd);
        // 3. close(sock);
        close(conn->_fd);
        // 4. delete conn;
        delete conn;
        log(Debug, "Excepter 回收完毕,所有的异常情况");
    }
    Connection *AddConnection(int sock, func_t recv_cb, func_t send_cb, func_t except_cb, int sendevent = 0)
    {
        SetNonBlock(sock);
        Connection *conn = new Connection(sock, this);
        conn->SetCallBack(recv_cb, send_cb, except_cb);
        _epoll.EpollCtl(EPOLL_CTL_ADD, sock, EPOLLIN | EPOLLET | sendevent);
        _Connection_hash[sock] = conn;
        return conn;
    }
    bool SetNonBlock(int sock)
    {
        int fl = fcntl(sock, F_GETFL);
        if (fl < 0)
            return false;
        fcntl(sock, F_SETFL, fl | O_NONBLOCK);
        return true;
    }
private:
    int _listen;
    int _port;
    int _revs_num;
    zjy::Sock _sock;
    zjy::Epoll _epoll;
    std::unordered_map<int, Connection *> _Connection_hash;
    callback_t _cb;
    struct epoll_event *_revs;
};

到此这篇关于基于epoll实现 Reactor服务器的文章就介绍到这了,更多相关epoll Reactor服务器内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文