C 语言

关注公众号 jb51net

关闭
首页 > 软件编程 > C 语言 > C++协程调度模块

C++高性能服务器框架之协程调度模块

作者:找人找不到北

这篇文章主要介绍了C++高性能服务器框架中的协程调度模块,文中通过代码示例介绍的非常详细,对我们的学习或工作有一定的帮助,需要的朋友可以参考下

协程调度模块概述

封装了一个N : M协程调度器,创建M个协程在N个线程上运行。通过schedule()方法将cb或fiber重新加到任务队列中执行任务,协程可以在线程上自由切换,也可以在指定线程上执行。

         1 —— N     1 —— M
scheduler ---> thread ---> fiber

N : M 协程可以在线程间自由切换
1. 线程池, 分配一组线程
2. 协程调度器,将协程指定到相应线程上执行
    a)随机选择空闲的协程执行
    b)协程指定必须在某个线程上执行

协程调度器调度主要思想为:先查看任务队列中有没有任务需要执行,若没有任务需要执行则执行idel(),其思想主要在run()中体现。

其次,在设计协程调度器时,设置了一个use_caller来决定是否将当前调度线程也纳入调度中,这样可以少创建一个线程执行任务,效率更高。

详解

class Scheduler

两个局部线程变量保存当前线程的协程调度器和主协程

// 当前协程调度器
static thread_local Scheduler* t_secheduler = nullptr;
// 线程主协程
static thread_local Fiber* t_fiber = nullptr;

FiberAndThread(任务结构体)

通过FiberAndThread结构体在存储协程,回调函数,线程的信息

struct FiberAndThread {
        // 协程
        Fiber::ptr fiber;
        // 协程执行函数
        std::function<void()> cb;
        // 线程id 协程在哪个线程上 
        int thread;
        // 确定协程在哪个线程上跑
        FiberAndThread(Fiber::ptr f, int thr)
            :fiber(f), thread(thr) {
        }
        // 通过swap将传入的 fiber 置空,使其引用计数-1
        FiberAndThread(Fiber::ptr* f, int thr)
            :thread(thr) {
            fiber.swap(*f);
        }
        // 确定回调在哪个线程上跑
        FiberAndThread(std::function<void()> f, int thr)
            :cb(f), thread(thr) {
        }
        // 通过swap将传入的 cb 置空,使其引用计数-1
        FiberAndThread(std::function<void()>* f, int thr)
            :thread(thr) {
            cb.swap(*f);
        }
​
        // 默认构造
        FiberAndThread() {
            thread = -1;
        }
        // 重置
        void reset() {
            fiber = nullptr;
            cb = nullptr;
            thread = -1;
        }
​
    };

mumber(成员变量)

private:
    // Mutex
    MutexType m_mutex;
    // 线程池
    std::vector<sylar::Thread::ptr> m_threads;
    // 待执行的协程队列
    std::list<FiberAndThread> m_fibers;
    // use_caller为true时有效,调度协程
    Fiber::ptr m_rootFiber;
    // 协程调度器名称
    std::string m_name;
​
protected:
    // 协程下的线程id数组
    std::vector<int> m_threadIds;
    // 线程数量
    size_t m_threadCount = 0;
    // 工作线程数量
    std::atomic<size_t> m_activateThreadCount = {0};
    // 空闲线程数量
    std::atomic<size_t> m_idleThreadCount = {0};
    // 是否正在停止
    bool m_stopping = true;
    // 是否自动停止
    bool m_autoStop = false;
    // 主线程Id(use_caller)
    int m_rootThread = 0;

scheduler(调度协程)

// 调度协程
template<class FiberOrCb>
void schedule(FiberOrCb fc, int thread = -1) {
    bool need_tickle = false;
    {
        MutexType::Lock lock(m_mutex);
        // 将任务加入到队列中,若任务队列中已经有任务了,则tickle()
        need_tickle = scheduleNoLock(fc, thread);
    }
​
    if (need_tickle) {
        tickle();
    }
}
// 批量调度协程
template<class InputIterator>
void schedule(InputIterator begin, InputIterator end) {
    bool need_tickle = false;
    {
        MutexType::Lock lock(m_mutex);
        while (begin != end) {
            need_tickle = scheduleNoLock(&*begin, -1) || need_tickle;
            ++begin;
        }
    }
    if (need_tickle) {
        tickle();
    }
}

检查任务队列中有无任务,将任务加入到任务队列中,若任务队列中本来就已经有任务了,就tickle以下

/**
 * @brief 协程调度启动(无锁)
*/
template<class FiberOrCb>
bool scheduleNoLock(FiberOrCb fc, int thread) {
    bool need_tickle = m_fibers.empty();
    FiberAndThread ft(fc, thread);
    if (ft.fiber || ft.cb) {
        m_fibers.push_back(ft);
    }
    return need_tickle;
}

Scheduler(构造函数)

Scheduler::Scheduler(size_t threads, bool use_caller, const std::string &name)
    :m_name(name) {
    // 确定线程数量要正确
    SYLAR_ASSERT(threads > 0);
    // 是否将协程调度线程也纳入调度器
    if (use_caller) {
        // 设置线程名称
        sylar::Thread::SetName(m_name);
        // 获得主协程
        sylar::Fiber::GetThis();
        // 线程数量-1
        --threads;
        SYLAR_ASSERT(GetThis() == nullptr);
        // 设置当前协程调度器
        t_secheduler = this;
​
        // 将此fiber设置为 use_caller,协程则会与 Fiber::CallerMainFunc() 绑定
        // 非静态成员函数需要传递this指针作为第一个参数,用 std::bind()进行绑定
        m_rootFiber.reset(new Fiber(std::bind(&Scheduler::run, this), 0, true));
        // 设置当前线程的主协程为m_rootFiber
        // 这里的m_rootFiber是该线程的主协程(执行run任务的协程),只有默认构造出来的fiber才是主协程
        t_fiber = m_rootFiber.get();
        // 获得当前线程id
        m_rootThread = sylar::GetThreadId();
        m_threadIds.push_back(m_rootThread);
    }
    // 不将当前线程纳入调度器
    else {
        m_rootThread = -1;
    }
    m_threadCount = threads;
}

~Scheduler(析构函数)

Scheduler::~Scheduler() {
    // 必须达到停止条件
    SYLAR_ASSERT(m_stopping);
    if (GetThis() == this) {
        t_secheduler = nullptr;
    }
}

start(启动调度器)

void Scheduler::start() {
    SYLAR_LOG_INFO(g_logger) << "start()";
    MutexType::Lock lock(m_mutex);
    // 已经启动了
    if (!m_stopping) {
        return;
    }
    // 将停止状态设置为false
    m_stopping = false;
    // 线程池为空
    SYLAR_ASSERT(m_threads.empty());
   	// 创建线程池
    m_threads.resize(m_threadCount);
    for (size_t i = 0; i < m_threadCount; ++i) {
        // 线程执行 run() 任务
        m_threads[i].reset(new Thread(std::bind(&Scheduler::run, this)
                        , m_name + "_" + std::to_string(i)));
        m_threadIds.push_back(m_threads[i]->getId());
    }
    lock.unlock();
    /* 在这里切换线程时,swap的话会将线程的主协程与当前协程交换,当使用use_caller时,t_fiber = m_rootFiber,call是将当前协程与主协程交换
     * 为了确保在启动之后仍有任务加入任务队列中,所以在stop()中做该线程的启动,这样就不会漏掉任务队列中的任务
     */
    /*
        if(m_rootFiber) {
             // t_fiber = m_rootFiber.get(), 从自己切换到自己了属于是
             // m_rootFiber->swapIn();
             m_rootFiber->call();
         }
     */
    SYLAR_LOG_INFO(g_logger) << "start() end";
}

stop(停止调度器)

void Scheduler::stop() {
    SYLAR_LOG_INFO(g_logger) << "stop()";
    // 进入stop将自动停止设为true
    m_autoStop = true;
    // 使用use_caller,并且只有一个线程,并且主协程的状态为结束或者初始化
    if (m_rootFiber 
                && m_threadCount == 0
                && (m_rootFiber->getState() == Fiber::TERM
                    || m_rootFiber->getState() == Fiber::INIT)) {
        SYLAR_LOG_INFO(g_logger) << this->m_name << " sheduler stopped";
        // 停止状态为true
        m_stopping = true;
        // 若达到停止条件则直接return
        if (stopping()) {
            return;
        }
    }
    // use_caller线程
    // 当前调度器和t_secheduler相同
    if (m_rootThread != -1) {
        SYLAR_ASSERT(GetThis() == this);
    }
    // 非use_caller,此时的t_secheduler应该为nullptr
    else {
        SYLAR_ASSERT(GetThis() != this);
    }
    // 停止状态为true
    m_stopping = true;
    // 每个线程都tickle一下
    for (size_t i = 0; i < m_threadCount; ++i) {
        tickle();
    }
    // 使用use_caller多tickle一下
    if (m_rootFiber) {
        tickle();
    }
    // 使用use_caller,只要没达到停止条件,调度器主协程交出执行权,执行run
    if (m_rootFiber) {
        if (!stopping()) {
            m_rootFiber->call();
        }
    }
    std::vector<Thread::ptr> thrs;
    {
        MutexType::Lock lock(m_mutex);
        thrs.swap(m_threads);
    }
    // 等待线程执行完成
    for (auto& i : thrs) {
        i->join();
    }
}

run(协程调度函数)

void Scheduler::run() {
    SYLAR_LOG_INFO(g_logger) << "run()";
    // hook
    set_hook_enable(true);
    // 设置当前调度器
    setThis();
    // 非user_caller线程,设置主协程为线程主协程
    if (sylar::GetThreadId() != m_rootThread) {
        t_fiber = Fiber::GetThis().get();
    }
    SYLAR_LOG_DEBUG(g_logger) << "new idle_fiber";
    // 定义dile_fiber,当任务队列中的任务执行完之后,执行idle()
    Fiber::ptr idle_fiber(new Fiber(std::bind(&Scheduler::idle, this)));
    Fiber::ptr cb_fiber;
    FiberAndThread ft;
    while (true) {
        ft.reset();
        bool tickle_me = false;
        bool is_active = false;
        {	// 从任务队列中拿fiber和cb
            MutexType::Lock lock(m_mutex);
            auto it = m_fibers.begin();
            while (it != m_fibers.end()) {
                // 如果当前任务指定的线程不是当前线程,则跳过,并且tickle一下
                if (it->thread != -1 && it->thread != sylar::GetThreadId()) {
                    ++it;
                    tickle_me = true;
                    continue;
                }
                // 确保fiber或cb存在
                SYLAR_ASSERT(it->fiber || it->cb);
                // 如果该fiber正在执行则跳过
                if (it->fiber && it->fiber->getState() == Fiber::EXEC) {
                    ++it;
                    continue;
                }
                // 取出该任务
                ft = *it;
                // 从任务队列中清除
                m_fibers.erase(it);
                // 正在执行任务的线程数量+1
                ++m_activateThreadCount;
                // 正在执行任务
                is_active = true;
                break;
            }
        }
        // 取到任务tickle一下
        if (tickle_me) {
            tickle();
        }
        // 如果任务是fiber,并且任务处于可执行状态
        if (ft.fiber && (ft.fiber->getState() != Fiber::TERM
                            || ft.fiber->getState() != Fiber::EXCEPT)) {
            // 执行任务
            ft.fiber->swapIn();
            // 执行完成,活跃的线程数量减-1
            --m_activateThreadCount;
            // 如果线程的状态被置为了READY
            if (ft.fiber->getState() == Fiber::READY) {
                // 将fiber重新加入到任务队列中
                schedule(ft.fiber);
                // INIT或HOLD状态
            } else if (ft.fiber->getState() != Fiber::TERM
                            && ft.fiber->getState() != Fiber::EXCEPT) {
				// 设置fiber状态为HOLD
                ft.fiber->setState(Fiber::HOLD);
            }
            // 执行完毕重置数据ft
            ft.reset();
         // 如果任务是cb
        } else if(ft.cb) {
            // cb_fiber存在,重置该fiber
            if (cb_fiber) {
                cb_fiber->reset(ft.cb);
            // cb_fiber不存在,new新的fiber
            } else {
                SYLAR_LOG_DEBUG(g_logger) << "new ft.cb";
                cb_fiber.reset(new Fiber(ft.cb));
            }
            // 重置数据ft
            ft.reset();
			// 执行cb任务
            cb_fiber->swapIn();
            // 执行完,执行任务线程数量-1
            --m_activateThreadCount;
            // 若cb_fiber状态为READY
            if (cb_fiber->getState() == Fiber::READY) {
                // 重新放入任务队列中
                schedule(cb_fiber);
                // 释放智能指针
                cb_fiber.reset();
              // cb_fiber异常或结束,就重置状态,可以再次使用该cb_fiber
            } else if (cb_fiber->getState() == Fiber::EXCEPT
                        || cb_fiber->getState() == Fiber::TERM) {
                // cb_fiber的执行任务置空
                cb_fiber->reset(nullptr);
            } else {
                // 设置状态为HOLD,此任务后面还会通过ft.fiber被拉起
                cb_fiber->setState(Fiber::HOLD);
                // 释放该智能指针,调用下一个任务时要重新new一个新的cb_fiber
                cb_fiber.reset();
            }
          // 没有任务执行
        } else {
            // 我感觉这里判断么啥用
            if (is_active) {
                --m_activateThreadCount;
                continue;
            }
            // 如果idle_fiber的状态为TERM则结束循环,真正的结束
            if (idle_fiber->getState() == Fiber::TERM) {
                SYLAR_LOG_INFO(g_logger) << "idle_fiber term"; 
                break;
            }
            // 正在执行idle的线程数量+1
            ++m_idleThreadCount;
            // 执行idle()
            idle_fiber->swapIn();
            // 正在执行idle的线程数量-1
            --m_idleThreadCount;
            // idle_fiber状态置为HOLD
            if (idle_fiber->getState() != Fiber::TERM
                    && idle_fiber->getState() != Fiber::EXCEPT) {
                idle_fiber->setState(Fiber::HOLD);
            }
        }
    }
}

stopping(判断停止条件)

bool Scheduler::stopping() {
    MutexType::Lock lock(m_mutex);
    // 当自动停止 && 正在停止 && 任务队列为空 && 活跃的线程数量为0
    return m_autoStop && m_stopping 
        && m_fibers.empty() && m_activateThreadCount == 0;
}

总结

举个具体的例子。

#include "../sylar/sylar.h"
static sylar::Logger::ptr g_logger = SYLAR_LOG_ROOT();
void test_fiber() {
    static int count = 5;
    SYLAR_LOG_INFO(g_logger) << "---test in fiber---" << count;
    sylar::set_hook_enable(false);
    sleep(1);
    // 循环将test_fiber加入到任务队列中,并且指定第一个拿到该任务的线程一直执行
    if (--count > 0) {
        sylar::Scheduler::GetThis()->schedule(&test_fiber, sylar::GetThreadId());
    }
}
int main(int argc, char** argv) {
    g_logger->setLevel(sylar::LogLevel::INFO);
    sylar::Thread::SetName("main");
    SYLAR_LOG_INFO(g_logger) << "main start";
    sylar::Scheduler sc(2, false, "work");
    sc.start();
    SYLAR_LOG_INFO(g_logger) << "schedule";
    sc.schedule(&test_fiber);
    sc.stop();
    SYLAR_LOG_INFO(g_logger) << "main end";
    return 0;
}
// 设置2个线程, 并且将use_caller设为false, 设置名称为"work", 指定线程
// 这里可以看到有3个线程 1684 1685 1686
// 1684为调度线程, 1685和1686为执行任务的线程
// 可以看到任务都是在1686线程上执行的,因为在shceduler()时指定了任务在第一个拿到该任务的线程上一直执行
1684    main    0       [INFO]  [root]  tests/test_scheduler.cc:20      main start
1684    main    0       [INFO]  [root]  tests/test_scheduler.cc:23      schedule
1686    work_1  4       [INFO]  [root]  tests/test_scheduler.cc:7       ---test in fiber---5
1686    work_1  4       [INFO]  [root]  tests/test_scheduler.cc:7       ---test in fiber---4
1686    work_1  4       [INFO]  [root]  tests/test_scheduler.cc:7       ---test in fiber---3
1686    work_1  4       [INFO]  [root]  tests/test_scheduler.cc:7       ---test in fiber---2
1686    work_1  4       [INFO]  [root]  tests/test_scheduler.cc:7       ---test in fiber---1
1685    work_0  0       [INFO]  [system]        sylar/scheduler.cc:237  idle_fiber term
1686    work_1  0       [INFO]  [system]        sylar/scheduler.cc:237  idle_fiber term
1684    main    0       [INFO]  [root]  tests/test_scheduler.cc:27      main end
// 设置2个线程, 并且将use_caller设为true, 设置名称为"work",不指定线程
// 这里可以看到有2个线程 2841 2842
// 2841为调度线程,他也将自己纳入调度器中执行任务
2841    work    0       [INFO]  [root]  tests/test_scheduler.cc:20      main start
2841    work    0       [INFO]  [root]  tests/test_scheduler.cc:23      schedule
2842    work_0  4       [INFO]  [root]  tests/test_scheduler.cc:7       ---test in fiber---5
2841    work    6       [INFO]  [root]  tests/test_scheduler.cc:7       ---test in fiber---4
2842    work_0  4       [INFO]  [root]  tests/test_scheduler.cc:7       ---test in fiber---3
2841    work    6       [INFO]  [root]  tests/test_scheduler.cc:7       ---test in fiber---2
2842    work_0  4       [INFO]  [root]  tests/test_scheduler.cc:7       ---test in fiber---1
2842    work_0  0       [INFO]  [system]        sylar/scheduler.cc:237  idle_fiber term
2841    work    1       [INFO]  [system]        sylar/scheduler.cc:237  idle_fiber term
2841    work    0       [INFO]  [root]  tests/test_scheduler.cc:27      main end

当我们设置use_caller时,将调度线程也纳入管理中执行任务,在构造函数中通过Fiber::GetThis()获得主协程,然后new一个子协程作为该线程的主协程并与run()绑定,在stop()时使用call()将执行权交给线程主协程执行run()

start()时创建其他线程并与run()绑定,此时线程就开始执行run()

run()中,使用Fiber::GetThis()获得主协程并设置为线程主协程。在协程切换时,都是执行任务协程与线程主协程之间的切换。当达到停止条件,idle()执行完毕时,run()也执行完毕。

此时use_caller线程使用back()将线程主协程切换到主协程继续执行stop()等待其他线程执行完毕。

以上就是C++高性能服务器框架之协程调度模块的详细内容,更多关于C++协程调度模块的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文