定时器方案:红黑树、时间轮、最小堆

C/C++/Rust1667 字

定时器方案:红黑树、时间轮、最小堆

定时器的应用:

  • ⼼跳检测
  • 技能冷却
  • 武器冷却
  • 倒计时

定时器概述

对于服务端来说,驱动服务端逻辑的事件主要有两个,⼀个是⽹络事件,另⼀个是时间事件;

在不同框架中,这两种事件有不同的实现⽅式;

  • 第⼀种,⽹络事件和时间事件在⼀个线程当中配合使⽤;例如nginx、redis;
  • 第⼆种,⽹络事件和时间事件在不同线程当中处理;例如skynet;
// 第⼀种 ⽹络事件和时间事件在⼀个线程当中配合使⽤;
while (!quit) {
        int now = get_now_time();// 单位:ms
         int timeout = get_nearest_timer() - now;
         if (timeout < 0) {
      timeout = 0;
    }
         int nevent = epoll_wait(epfd, ev, nev, timeout);
         for (int i=0; i<nevent; i++) {
                 //... ⽹络事件处理
         }
         update_timer(); // 时间事件处理
}

// 第⼆种 ⽹络事件和时间事件在不同线程当中处理;
void* thread_timer(void * thread_param) {
        init_timer();
         while (!quit) {
                 update_timer(); // 更新检测定时器,并把定时事件发送到消息队列中
                 sleep(t); // 这⾥的 t 要⼩于 时间精度
         }
         clear_timer();
 return NULL; 
}

pthread_create(&pid, NULL, thread_timer, &thread_param);

定时器设计

接⼝设计

// 初始化定时器
void init_timer();
// 添加定时器
Node* add_timer(int expire, callback cb);
// 删除定时器
bool del_timer(Node* node);
// 找到最近要发⽣的定时任务
Node* find_nearest_timer();
// 更新检测定时器
void update_timer();
// 清除定时器
void clear_timer();

数据结构选择

  • 红⿊树:对于增删查,时间复杂度为$O(log_2n)$ ;对于红⿊树最⼩节点为最左侧节点,时间复杂度为 $O(log_2n) $;
  • 最⼩堆:对于增查,时间复杂度为 $O(log_2n)$ ;对于删时间复杂度为 $O(n)$,但是可以通过辅助数据结构(map或者hashtable来快速索引节点)来加快删除操作;对于最⼩节点为根节点,时间复杂度为 $O(1)$;
  • 跳表:对于增删查,时间复杂度为$O(log_2n)$ ;对于跳表最⼩节点为最左侧节点,时间复杂度为$O(1)$;但是空间复杂度⽐较⾼,为 $O(1.5n)$;
  • 时间轮:对于增删查,时间复杂度为 $O(1)$ ;查找最⼩节点也为 $O(1)$;

要点

  1. 有序的结构,且增加删除操作不影响该结构有序;
  2. 能快速查找最⼩节点;
  3. 时间轮增加操作只从单个定时任务触发,忽略定时任务之间的⼤⼩关系;⽽红⿊树、最⼩堆、跳表的有序性依赖定时任务之间的⼤⼩关系;

红⿊树

void ngx_rbtree_insert_timer_value(ngx_rbtree_node_t *temp,
ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel) {
    ngx_rbtree_node_t **p;
     for ( ;; ) {
         // 这⾥是重点
         p = ((ngx_rbtree_key_int_t) (node->key - temp->key) < 0) ? &temp->left : &temp->right;
         
        if (*p == sentinel) {
             break;
         }
         temp = *p;
     }
 
    *p = node;
    node->parent = temp;
    node->left = sentinel;
    node->right = sentinel;
    ngx_rbt_red(node);
}

STL中 map 结构采⽤的是红⿊树来实现,但是定时器不要使⽤ map 结构来实现,因为可能多个定时任务需要同时被触发, map 中的key是惟⼀的;

红⿊树的节点同时包含 key 和 val ,红⿊树节点的有序由 key 来决定的;插⼊节点的时候,通过⽐较key来决定节点存储位置;红⿊树的实现并没有要求 key 唯⼀;如上代码示例, for 循环中 (node->key - temp->key) < 0)? &temp->left : &temp->right; 当key相同的时候取值为 temp->right ;

思考: map 结构中插⼊节点这⾥如何操作?

最⼩堆

概述

满⼆叉树:所有的层节点数都是该层所能容纳节点的最⼤数量(满⾜ );

完全⼆叉树:若⼆叉树的深度为 h ,除了 h 层外,其他层的节点数都是该层所能容纳节点的最⼤数量(满⾜ $2^n;n >= 0$),且 h 层都集中在最左侧;

最⼩堆

  1. 是⼀颗完全⼆叉树;
  2. 某⼀个节点的值总是⼩于等于它的⼦节点的值;
  3. 堆中每个节点的⼦树都是最⼩堆;

 title=

增加操作

为了满⾜完全⼆叉树定义,往⼆叉树最⾼层沿着最左侧添加⼀个节点;然后考虑是否能上升操作;

如果此时添加值为 4 的节点, 4 节点是5节点的左⼦树; 4 ⽐ 5 ⼩, 4 和 5 需要交换位置;

删除操作

删除操作需要先查找是否包含这个节点,最⼩堆的查找效率是 $O(n)$;查找之后,交换最后⼀个节点,先考虑下降操作,如果操作失败则上升操作;最后删除最后⼀个节点;

例如:假设删除 1 号节点,则需要下沉操作;假设删除 9 号节点,则需要上升操作;

时间轮

 title=

从时钟表盘出发,如何⽤数据结构来描述秒表的运转;

int seconds[60]; // 数组来描述表盘刻度;

++tick 60;每秒钟 ++tick 来描述秒针移动;对让秒针永远在间$[0,59]$移动;

对于时钟来说,它的时间精度(最⼩运⾏单元)是1秒;

单层级时间轮

背景

⼼跳检测:客户端每 5 秒钟发送⼼跳包;服务端若 10 秒内没收到⼼跳数据,则清除连接;

实际在开发过程中,若收到除了⼼跳包的其他数据,⼼跳检测也算通过,在这⾥为了简化流程,只判断⼼跳包;

作为对⽐:我们假设使⽤ map<int, conn*> 来存储所有连接数;每秒检测 map 结构,那么每秒需要遍历所有的连接,如果这个map结构包含⼏万条连接,那么我们做了很多⽆效检测;考虑极端情况,刚添加进来的连接,下⼀秒就需要去检测,实际上只需要10秒后检测就⾏了;那么我们考虑使⽤时间轮来检测;

注意:这个例⼦只是⽤来帮助理解时间轮,不代表实际解决⽅案;

设计

1. 准备⼀个数组存储连接数据;那么数组⻓度设置为多少?

2. 考虑⼀秒内添加了多条连接,那么可以参考 hash 结构处理冲突的⽅式,⽤链表链接起来;

3. 回到 1 中的问题,如果想 2 中链表稀疏,将数组⻓度设置⼤⼀些;如果想紧凑些,则将数组⻓度设置⼩些(但是必须⼤于10);

4. 假设我们设置数组⻓度为 11;那么检测指针的移动可描述为 ++point % 11;

$\ m\%n = m - n \times floor(m/n)$

优化:将 n 替换为 ,这⾥ 恰好⼤于 n;这样以来 可以转化为;所以我们可以选择 16( ),那么检测指针移动可优化为 ++point ;

5. 考虑到正常情况下 5 秒钟发送⼀次⼼跳包,10 秒才检测⼀次,如下图到索引为 10 的时候并不能踢掉连接;所以需要每收到⼀次⼼跳包则 used++ ,每检测⼀次 used-- ;当检测used == 0 则踢掉连接;

 title=

多层级时间轮

背景

参照时钟表盘的运转规律,可以将定时任务根据触发的紧急程度,分布到不同层级的时间轮中;假设时间精度为 10ms ;在第 1 层级每 10ms 移动⼀格;每移动⼀格执⾏该格⼦当中所有的定时任务;

当第 1 层指针从 255 格开始移动,此时层级 2 移动⼀格;层级 2 移动⼀格的⾏为定义为,将该格当中的定时任务重新映射到层级 1 当中;同理,层级 2 当中从 63 格开始移动,层级 3 格⼦中的定时任务重新映射到层级 2 ; 以此类推层级 4 往层级 3 映射,层级 5 往层级 4 映射;

如何重新映射?定时任务的过期时间对上⼀层级的⻓度取余分布在上⼀层级不同格⼦当中;

 title=

添加节点

void add_node(timer_t *T, timer_node_t *node) {
    uint32_t time=node->expire;
    uint32_t current_time=T->time;
    uint32_t msec = time - current_time;
     if (msec < TIME_NEAR) { //[0, 0x100)
         // time % 256
         link(&T->near[time&TIME_NEAR_MASK],node);
    } else if (msec < (1 << (TIME_NEAR_SHIFT + TIME_LEVEL_SHIFT))) {
        //[0x100, 0x4000)
        // floor(time/2^8) % 64
         link(&T->t[0][((time >> TIME_NEAR_SHIFT) & TIME_LEVEL_MASK)],node); 
    } else if (msec < (1 << (TIME_NEAR_SHIFT + 2 *TIME_LEVEL_SHIFT))) {
        //[0x4000, 0x100000)
         // floor(time/2^14) % 64
         link(&T->t[1][((time >> (TIME_NEAR_SHIFT + TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node); 
    } else if (msec < (1 << (TIME_NEAR_SHIFT + 3 *TIME_LEVEL_SHIFT))) {
        //[0x100000, 0x4000000)
         // floor(time/2^20) % 64
         link(&T->t[2][((time >> (TIME_NEAR_SHIFT + 2 * TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node);  
    } else {
        //[0x4000000, 0xffffffff]
         // floor(time/2^26) % 64
         link(&T->t[3][((time >> (TIME_NEAR_SHIFT + 3 * TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node); 
 } }

重新映射

void timer_shift(timer_t *T) {
    int mask = TIME_NEAR;
     uint32_t ct = ++T->time; // 第⼀层级指针移动 ++ ⼀次代表10ms
     if (ct == 0) {
         move_list(T, 3, 0);
    } else {
         // floor(ct / 256)
         uint32_t time = ct >> TIME_NEAR_SHIFT;
         int i=0;
         // ct % 256 == 0 说明是否移动到了 不同层级的 最后⼀格
         while ((ct & (mask-1))==0) {
             int idx=time & TIME_LEVEL_MASK;
             if (idx!=0) {
                 move_list(T, i, idx); // 这⾥发⽣重新映射,将i+1层级idx格⼦中的定时任务重新映射到i层级中
             }
             mask <<= TIME_LEVEL_SHIFT;
             time >>= TIME_LEVEL_SHIFT;
             ++i;
         }
     } 
}
maksim
Maksim(一笑,吡罗),PHPer,Goper
OωO
开启隐私评论,您的评论仅作者和评论双方可见