LevelDB源码分析之二:comparator
扫描二维码
随时随地手机看文章
这里的comparator包括抽象类Comparator及其两个实现类:一个是内置的BytewiseComparatorImpl,另一个是InternalKeyComparator。
一.Comparator
Comparator只是导出了几个接口。
class Comparator { public: virtual ~Comparator(); // Three-way comparison. Returns value: // < 0 iff "a" < "b", // == 0 iff "a" == "b", // > 0 iff "a" > "b" virtual int Compare(const Slice& a, const Slice& b) const = 0; // The name of the comparator. Used to check for comparator // mismatches (i.e., a DB created with one comparator is // accessed using a different comparator. // // The client of this package should switch to a new name whenever // the comparator implementation changes in a way that will cause // the relative ordering of any two keys to change. // // Names starting with "leveldb." are reserved and should not be used // by any clients of this package. //返回comparator的名字 virtual const char* Name() const = 0; // Advanced functions: these are used to reduce the space requirements // for internal data structures like index blocks. // If *start < limit, changes *start to a short string in [start,limit). // Simple comparator implementations may return with *start unchanged, // i.e., an implementation of this method that does nothing is correct. // 这两个函数作用是减少像index blocks这样的内部数据结构占用的空间。 // 如果*start < limit,就在[start,limit)中找到一个短字符串,并赋给*start返回。 // 当然返回的*start可能没变化(*start==limit),此时这个函数相当于啥都没干,这也是正确的。 virtual void FindShortestSeparator( std::string* start, const Slice& limit) const = 0; // Changes *key to a short string >= *key. // Simple comparator implementations may return with *key unchanged, // i.e., an implementation of this method that does nothing is correct. // 将*key变成一个比原*key大的短字符串,并赋值给*key返回。 virtual void FindShortSuccessor(std::string* key) const = 0; };
二.BytewiseComparatorImpl
BytewiseComparatorImpl是按字典逐字节序进行比较的,也就是说i>helloworld,因为先比较i和h,i>h,比较结束。
class BytewiseComparatorImpl : public Comparator { public: BytewiseComparatorImpl() { } virtual const char* Name() const { return "leveldb.BytewiseComparator"; } // 直接调用Slice的compare函数 virtual int Compare(const Slice& a, const Slice& b) const { return a.compare(b); } // FindShortestSeparator找到start、limit之间最短的字符串,如“helloworld”和”hellozoomer”之间最短的key可以是”hellox”。 virtual void FindShortestSeparator( std::string* start, const Slice& limit) const { // 找到共同前缀的长度 size_t min_length = std::min(start->size(), limit.size()); size_t diff_index = 0; while ((diff_index < min_length) && ((*start)[diff_index] == limit[diff_index])) { diff_index++; } // 如果一个字符串是另个一字符串的前缀,无需做截短操作,否则进入else。 if (diff_index >= min_length) { // Do not shorten if one string is a prefix of the other } else { uint8_t diff_byte = static_cast((*start)[diff_index]); if (diff_byte < static_cast(0xff) && diff_byte + 1 < static_cast(limit[diff_index])) { (*start)[diff_index]++; start->resize(diff_index + 1); assert(Compare(*start, limit) < 0); } } } // FindShortSuccessor则更极端,用于找到比key大的最短字符串,如传入“helloworld”,返回的key可能是“i”而已。 virtual void FindShortSuccessor(std::string* key) const { // Find first character that can be incremented size_t n = key->size(); for (size_t i = 0; i < n; i++) { const uint8_t byte = (*key)[i]; if (byte != static_cast(0xff)) { (*key)[i] = byte + 1; key->resize(i+1); return; } } // *key is a run of 0xffs. Leave it alone. } };
三.Internal Key
1.Internal Key的结构。
下面是一个未编码前,或者说是一个解码后的Internal Key结构,它由user_key、sequence和type三个字段组成。
struct ParsedInternalKey { Slice user_key; SequenceNumber sequence; ValueType type; ParsedInternalKey() { } // Intentionally left uninitialized (for speed) ParsedInternalKey(const Slice& u, const SequenceNumber& seq, ValueType t) : user_key(u), sequence(seq), type(t) { } std::string DebugString() const; };
2.Internal Key的编码
源码中通过InternalKey类封装了Internal Key的相关操作。编码的用到的函数如下。
void AppendInternalKey(std::string* result, const ParsedInternalKey& key) { result->append(key.user_key.data(), key.user_key.size()); PutFixed64(result, PackSequenceAndType(key.sequence, key.type)); } static uint64_t PackSequenceAndType(uint64_t seq, ValueType t) { assert(seq <= kMaxSequenceNumber); assert(t <= kValueTypeForSeek); return (seq << 8) | t; }
AppendInternalKey函数先把user_key添加到*result中,然后用PackSequenceAndType函数将sequence和type打包,并将打包的结果添加到*result中。
PutFixed64函数只是简单的进行了拷贝,详见博客:LevelDB源码分析之一:coding
PackSequenceAndType函数的原理是先将seq左移8位,然后和t进行或操作,相当于把t放到了seq的低8为。为什么seq要小于等于kMaxSequenceNumber呢。
因为kMaxSequenceNumber的值如下所示。
typedef uint64_t SequenceNumber; // We leave eight bits empty at the bottom so a type and sequence# // can be packed together into 64-bits. //0x1ull:u-unsigned 无符号;l-long 长整型,ll就是64位整型。整个0x1ull代表的含义是无符号64位整型常量1,用16进制表示。 static const SequenceNumber kMaxSequenceNumber = ((0x1ull << 56) - 1);
用二进制表示就是:0000 0000 1111 1111 1111 1111 1111 1111。如果seq大于kMaxSequenceNumber,左移8位的话会移出界。
3.Internal key的解码
inline bool ParseInternalKey(const Slice& internal_key, ParsedInternalKey* result) { const size_t n = internal_key.size(); if (n < 8) return false; // DecodeFixed64函数是PutFixed64函数的逆过程,返回sequence和type的打包结果,并赋值给num。 uint64_t num = DecodeFixed64(internal_key.data() + n - 8); // 截取低8位,赋值给c unsigned char c = num & 0xff; // 右移8位,赋值给sequence result->sequence = num >> 8; // 将c转换为type result->type = static_cast(c); // 取出user_key result->user_key = Slice(internal_key.data(), n - 8); return (c <= static_cast(kTypeValue)); }
通过上述解码函数可以将编码后的internal_key解码为* result。为了使用方便,源码中还专门为解码user_key和type提供了两个函数。
// Returns the user key portion of an internal key. inline Slice ExtractUserKey(const Slice& internal_key) { assert(internal_key.size() >= 8); return Slice(internal_key.data(), internal_key.size() - 8); } inline ValueType ExtractValueType(const Slice& internal_key) { assert(internal_key.size() >= 8); const size_t n = internal_key.size(); uint64_t num = DecodeFixed64(internal_key.data() + n - 8); unsigned char c = num & 0xff; return static_cast(c); }
四.InternalKeyComparator
InternalKeyComparator是要重点关注的一个比较器,它用来比较内部键(Internal Key)。内部键值是为了方便处理,将原普通键、序列号和值类型组成的新键。
先看下Compare函数
int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const { // Order by: // increasing user key (according to user-supplied comparator) // decreasing sequence number // decreasing type (though sequence# should be enough to disambiguate) int r = user_comparator_->Compare(ExtractUserKey(akey), ExtractUserKey(bkey)); if (r == 0) { const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8); const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8); if (anum > bnum) { r = -1; } else if (anum < bnum) { r = +1; } } return r; }
1)首先比较user_key,如果user_key不相同,就直接返回比较结果,否则继续进行第二步。user_comparator_是用户指定的比较器,在InternalKeyComparator构造时传入。
2)在user_key相同的情况下,比较sequence_numer|value type然后返回结果(注意每个Internal Key的sequence_number是唯一的,因此不可能出现anum==bnum的情况)
再看看FindShortestSeparator函数
void InternalKeyComparator::FindShortestSeparator( std::string* start, const Slice& limit) const { // Attempt to shorten the user portion of the key Slice user_start = ExtractUserKey(*start); Slice user_limit = ExtractUserKey(limit); std::string tmp(user_start.data(), user_start.size()); user_comparator_->FindShortestSeparator(&tmp, user_limit); if (tmp.size() < user_start.size() && user_comparator_->Compare(user_start, tmp) < 0) { // User key has become shorter physically, but larger logically. // Tack on the earliest possible number to the shortened user key. PutFixed64(&tmp, PackSequenceAndType(kMaxSequenceNumber,kValueTypeForSeek)); assert(this->Compare(*start, tmp) < 0); assert(this->Compare(tmp, limit) < 0); start->swap(tmp); } }
1)该函数取出Internal Key中的user_key字段,根据用户指定的comparator找到短字符串并替换user_start。此时user_start物理上是变短了,但是逻辑上却变大了,原理详见第二节的论述。
2)如果user_start被替换了,就用新的user_start更新Internal Key,并使用最大的sequence number。否则start保持不变。
接下来FindShortSuccessor函数
void InternalKeyComparator::FindShortSuccessor(std::string* key) const { Slice user_key = ExtractUserKey(*key); std::string tmp(user_key.data(), user_key.size()); user_comparator_->FindShortSuccessor(&tmp); if (tmp.size() < user_key.size() && user_comparator_->Compare(user_key, tmp) < 0) { // User key has become shorter physically, but larger logically. // Tack on the earliest possible number to the shortened user key. PutFixed64(&tmp, PackSequenceAndType(kMaxSequenceNumber,kValueTypeForSeek)); assert(this->Compare(*key, tmp) < 0); key->swap(tmp); } }
原理上与FindShortestSeparator相同。