0%

bblot bucket

[TOC]

Bucket数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Bucket represents a collection of key/value pairs inside the database.
type Bucket struct {
*bucket
tx *Tx // the associated transaction
buckets map[string]*Bucket // subbucket cache
page *page // inline page reference
rootNode *node // materialized node for the root page.
nodes map[pgid]*node // node cache

// Sets the threshold for filling nodes when they split. By default,
// the bucket will fill to 50% but it can be useful to increase this
// amount if you know that your write workloads are mostly append-only.
//
// This is non-persisted across transactions so it must be set in every Tx.
FillPercent float64
}

// bucket represents the on-file representation of a bucket.
// This is stored as the "value" of a bucket key. If the bucket is small enough,
// then its root page can be stored inline in the "value", after the bucket
// header. In the case of inline buckets, the "root" will be 0.
type bucket struct {
root pgid // page id of the bucket's root-level page
sequence uint64 // monotonically incrementing, used by NextSequence()
}

Bucket遍历之Cursor

本节我们先做一节内容的铺垫,暂时不讲如何创建、获取、删除一个Bucket。而是介绍一个boltdb中的新对象Cursor。

答案是:所有的上述操作都是建立在首先定位到一个Bucket所属的位置,然后才能对其进行操作。而定位一个Bucket的功能就是由Cursor来完成的。所以我们先这一节给大家介绍一下boltdb中的Cursor。

我们先看下官方文档对Cursor的描述

Cursor represents an iterator that can traverse over all key/value pairs in a bucket in sorted order.

用大白话讲,既然一个Bucket逻辑上是一颗b+树,那就意味着我们可以对其进行遍历。前面提到的set、get操作,无非是要在Bucket上先找到合适的位置,然后再进行操作。而“找”这个操作就是交由Cursor来完成的。简而言之对Bucket这颗b+树的遍历工作由Cursor来执行。一个Bucket对象关联一个Cursor。下面我们先看看Bucket和Cursor之间的关系。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// Cursor creates a cursor associated with the bucket.
// The cursor is only valid as long as the transaction is open.
// Do not use a cursor after the transaction is closed.
func (b *Bucket) Cursor() *Cursor {
// Update transaction statistics.
b.tx.stats.CursorCount++

// Allocate and return a cursor.
return &Cursor{
bucket: b,
stack: make([]elemRef, 0),
}
}

// elemRef represents a reference to an element on a given page/node.
type elemRef struct {
page *page
node *node
index int
}

// isLeaf returns whether the ref is pointing at a leaf page/node.
func (r *elemRef) isLeaf() bool {
if r.node != nil {
return r.node.isLeaf
}
return (r.page.flags & leafPageFlag) != 0
}

// count returns the number of inodes or page elements.
func (r *elemRef) count() int {
if r.node != nil {
return len(r.node.inodes)
}
return int(r.page.count)
}

Cursor结构

1
2
3
4
5
6
7
8
9
type Cursor struct {
bucket *Bucket
stack []elemRef
}

// Bucket returns the bucket that this cursor was created from.
func (c *Cursor) Bucket() *Bucket {
return c.bucket
}

Cursor对外接口

下面我们看一下Cursor对外暴露的接口有哪些。看之前也可以心里先想一下。针对一棵树我们需要哪些遍历接口呢?

主体也就是三类:定位到某一个元素的位置、在当前位置从前往后找、在当前位置从后往前找

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (c *Cursor) First() (key []byte, value []byte)

func (c *Cursor) Last() (key []byte, value []byte)

func (c *Cursor) Next() (key []byte, value []byte)

func (c *Cursor) Prev() (key []byte, value []byte)

func (c *Cursor) Delete() error
// Seek moves the cursor to a given key and returns it.
// If the key does not exist then the next key is used. If no keys
// follow, a nil key is returned.
// The returned key and value are only valid for the life of the transaction.
func (c *Cursor) Seek(seek []byte) (key []byte, value []byte)

page node

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// pageNode returns the in-memory node, if it exists.
// Otherwise returns the underlying page.
func (b *Bucket) pageNode(id pgid) (*page, *node) {
// Inline buckets have a fake page embedded in their value so treat them
// differently. We'll return the rootNode (if available) or the fake page.
if b.root == 0 {
if id != 0 {
panic(fmt.Sprintf("inline bucket non-zero page access(2): %d != 0", id))
}
if b.rootNode != nil {
return nil, b.rootNode
}
return b.page, nil
}

// Check the node cache for non-inline buckets.
if b.nodes != nil {
if n := b.nodes[id]; n != nil {
return nil, n
}
}

// Finally lookup the page from the transaction if no node is materialized.
return b.tx.page(id), nil
}

// data 表示 mmap 的 file 数据
// page retrieves a page reference from the mmap based on the current page size.
func (db *DB) page(id pgid) *page {
pos := id * pgid(db.pageSize)
return (*page)(unsafe.Pointer(&db.data[pos]))
}

Seek(key)实现分析

Seek()方法内部主要调用了seek()私有方法,我们重点关注seek()这个方法的实现,该方法有三个返回值,前两个为key、value、第三个为叶子节点的类型。前面提到在boltdb中,叶子节点元素有两种类型:一种是嵌套的子桶、一种是普通的key/value。而这二者就是通过flags来区分的。如果叶子节点元素为嵌套的子桶时,返回的flags为1,也就是bucketLeafFlag取值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
func (c *Cursor) Seek(seek []byte) (key []byte, value []byte) {
k, v, flags := c.seek(seek)
// If we ended up after the last element of a page then move to the next one.
// 下面这一段逻辑是必须的,因为在seek()方法中,如果ref.index>ref.count()的话,就直接返回nil,nil,0了
// 这里需要返回下一个
if ref := &c.stack[len(c.stack)-1]; ref.index >= ref.count() {
k, v, flags = c.next()
}
if k == nil {
return nil, nil
// 子桶的话
} else if (flags & uint32(bucketLeafFlag)) != 0 {
return k, nil
}
return k, v
}

// 实际上调用 search 方法
func (c *Cursor) seek(seek []byte) (key []byte, value []byte, flags uint32) {
// Start from root page/node and traverse to correct page.
c.stack = c.stack[:0]
c.search(seek, c.bucket.root)

// If this is a bucket then return a nil value.
return c.keyValue()
}

// 整个 search 就是构建了一个搜索栈,最后用 elemRef 中的 index, 这个 index 表示node 中 inodes 的下标,或者 page 下标。
// search recursively performs a binary search against a given page/node until it finds a given key.
func (c *Cursor) search(key []byte, pgid pgid) {
p, n := c.bucket.pageNode(pgid)
if p != nil && (p.flags&(branchPageFlag|leafPageFlag)) == 0 {
panic(fmt.Sprintf("invalid page type: %d: %x", p.id, p.flags))
}
e := elemRef{page: p, node: n}
// 搜索栈
c.stack = append(c.stack, e)

// If we're on a leaf page/node then find the specific node.
// 叶子结点就直接最后一次二分查询了
if e.isLeaf() {
c.nsearch(key)
return
}

// 搜索 node 或者 page, 差异非常小,因为本质上 node 就是 page 的 内存形式
if n != nil {
c.searchNode(key, n)
return
}
c.searchPage(key, p)
}

func (c *Cursor) searchNode(key []byte, n *node) {
var exact bool
index := sort.Search(len(n.inodes), func(i int) bool {
// TODO(benbjohnson): Optimize this range search. It's a bit hacky right now.
// sort.Search() finds the lowest index where f() != -1 but we need the highest index.
ret := bytes.Compare(n.inodes[i].key, key)
if ret == 0 {
exact = true
}
return ret != -1
})
if !exact && index > 0 {
index--
}
c.stack[len(c.stack)-1].index = index

// Recursively search to the next page.
c.search(key, n.inodes[index].pgid)
}

// keyValue returns the key and value of the current leaf element.
func (c *Cursor) keyValue() ([]byte, []byte, uint32) {
ref := &c.stack[len(c.stack)-1]

// If the cursor is pointing to the end of page/node then return nil.
if ref.count() == 0 || ref.index >= ref.count() {
return nil, nil, 0
}

// Retrieve value from node.
if ref.node != nil {
inode := &ref.node.inodes[ref.index]
return inode.key, inode.value, inode.flags
}

// Or retrieve value from page.
elem := ref.page.leafPageElement(uint16(ref.index))
return elem.key(), elem.value(), elem.flags
}

到这儿我们就已经看完所有的seek()查找一个key的过程了,其内部也很简单,就是从根节点开始,通过不断递归遍历每层节点,采用二分法来定位到具体的叶子节点。到达叶子节点时,其叶子节点内部存储的数据也是有序的,因此继续按照二分查找来找到最终的下标。

值得需要注意点:

在遍历时,我们都知道,有可能遍历到的当前分支节点数据并没有在内存中,此时就需要从page中加载数据遍历。所以在遍历过程中,优先在node中找,如果node为空的时候才会采用page来查找。

First()、Last()实现分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func (c *Cursor) First() (key []byte, value []byte) {
_assert(c.bucket.tx.db != nil, "tx closed")
// 清空stack
c.stack = c.stack[:0]
p, n := c.bucket.pageNode(c.bucket.root)
// 一直找到第一个叶子节点,此处在天添加stack时,一直让index设置为0即可
ref := elemRef{page: p, node: n, index: 0}
c.stack = append(c.stack, ref)
c.first()
// If we land on an empty page then move to the next value.
// https://github.com/boltdb/bolt/issues/450
// 当前页时空的话,找下一个
if c.stack[len(c.stack)-1].count() == 0 {
c.next()
}
k, v, flags := c.keyValue()
// 是桶
if (flags & uint32(bucketLeafFlag)) != 0 {
return k, nil
}
return k, v
}
// first moves the cursor to the first leaf element under the last page in the stack.
// 找到最后一个非叶子节点的第一个叶子节点。index=0的节点
func (c *Cursor) first() {
for {
// Exit when we hit a leaf page.
var ref = &c.stack[len(c.stack)-1]
if ref.isLeaf() {
break
}
// Keep adding pages pointing to the first element to the stack.
var pgid pgid
if ref.node != nil {
pgid = ref.node.inodes[ref.index].pgid
} else {
pgid = ref.page.branchPageElement(uint16(ref.index)).pgid
}
p, n := c.bucket.pageNode(pgid)
c.stack = append(c.stack, elemRef{page: p, node: n, index: 0})
}
}

Next 分析

使用方式

1
2
3
4
5
6
c := b.Cursor()

// 启动遍历模式
for k, v := c.First(); k != nil; k, v = c.Next() {
fmt.Printf("cursor, key=%s, value=%s\n", string(k), string(v))
}

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// next 的实现就是去移动每一个 node 的 index, 从叶子结点开始,这样就能遍历完所有的叶子结点了!!!
// next moves to the next leaf element and returns the key and value.
// If the cursor is at the last leaf element then it stays there and returns nil.
func (c *Cursor) next() (key []byte, value []byte, flags uint32) {
for {
// Attempt to move over one element until we're successful.
// Move up the stack as we hit the end of each page in our stack.
var i int
// c.stack 是在调用 First 函数时被写入的,所以,这样倒序来遍历能实现按序获取
for i = len(c.stack) - 1; i >= 0; i-- {
elem := &c.stack[i]
if elem.index < elem.count()-1 {
elem.index++
break
}
}

// If we've hit the root page then stop and return. This will leave the
// cursor on the last element of the last page.
if i == -1 {
return nil, nil, 0
}

// Otherwise start from where we left off in the stack and find the
// first element of the first leaf page.
c.stack = c.stack[:i+1]
// 获取下一个叶子结点
c.first()

// If this is an empty page then restart and move back up the stack.
// https://github.com/boltdb/bolt/issues/450
if c.stack[len(c.stack)-1].count() == 0 {
continue
}

return c.keyValue()
}
}

node节点的相关操作

在开始分析node节点之前,我们先看一下官方对node节点的描述

node represents an in-memory, deserialized page

一个node节点,既可能是叶子节点,也可能是根节点,也可能是分支节点。是物理磁盘上读取进来的页page的内存表现形式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// node represents an in-memory, deserialized page.
type node struct {
bucket *Bucket // 关联一个桶
isLeaf bool
unbalanced bool // 值为true的话,需要考虑页合并
spilled bool // 值为true的话,需要考虑页分裂
key []byte // 对于分支节点的话,保留的是最小的key
pgid pgid // 分支节点关联的页id
parent *node // 该节点的parent
children nodes // 该节点的孩子节点
inodes inodes // 该节点上保存的索引数据
}

/ inode represents an internal node inside of a node.
// It can be used to point to elements in a page or point
// to an element which hasn't been added to a page yet.
type inode struct {
// 表示是否是子桶叶子节点还是普通叶子节点。如果flags值为1表示子桶叶子节点,否则为普通叶子节点
flags uint32
// 当inode为分支元素时,pgid才有值,为叶子元素时,则没值
pgid pgid
key []byte
// 当inode为分支元素时,value为空,为叶子元素时,才有值
value []byte
}

node->page

page->node

node节点的增删改查

put(k,v)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// put inserts a key/value.
func (n *node) put(oldKey, newKey, value []byte, pgid pgid, flags uint32) {
if pgid >= n.bucket.tx.meta.pgid {
panic(fmt.Sprintf("pgid (%d) above high water mark (%d)", pgid, n.bucket.tx.meta.pgid))
} else if len(oldKey) <= 0 {
panic("put: zero-length old key")
} else if len(newKey) <= 0 {
panic("put: zero-length new key")
}

// Find insertion index.
index := sort.Search(len(n.inodes), func(i int) bool { return bytes.Compare(n.inodes[i].key, oldKey) != -1 })

// Add capacity and shift nodes if we don't have an exact match and need to insert.
exact := (len(n.inodes) > 0 && index < len(n.inodes) && bytes.Equal(n.inodes[index].key, oldKey))
if !exact {
n.inodes = append(n.inodes, inode{})
copy(n.inodes[index+1:], n.inodes[index:])
}

inode := &n.inodes[index]
inode.flags = flags
inode.key = newKey
inode.value = value
inode.pgid = pgid
_assert(len(inode.key) > 0, "put: zero-length inode key")
}

get(k)

在node中,没有get(k)的方法,其本质是在Cursor中就返回了get的数据。大家可以看看Cursor中的keyValue()方法。

del(k)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// del removes a key from the node.
func (n *node) del(key []byte) {
// Find index of key.
index := sort.Search(len(n.inodes), func(i int) bool { return bytes.Compare(n.inodes[i].key, key) != -1 })

// Exit if the key isn't found.
if index >= len(n.inodes) || !bytes.Equal(n.inodes[index].key, key) {
return
}

// Delete inode from the node.
n.inodes = append(n.inodes[:index], n.inodes[index+1:]...)

// Mark the node as needing rebalancing.
n.unbalanced = true
}

nextSibling()、prevSibling()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// childAt returns the child node at a given index.
func (n *node) childAt(index int) *node {
if n.isLeaf {
panic(fmt.Sprintf("invalid childAt(%d) on a leaf node", index))
}
return n.bucket.node(n.inodes[index].pgid, n)
}

// childIndex returns the index of a given child node.
func (n *node) childIndex(child *node) int {
index := sort.Search(len(n.inodes), func(i int) bool { return bytes.Compare(n.inodes[i].key, child.key) != -1 })
return index
}

// numChildren returns the number of children.
func (n *node) numChildren() int {
return len(n.inodes)
}

// nextSibling returns the next node with the same parent.
func (n *node) nextSibling() *node {
if n.parent == nil {
return nil
}
// 有父节点问题就不大了
index := n.parent.childIndex(n)
if index >= n.parent.numChildren()-1 {
return nil
}
return n.parent.childAt(index + 1)
}

// prevSibling returns the previous node with the same parent.
func (n *node) prevSibling() *node {
if n.parent == nil {
return nil
}
index := n.parent.childIndex(n)
if index == 0 {
return nil
}
return n.parent.childAt(index - 1)
}

Bucket的相关操作

创建一个Bucket

根据指定的key来创建一个Bucket,如果指定key的Bucket已经存在,则会报错。如果指定的key之前有插入过元素,也会报错。否则的话,会在当前的Bucket中找到合适的位置,然后新建一个Bucket插入进去,最后返回给客户端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// CreateBucket creates a new bucket at the given key and returns the new bucket.
// Returns an error if the key already exists, if the bucket name is blank, or if the bucket name is too long.
// The bucket instance is only valid for the lifetime of the transaction.
func (b *Bucket) CreateBucket(key []byte) (*Bucket, error) {
if b.tx.db == nil {
return nil, ErrTxClosed
} else if !b.tx.writable {
return nil, ErrTxNotWritable
} else if len(key) == 0 {
return nil, ErrBucketNameRequired
}
// Move cursor to correct position.
// 拿到游标
c := b.Cursor()
// 开始遍历、找到合适的位置
k, _, flags := c.seek(key)
// Return an error if there is an existing key.
if bytes.Equal(key, k) {
// 是桶,已经存在了
if (flags & bucketLeafFlag) != 0 {
return nil, ErrBucketExists
}
// 不是桶、但key已经存在了
return nil, ErrIncompatibleValue
}
// Create empty, inline bucket.
var bucket = Bucket{
bucket: &bucket{},
rootNode: &node{isLeaf: true},
FillPercent: DefaultFillPercent,
}
// 拿到bucket对应的value
var value = bucket.write()
// Insert into node.
key = cloneBytes(key)
// 插入到inode中
// c.node()方法会在内存中建立这棵树,调用n.read(page)
c.node().put(key, key, value, 0, bucketLeafFlag)
// Since subbuckets are not allowed on inline buckets, we need to
// dereference the inline page, if it exists. This will cause the bucket
// to be treated as a regular, non-inline bucket for the rest of the tx.
b.page = nil
//根据key获取一个桶
return b.Bucket(key), nil
}

获取一个Bucket

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/ Bucket retrieves a nested bucket by name.
// Returns nil if the bucket does not exist.
// The bucket instance is only valid for the lifetime of the transaction.
func (b *Bucket) Bucket(name []byte) *Bucket {
if b.buckets != nil {
if child := b.buckets[string(name)]; child != nil {
return child
}
}
// Move cursor to key.
// 根据游标找key
c := b.Cursor()
k, v, flags := c.seek(name)
// Return nil if the key doesn't exist or it is not a bucket.
if !bytes.Equal(name, k) || (flags&bucketLeafFlag) == 0 {
return nil
}
// Otherwise create a bucket and cache it.
// 根据找到的value来打开桶。
var child = b.openBucket(v)
// 加速缓存的作用
if b.buckets != nil {
b.buckets[string(name)] = child
}
return child
}
// Helper method that re-interprets a sub-bucket value
// from a parent into a Bucket
func (b *Bucket) openBucket(value []byte) *Bucket {
var child = newBucket(b.tx)
// If unaligned load/stores are broken on this arch and value is
// unaligned simply clone to an aligned byte array.
unaligned := brokenUnaligned && uintptr(unsafe.Pointer(&value[0]))&3 != 0
if unaligned {
value = cloneBytes(value)
}
// If this is a writable transaction then we need to copy the bucket entry.
// Read-only transactions can point directly at the mmap entry.
if b.tx.writable && !unaligned {
child.bucket = &bucket{}
*child.bucket = *(*bucket)(unsafe.Pointer(&value[0]))
} else {
child.bucket = (*bucket)(unsafe.Pointer(&value[0]))
}
// Save a reference to the inline page if the bucket is inline.
// 内联桶
if child.root == 0 {
child.page = (*page)(unsafe.Pointer(&value[bucketHeaderSize]))
}
return &child
}

key/value的插入、获取、删除

其实本质上,对key/value的所有操作最终都要表现在底层的node上。因为node节点就是用来存储真实数据的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
func (b *Bucket) Get(key []byte) []byte {
k, v, flags := b.Cursor().seek(key)

// Return nil if this is a bucket.
if (flags & bucketLeafFlag) != 0 {
return nil
}

// If our target node isn't the same key as what's passed in then return nil.
if !bytes.Equal(key, k) {
return nil
}
return v
}

func (b *Bucket) Put(key []byte, value []byte) error {
// Move cursor to correct position.
c := b.Cursor()
k, _, flags := c.seek(key)

// Return an error if there is an existing key with a bucket value.
if bytes.Equal(key, k) && (flags&bucketLeafFlag) != 0 {
return ErrIncompatibleValue
}

// Insert into node.
key = cloneBytes(key)
c.node().put(key, key, value, 0, 0)

return nil
}

func (b *Bucket) Delete(key []byte) error {
// Move cursor to correct position.
c := b.Cursor()
k, _, flags := c.seek(key)

// Return nil if the key doesn't exist.
if !bytes.Equal(key, k) {
return nil
}

// Return an error if there is already existing bucket value.
if (flags & bucketLeafFlag) != 0 {
return ErrIncompatibleValue
}

// Delete the node if we have a matching key.
c.node().del(key)

return nil
}

Bucket的页分裂、页合并

1
2
3
4
5
6
7
8
9
10
11
12
// spill writes all the nodes for this bucket to dirty pages.
func (b *Bucket) spill() error {}

func (b *Bucket) rebalance() {
for _, n := range b.nodes {
n.rebalance()
}
for _, child := range b.buckets {
child.rebalance()
}
}