博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
leveldb代码精读 skiplist
阅读量:2432 次
发布时间:2019-05-10

本文共 7909 字,大约阅读时间需要 26 分钟。

skiplist是在链表的基础上,在节点内添加若干个指向后面第n个节点的指针,使得链表实现类似树状的结构。
它和普通链表的显著区别
1 skiplist的节点由于记录了后面第n个节点的地址,也就是有了分支,也和树一样有高度的概念,
2 skiplist是排序链表,而每个节点的数据类型是模板实现的,也就是可以是任意类型。因此为了排序,用户需要指定一个Comparator类的实现,告诉skiplist如何比较两个节点数据的大小。
3 正是因为skiplist是排序的,因此在skiplist里查找数据的效率特别高,节点记录了下一个和下面第n个节点的地址,可以实现类似二分法查找的效率。
如下图,节点里的指针越多,就形象地成为高度越高,形成了若干层。
最下面串起所有节点的是第0层,最上面是第3层。
leveldb中skiplist的定义和实现都在 skiplist.h 一个文件里。
skiplist是一个模板类,类里面包含了node类和一个自定义的iterator类用来遍历
  1. template<typename Key, class Comparator>
  2. class SkipList {
  3. private:
  4.   struct Node;
  5.   ...
  6.   class Iterator {
  7.     ...
  8.   }
  9.   ...
  10. }
先看看Node里有什么
Node是以结构体定义的,因此变量和函数默认是public的。
template
<
typename
Key
,
class
Comparator
>
  1. struct SkipList<Key,Comparator>::Node {
  2. ...
  3. }
public成员就是Node的数据(key),构造函数也就是对key进行简单赋值
  1. explicit Node(const Key& k) : key(k) {
    }
  2. Key const key;
私有成员只有一个指针数组,指向后面的几个节点地址。
leveldb里的port模块将各个平台的基本操作封装起来,比如port::AtomicPointer本质上就是void*。因此port::AtomicPointer next_[1]其实就是执行后面节点的指针数组。
指针越多,节点的level越高。
  1. port::AtomicPointer next_[1];
下面是Node的操作
都是简单的指针操作,函数很简单。
获取后面第n个节点地址
  1. Node* Next(int n) {
  2.     assert(n >= 0);
  3.     // Use an 'acquire load' so that we observe a fully initialized
  4.     // version of the returned Node.
  5.     return reinterpret_cast<Node*>(next_[n].Acquire_Load());
  6.   }
设置后面第n个节点的数据
  1. void SetNext(int n, Node* x) {
  2.     assert(n >= 0);
  3.     // Use a 'release store' so that anybody who reads through this
  4.     // pointer observes a fully initialized version of the inserted node.
  5.     next_[n].Release_Store(x);
  6.   }
下面两个函数同样是获取节点和设置数据,区别是用的是“NoBarrier”操作
  1. // No-barrier variants that can be safely used in a few locations.
  2.   Node* NoBarrier_Next(int n) {
  3.     assert(n >= 0);
  4.     return reinterpret_cast<Node*>(next_[n].NoBarrier_Load());
  5.   }
  6.   void NoBarrier_SetNext(int n, Node* x) {
  7.     assert(n >= 0);
  8.     next_[n].NoBarrier_Store(x);
  9.   }
关于Next和NoBarrier-Next的区别,主要是里面这处
  1. next_[n].Acquire_Load()
  2. next_[n].NoBarrier_Load()
下面是两个函数的具体内容
NoBarrier_Load是直接将指针返回
而Acquire_Load在返回指针之前先要记性一次“内存屏障”操作
  1. inline void* NoBarrier_Load() const {
    return rep_; }
  2. inline void* Acquire_Load() const {
  3.   void* result = rep_;
  4.   MemoryBarrier();
  5.   return result;
  6. }
关于内存屏障,大概意思就是cpu为了尽量发挥自己的性能,减少对内存操作的等待,能够在不影响最终结果的前提下自行改变代码的执行顺序。
但是在多核处理器处理共享内存区域时,各个核心自行修改代码执行顺序,就不能保证每个核心的执行结果都正确了,各个核心对变量的一致性被破坏。
内存屏障是一条cpu指令,它有两个主要功能
1 确保内存屏障之前的代码全都执行完成后才能执行屏障之后的。也就是屏障前后的代码执行顺序不能颠倒。
2 强制cpu刷一次之前操作的变量缓存,保证每个cpu都能看到最新的变量值。
再来看skiplist主体的内容
关键的成员变量(都是私有的)
1 最大高度,初始化用
  1. enum {
    kMaxHeight = 12 };
2 用于比较两个节点的数据大小,排序和查找会用到
  1. Comparator const compare_;
3 leveldb的内存池
关于内存池是如何实现的,参考我的博客
  1. Arena* const arena_;
4 头节点
  1. Node* const head_;
5 在插入时修改skiplist的最大高度
  1. port::AtomicPointer max_height_;
6 用来随机决定新节点的高度
  1. Random rnd_;
构造函数
指定节点的数据类型和用户自己实现的Comparator类,用于比较节点内数据的大小。
还要指定一个内存池arena。skiplist的内存都要从这里分配。
  1. template<typename Key, class Comparator>
  2. SkipList<Key,Comparator>::SkipList(Comparator cmp, Arena* arena)
  3.       // 对成员变量进行赋值
  4.     : compare_(cmp),
  5.       arena_(arena),
  6.       // 创建头节点
  7.       head_(NewNode(0 /* any key will do */, kMaxHeight)),
  8.       // 初始化最大高度
  9.       max_height_(reinterpret_cast<void*>(1)),
  10.       rnd_(0xdeadbeef) {
  11.   // 从头节点开始,初始化一个空skiplist
  12.   for (int i = 0; i < kMaxHeight; i++) {
  13.     head_->SetNext(i, NULL);
  14.   }
  15. }
先看一下主要的私有函数,后面的public函数会用到
1 创建新节点,指定结存存放的数据类型和高度
  1. template<typename Key, class Comparator>
  2. typename SkipList<Key,Comparator>::Node*
  3. SkipList<Key,Comparator>::NewNode(const Key& key, int height) {
  4.   // 从arena里分配一块内存,大小是Node本身大小加上一系列指针的大小。height - 1是因为Node本身的next_就可以指向第0层。
  5.   char* mem = arena_->AllocateAligned(
  6.       sizeof(Node) + sizeof(port::AtomicPointer) * (height - 1));
  7.   // new后面跟地址,表示在指定的地址分配空间
  8.   return new (mem) Node(key);
  9. }
2 返回给定的key是否应该在指定Node的后面,即是否大于Node里的key。
template<typename Key, class Comparator>
bool SkipList<Key,Comparator>::KeyIsAfterNode(const Key& key, Node* n) const {
  // NULL n is considered infinite
  return (n != NULL) && (compare_(n->key, key) < 0); // 这里就调用了用户自己实现的compare_,比较两个自定义对象的大小。
}
3 查找和给定key相等或者比它大的最小节点
  1. template<typename Key, class Comparator>
  2. typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::FindGreaterOrEqual(const Key& key, Node** prev)
  3.     const {
  4.   // x是当前的节点。从头节点开始
  5.   Node* x = head_;
  6.   // 头节点所在的是最高层
  7.   int level = GetMaxHeight() - 1;
  8.   while (true) {
  9.     // 在第level层里找
  10.     Node* next = x->Next(level);
  11.     // 比较大小
  12.     if (KeyIsAfterNode(key, next)) {
  13.       // Keep searching in this list
  14.       x = next; // 如果key比next节点的key大,就将x指向next,继续寻找。相当于在这一层向右移动着找。
  15.     } else {
  16.       // 这个prev是有调用者提供的空节点,level至少跟this的相当。
  17.       // prev作用是保存每一层比key小的最大节点,也就是查找过程中,在每一层走过的路径。
  18.       if (prev != NULL) prev[level] = x;
  19.       if (level == 0) {
  20.         return next;
  21.       } else {
  22.         // Switch to next list
  23.         level--;
  24.       }
  25.     }
  26.   }
  27. }
4 查找比给定key小的最大的节点
  1. template<typename Key, class Comparator>
  2. typename SkipList<Key,Comparator>::Node*
  3. SkipList<Key,Comparator>::FindLessThan(const Key& key) const {
  4.   Node* x = head_;
  5.   int level = GetMaxHeight() - 1;
  6.   while (true) {
  7.     assert(x == head_ || compare_(x->key, key) < 0); // 当前节点必须比key小。
  8.     Node* next = x->Next(level); // 在本层寻找
  9.     if (next == NULL || compare_(next->key, key) >= 0) {
  10.       // 如果下一个节点比key打,就不在本层继续了,要去下一层。除非已经到底0层了。
  11.       if (level == 0) {
  12.         return x;
  13.       } else {
  14.         // Switch to next list
  15.         level--;
  16.       }
  17.     } else {
  18.       x = next; // 在本层继续往后找
  19.     }
  20.   }
  21. }
5 查找最后一个非空节点
  1. template<typename Key, class Comparator>
  2. typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::FindLast()
  3.     const {
  4.   Node* x = head_;
  5.   int level = GetMaxHeight() - 1;
  6.   while (true) {
  7.     Node* next = x->Next(level);
  8.     if (next == NULL) {
  9.       // 如果下一个是空节点,就看level,如果没到0,就继续到下层找,如果到0了,x就是最后一个节点
  10.       if (level == 0) {
  11.         return x;
  12.       } else {
  13.         // Switch to next list
  14.         level--;
  15.       }
  16.     } else {
  17.       x = next;
  18.     }
  19.   }
  20. }
以上几个私有函数虽然都是查找,但是因为skiplist是排序的,因此每次插入都要先查找一个合适的位置。
代码可能不只管,下面以查找12举例
1 从头节点最高层开始,判断12是否大于头节点在level 3的next节点(12)。
2 12比6大,因此移动到节点6,在看12是否比6在level 3的next节点(NULL)大。
3 比较大小调用了KeyIsAfterNode,由于是越靠右越大,因此它将null视为无穷大。因此这次比较返回false。
4 由于6在level 3的next大于12,因此从6开始降一层,看level 2。
5 在节点6的level 2 重复上面操作,发现6在level 2的next,也就是25,大于12,因此再下探一层。
6 在节点6的level 1 重复上面操作,找到了接单9。
7 在节点9的level 1 上的next(17)大于12,因此将到level 0。
8 在节点9的level 0 上看next,发现等于12,查找完成。
最终的查找路径就是 head --> 6 --> 9 --> 12,查找效率不错。
skiplist对外提供的函数就是插入和查找。
看过上面的私有函数后,下面两个函数就直观了
1 将key插入到skiplist里。
template<typename Key, class Comparator>
void SkipList<Key,Comparator>::Insert(const Key& key) {
  // TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
  // here since Insert() is externally synchronized.
  Node* prev[kMaxHeight];
  Node* x = FindGreaterOrEqual(key, prev);
  // Our data structure does not allow duplicate insertion
  assert(x == NULL || !Equal(key, x->key)); // 不允许插入重复值
  // 随机决定新节点的高度
  int height = RandomHeight();
  if (height > GetMaxHeight()) {
    // 将高过当前最大高度的部分指向头节点,不清楚为什么不像构造函数那样设成NULL
    for (int i = GetMaxHeight(); i < height; i++) {
      prev[i] = head_;
    }
    //fprintf(stderr, "Change height from %d to %d\n", max_height_, height);
    // It is ok to mutate max_height_ without any synchronization
    // with concurrent readers. A concurrent reader that observes
    // the new value of max_height_ will see either the old value of
    // new level pointers from head_ (NULL), or a new value set in
    // the loop below. In the former case the reader will
    // immediately drop to the next level since NULL sorts after all
    // keys. In the latter case the reader will use the new node.
    // 更新skiplist的最大高度
    max_height_.NoBarrier_Store(reinterpret_cast<void*>(height));
  }
  // 创建新节点
  x = NewNode(key, height);
  for (int i = 0; i < height; i++) {
    // NoBarrier_SetNext() suffices since we will add a barrier when
    // we publish a pointer to "x" in prev[i].
    // 由于prev记录了查找key的路径,因此将新节点x插入到prev的每一层和 prev.next之间
    x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
    prev[i]->SetNext(i, x);
  }
}
2 查找key,其实就是看有没有和key相等的节点。
如果有,key本身就是查找结果。
  1. template<typename Key, class Comparator>
  2. bool SkipList<Key,Comparator>::Contains(const Key& key) const {
  3.   // 和insert类似,也需要调用FindGreaterOrEqual,只不过不需要记录查找的路径,因此priv是NULL
  4.   Node* x = FindGreaterOrEqual(key, NULL);
  5.   if (x != NULL && Equal(key, x->key)) {
  6.     return true;
  7.   } else {
  8.     return false;
  9.   }
  10. }
skiplist还自己实现了一个Iterator,用于遍历skiplist。
由于这个Iterator位于skiplist内部,有了skiplist的那几个私有函数,实现起来很简单。

来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/26239116/viewspace-1839630/,如需转载,请注明出处,否则将追究法律责任。

转载于:http://blog.itpub.net/26239116/viewspace-1839630/

你可能感兴趣的文章
SEO技巧中你可能没有注意的细节(转)
查看>>
微软开始二代Windows Live 不见Cloud OS踪影
查看>>
SQL Server 7.0 入门(四)(转)
查看>>
创建ISAPI扩展(转)
查看>>
Windows Server 2003 R2 Beta 2将公测(转)
查看>>
病毒及木马预警一周播报(06.04.17~04.23)(转)
查看>>
黑客口述:我的第一台3389肉鸡的经历(转)
查看>>
关于 cleanup stack 和 two phase consturction [1](转)
查看>>
Oracle数据导入导出imp/exp (转)
查看>>
如何构建固定网(PSTN)短消息系统(转)
查看>>
Delphi文件管理(三)(转)
查看>>
关于网线的一些问题的解答(转)
查看>>
深度分析Win 2003自动升级补丁功能(转)
查看>>
使用Carbide.vs与VS.NET2003构建Symbian开发平台-S60 平台(转)
查看>>
来访者地址统计,很好的一个程序!(转)
查看>>
UpdateWindow函数 (转)
查看>>
移动通信的主要测量指标及注意事项(转)
查看>>
无盘网络正确网络配置建议-减少卡机蓝屏关键(转)
查看>>
如何在Delphi中调用oracle的存储过程返回数据集(转)
查看>>
ASP指南:ADO/SQL(数据存取) (转)
查看>>