Redis

关注公众号 jb51net

关闭
首页 > 数据库 > Redis > Redis定时任务

Redis定时任务原理的实现

作者:心城以北

本文主要是基于 redis 6.2 源码进行分析定时事件的数据结构和常见操作,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

本文主要是基于 redis 6.2 源码进行分析定时事件的数据结构和常见操作。

数据结构

在 redis 中通过 aeTimeEvent 结构来创建定时任务事件,代码如下:

/* Time event structure */
typedef struct aeTimeEvent {
    // 标识符
    long long id; /* time event identifier. */
    // 定时纳秒数
    monotime when;
    // 定时回调函数
    aeTimeProc *timeProc;
    // 注销定时器时候的回调函数
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *prev;
    struct aeTimeEvent *next;
    int refcount; /* refcount to prevent timer events from being
             * freed in recursive time event calls. */
} aeTimeEvent;

常见操作

1. 创建定时事件

redis 中最重要的定时函数且是周期执行的函数,使用的是 serverCron 函数。在 redis 中由于定时任务比较少,因此并没有严格的按照过期时间来排序的,而是按照 id自增 + 头插法 来保证基本有序。

if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
  serverPanic("Can't create event loop timers.");
  exit(1);
}

//创建定时器对象
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    long long id = eventLoop->timeEventNextId++;
    aeTimeEvent *te;

    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;
    te->id = id;
    te->when = getMonotonicUs() + milliseconds * 1000;
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;
    te->prev = NULL;
    // 头插法 
    te->next = eventLoop->timeEventHead;
    te->refcount = 0;
    if (te->next)
        te->next->prev = te;
    eventLoop->timeEventHead = te;
    return id;
}

2. 触发定时事件

redis 中是采用 IO 复用来进行定时任务的。

查找距离现在最近的定时事件,见 usUntilEarliestTimer

​
/* How many microseconds until the first timer should fire.
 * If there are no timers, -1 is returned.
 *
 * Note that's O(N) since time events are unsorted.
 * Possible optimizations (not needed by Redis so far, but...):
 * 1) Insert the event in order, so that the nearest is just the head.
 *    Much better but still insertion or deletion of timers is O(N).
 * 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)).
 */
static int64_t usUntilEarliestTimer(aeEventLoop *eventLoop) {
    aeTimeEvent *te = eventLoop->timeEventHead;
    if (te == NULL) return -1;

    aeTimeEvent *earliest = NULL;
    while (te) {
        if (!earliest || te->when < earliest->when)
            earliest = te;
        te = te->next;
    }

    monotime now = getMonotonicUs();
    return (now >= earliest->when) ? 0 : earliest->when - now;
}

​

这里时间复杂度可能比较高,实际中需要结合具体场景使用。

更新剩余过期时间,想想为啥呢?因为我们前面提到过,IO 复用有可能因为 IO 事件返回,所以需要更新。

if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
  usUntilTimer = usUntilEarliestTimer(eventLoop);

if (usUntilTimer >= 0) {
  tv.tv_sec = usUntilTimer / 1000000;
  tv.tv_usec = usUntilTimer % 1000000;
  tvp = &tv;
} else {
  if (flags & AE_DONT_WAIT) {
    // 不等待
    tv.tv_sec = tv.tv_usec = 0;
    tvp = &tv;
  } else {
    /* Otherwise we can block */
    tvp = NULL; /* wait forever */
  }
}

3. 执行定时事件

一次性的执行完直接删除,周期性的执行完在重新添加到链表。

/* Process time events */
static int processTimeEvents(aeEventLoop *eventLoop) {
  int processed = 0;
  aeTimeEvent *te;
  long long maxId;

  te = eventLoop->timeEventHead;
  maxId = eventLoop->timeEventNextId-1;
  monotime now = getMonotonicUs();
  
  // 删除定时器
  while(te) {
    long long id;
        
    // 下一轮中对事件进行删除
    /* Remove events scheduled for deletion. */
    if (te->id == AE_DELETED_EVENT_ID) {
      aeTimeEvent *next = te->next;
      /* If a reference exists for this timer event,
             * don't free it. This is currently incremented
             * for recursive timerProc calls */
      if (te->refcount) {
        te = next;
        continue;
      }
      if (te->prev)
        te->prev->next = te->next;
      else
        eventLoop->timeEventHead = te->next;
      if (te->next)
        te->next->prev = te->prev;
      if (te->finalizerProc) {
        te->finalizerProc(eventLoop, te->clientData);
        now = getMonotonicUs();
      }
      zfree(te);
      te = next;
      continue;
    }
    
    if (te->id > maxId) {
      te = te->next;
      continue;
    }

    if (te->when <= now) {
      int retval;

      id = te->id;
      te->refcount++;
      // timeProc 函数返回值 retVal 为时间事件执行的间隔
      retval = te->timeProc(eventLoop, id, te->clientData);
      te->refcount--;
      processed++;
      now = getMonotonicUs();
      if (retval != AE_NOMORE) {
        te->when = now + retval * 1000;
      } else {
        // 如果超时了,那么标记为删除
        te->id = AE_DELETED_EVENT_ID;
      }
    }
    // 执行下一个
    te = te->next;
  }
  return processed;
}

总结

优点:实现简单
缺点:如果定时任务很多,效率比较低。

到此这篇关于Redis定时任务原理的实现的文章就介绍到这了,更多相关Redis定时任务内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文