// Bucket represents a collection of key/value pairs inside the database. type Bucket struct { *bucket tx *Tx // the associated transaction buckets map[string]*Bucket // subbucket cache page *page // inline page reference rootNode *node // materialized node for the root page. nodes map[pgid]*node // node cache
// Sets the threshold for filling nodes when they split. By default, // the bucket will fill to 50% but it can be useful to increase this // amount if you know that your write workloads are mostly append-only. // // This is non-persisted across transactions so it must be set in every Tx. FillPercent float64 }
// bucket represents the on-file representation of a bucket. // This is stored as the "value" of a bucket key. If the bucket is small enough, // then its root page can be stored inline in the "value", after the bucket // header. In the case of inline buckets, the "root" will be 0. type bucket struct { root pgid // page id of the bucket's root-level page sequence uint64// monotonically incrementing, used by NextSequence() }
// Cursor creates a cursor associated with the bucket. // The cursor is only valid as long as the transaction is open. // Do not use a cursor after the transaction is closed. func(b *Bucket) Cursor() *Cursor { // Update transaction statistics. b.tx.stats.CursorCount++
// Allocate and return a cursor. return &Cursor{ bucket: b, stack: make([]elemRef, 0), } }
// elemRef represents a reference to an element on a given page/node. type elemRef struct { page *page node *node index int }
// isLeaf returns whether the ref is pointing at a leaf page/node. func(r *elemRef) isLeaf() bool { if r.node != nil { return r.node.isLeaf } return (r.page.flags & leafPageFlag) != 0 }
// count returns the number of inodes or page elements. func(r *elemRef) count() int { if r.node != nil { returnlen(r.node.inodes) } returnint(r.page.count) }
Cursor结构
1 2 3 4 5 6 7 8 9
type Cursor struct { bucket *Bucket stack []elemRef }
// Bucket returns the bucket that this cursor was created from. func(c *Cursor) Bucket() *Bucket { return c.bucket }
func(c *Cursor) First() (key []byte, value []byte)
func(c *Cursor) Last() (key []byte, value []byte)
func(c *Cursor) Next() (key []byte, value []byte)
func(c *Cursor) Prev() (key []byte, value []byte)
func(c *Cursor) Delete() error // Seek moves the cursor to a given key and returns it. // If the key does not exist then the next key is used. If no keys // follow, a nil key is returned. // The returned key and value are only valid for the life of the transaction. func(c *Cursor) Seek(seek []byte) (key []byte, value []byte)
// pageNode returns the in-memory node, if it exists. // Otherwise returns the underlying page. func(b *Bucket) pageNode(id pgid) (*page, *node) { // Inline buckets have a fake page embedded in their value so treat them // differently. We'll return the rootNode (if available) or the fake page. if b.root == 0 { if id != 0 { panic(fmt.Sprintf("inline bucket non-zero page access(2): %d != 0", id)) } if b.rootNode != nil { returnnil, b.rootNode } return b.page, nil }
// Check the node cache for non-inline buckets. if b.nodes != nil { if n := b.nodes[id]; n != nil { returnnil, n } }
// Finally lookup the page from the transaction if no node is materialized. return b.tx.page(id), nil }
// data 表示 mmap 的 file 数据 // page retrieves a page reference from the mmap based on the current page size. func(db *DB) page(id pgid) *page { pos := id * pgid(db.pageSize) return (*page)(unsafe.Pointer(&db.data[pos])) }
func(c *Cursor) Seek(seek []byte) (key []byte, value []byte) { k, v, flags := c.seek(seek) // If we ended up after the last element of a page then move to the next one. // 下面这一段逻辑是必须的,因为在seek()方法中,如果ref.index>ref.count()的话,就直接返回nil,nil,0了 // 这里需要返回下一个 if ref := &c.stack[len(c.stack)-1]; ref.index >= ref.count() { k, v, flags = c.next() } if k == nil { returnnil, nil // 子桶的话 } elseif (flags & uint32(bucketLeafFlag)) != 0 { return k, nil } return k, v }
// 实际上调用 search 方法 func(c *Cursor) seek(seek []byte) (key []byte, value []byte, flags uint32) { // Start from root page/node and traverse to correct page. c.stack = c.stack[:0] c.search(seek, c.bucket.root)
// If this is a bucket then return a nil value. return c.keyValue() }
// 整个 search 就是构建了一个搜索栈,最后用 elemRef 中的 index, 这个 index 表示node 中 inodes 的下标,或者 page 下标。 // search recursively performs a binary search against a given page/node until it finds a given key. func(c *Cursor) search(key []byte, pgid pgid) { p, n := c.bucket.pageNode(pgid) if p != nil && (p.flags&(branchPageFlag|leafPageFlag)) == 0 { panic(fmt.Sprintf("invalid page type: %d: %x", p.id, p.flags)) } e := elemRef{page: p, node: n} // 搜索栈 c.stack = append(c.stack, e)
// If we're on a leaf page/node then find the specific node. // 叶子结点就直接最后一次二分查询了 if e.isLeaf() { c.nsearch(key) return }
func(c *Cursor) searchNode(key []byte, n *node) { var exact bool index := sort.Search(len(n.inodes), func(i int)bool { // TODO(benbjohnson): Optimize this range search. It's a bit hacky right now. // sort.Search() finds the lowest index where f() != -1 but we need the highest index. ret := bytes.Compare(n.inodes[i].key, key) if ret == 0 { exact = true } return ret != -1 }) if !exact && index > 0 { index-- } c.stack[len(c.stack)-1].index = index
// Recursively search to the next page. c.search(key, n.inodes[index].pgid) }
// keyValue returns the key and value of the current leaf element. func(c *Cursor) keyValue() ([]byte, []byte, uint32) { ref := &c.stack[len(c.stack)-1]
// If the cursor is pointing to the end of page/node then return nil. if ref.count() == 0 || ref.index >= ref.count() { returnnil, nil, 0 }
// Retrieve value from node. if ref.node != nil { inode := &ref.node.inodes[ref.index] return inode.key, inode.value, inode.flags }
// Or retrieve value from page. elem := ref.page.leafPageElement(uint16(ref.index)) return elem.key(), elem.value(), elem.flags }
func(c *Cursor) First() (key []byte, value []byte) { _assert(c.bucket.tx.db != nil, "tx closed") // 清空stack c.stack = c.stack[:0] p, n := c.bucket.pageNode(c.bucket.root) // 一直找到第一个叶子节点,此处在天添加stack时,一直让index设置为0即可 ref := elemRef{page: p, node: n, index: 0} c.stack = append(c.stack, ref) c.first() // If we land on an empty page then move to the next value. // https://github.com/boltdb/bolt/issues/450 // 当前页时空的话,找下一个 if c.stack[len(c.stack)-1].count() == 0 { c.next() } k, v, flags := c.keyValue() // 是桶 if (flags & uint32(bucketLeafFlag)) != 0 { return k, nil } return k, v } // first moves the cursor to the first leaf element under the last page in the stack. // 找到最后一个非叶子节点的第一个叶子节点。index=0的节点 func(c *Cursor) first() { for { // Exit when we hit a leaf page. var ref = &c.stack[len(c.stack)-1] if ref.isLeaf() { break } // Keep adding pages pointing to the first element to the stack. var pgid pgid if ref.node != nil { pgid = ref.node.inodes[ref.index].pgid } else { pgid = ref.page.branchPageElement(uint16(ref.index)).pgid } p, n := c.bucket.pageNode(pgid) c.stack = append(c.stack, elemRef{page: p, node: n, index: 0}) } }
Next 分析
使用方式
1 2 3 4 5 6
c := b.Cursor()
// 启动遍历模式 for k, v := c.First(); k != nil; k, v = c.Next() { fmt.Printf("cursor, key=%s, value=%s\n", string(k), string(v)) }
// next 的实现就是去移动每一个 node 的 index, 从叶子结点开始,这样就能遍历完所有的叶子结点了!!! // next moves to the next leaf element and returns the key and value. // If the cursor is at the last leaf element then it stays there and returns nil. func(c *Cursor) next() (key []byte, value []byte, flags uint32) { for { // Attempt to move over one element until we're successful. // Move up the stack as we hit the end of each page in our stack. var i int // c.stack 是在调用 First 函数时被写入的,所以,这样倒序来遍历能实现按序获取 for i = len(c.stack) - 1; i >= 0; i-- { elem := &c.stack[i] if elem.index < elem.count()-1 { elem.index++ break } }
// If we've hit the root page then stop and return. This will leave the // cursor on the last element of the last page. if i == -1 { returnnil, nil, 0 }
// Otherwise start from where we left off in the stack and find the // first element of the first leaf page. c.stack = c.stack[:i+1] // 获取下一个叶子结点 c.first()
// If this is an empty page then restart and move back up the stack. // https://github.com/boltdb/bolt/issues/450 if c.stack[len(c.stack)-1].count() == 0 { continue }
/ inode represents an internal node inside of a node. // It can be used to point to elements in a page or point // to an element which hasn't been added to a page yet. type inode struct { // 表示是否是子桶叶子节点还是普通叶子节点。如果flags值为1表示子桶叶子节点,否则为普通叶子节点 flags uint32 // 当inode为分支元素时,pgid才有值,为叶子元素时,则没值 pgid pgid key []byte // 当inode为分支元素时,value为空,为叶子元素时,才有值 value []byte }
// Add capacity and shift nodes if we don't have an exact match and need to insert. exact := (len(n.inodes) > 0 && index < len(n.inodes) && bytes.Equal(n.inodes[index].key, oldKey)) if !exact { n.inodes = append(n.inodes, inode{}) copy(n.inodes[index+1:], n.inodes[index:]) }
// del removes a key from the node. func(n *node) del(key []byte) { // Find index of key. index := sort.Search(len(n.inodes), func(i int)bool { return bytes.Compare(n.inodes[i].key, key) != -1 })
// Exit if the key isn't found. if index >= len(n.inodes) || !bytes.Equal(n.inodes[index].key, key) { return }
// Delete inode from the node. n.inodes = append(n.inodes[:index], n.inodes[index+1:]...)
// Mark the node as needing rebalancing. n.unbalanced = true }
// childAt returns the child node at a given index. func(n *node) childAt(index int) *node { if n.isLeaf { panic(fmt.Sprintf("invalid childAt(%d) on a leaf node", index)) } return n.bucket.node(n.inodes[index].pgid, n) }
// childIndex returns the index of a given child node. func(n *node) childIndex(child *node) int { index := sort.Search(len(n.inodes), func(i int)bool { return bytes.Compare(n.inodes[i].key, child.key) != -1 }) return index }
// numChildren returns the number of children. func(n *node) numChildren() int { returnlen(n.inodes) }
// nextSibling returns the next node with the same parent. func(n *node) nextSibling() *node { if n.parent == nil { returnnil } // 有父节点问题就不大了 index := n.parent.childIndex(n) if index >= n.parent.numChildren()-1 { returnnil } return n.parent.childAt(index + 1) }
// prevSibling returns the previous node with the same parent. func(n *node) prevSibling() *node { if n.parent == nil { returnnil } index := n.parent.childIndex(n) if index == 0 { returnnil } return n.parent.childAt(index - 1) }
// CreateBucket creates a new bucket at the given key and returns the new bucket. // Returns an error if the key already exists, if the bucket name is blank, or if the bucket name is too long. // The bucket instance is only valid for the lifetime of the transaction. func(b *Bucket) CreateBucket(key []byte) (*Bucket, error) { if b.tx.db == nil { returnnil, ErrTxClosed } elseif !b.tx.writable { returnnil, ErrTxNotWritable } elseiflen(key) == 0 { returnnil, ErrBucketNameRequired } // Move cursor to correct position. // 拿到游标 c := b.Cursor() // 开始遍历、找到合适的位置 k, _, flags := c.seek(key) // Return an error if there is an existing key. if bytes.Equal(key, k) { // 是桶,已经存在了 if (flags & bucketLeafFlag) != 0 { returnnil, ErrBucketExists } // 不是桶、但key已经存在了 returnnil, ErrIncompatibleValue } // Create empty, inline bucket. var bucket = Bucket{ bucket: &bucket{}, rootNode: &node{isLeaf: true}, FillPercent: DefaultFillPercent, } // 拿到bucket对应的value var value = bucket.write() // Insert into node. key = cloneBytes(key) // 插入到inode中 // c.node()方法会在内存中建立这棵树,调用n.read(page) c.node().put(key, key, value, 0, bucketLeafFlag) // Since subbuckets are not allowed on inline buckets, we need to // dereference the inline page, if it exists. This will cause the bucket // to be treated as a regular, non-inline bucket for the rest of the tx. b.page = nil //根据key获取一个桶 return b.Bucket(key), nil }
/ Bucket retrieves a nested bucket by name. // Returns nil if the bucket does not exist. // The bucket instance is only valid for the lifetime of the transaction. func(b *Bucket) Bucket(name []byte) *Bucket { if b.buckets != nil { if child := b.buckets[string(name)]; child != nil { return child } } // Move cursor to key. // 根据游标找key c := b.Cursor() k, v, flags := c.seek(name) // Return nil if the key doesn't exist or it is not a bucket. if !bytes.Equal(name, k) || (flags&bucketLeafFlag) == 0 { returnnil } // Otherwise create a bucket and cache it. // 根据找到的value来打开桶。 var child = b.openBucket(v) // 加速缓存的作用 if b.buckets != nil { b.buckets[string(name)] = child } return child } // Helper method that re-interprets a sub-bucket value // from a parent into a Bucket func(b *Bucket) openBucket(value []byte) *Bucket { var child = newBucket(b.tx) // If unaligned load/stores are broken on this arch and value is // unaligned simply clone to an aligned byte array. unaligned := brokenUnaligned && uintptr(unsafe.Pointer(&value[0]))&3 != 0 if unaligned { value = cloneBytes(value) } // If this is a writable transaction then we need to copy the bucket entry. // Read-only transactions can point directly at the mmap entry. if b.tx.writable && !unaligned { child.bucket = &bucket{} *child.bucket = *(*bucket)(unsafe.Pointer(&value[0])) } else { child.bucket = (*bucket)(unsafe.Pointer(&value[0])) } // Save a reference to the inline page if the bucket is inline. // 内联桶 if child.root == 0 { child.page = (*page)(unsafe.Pointer(&value[bucketHeaderSize])) } return &child }
// Return nil if this is a bucket. if (flags & bucketLeafFlag) != 0 { returnnil }
// If our target node isn't the same key as what's passed in then return nil. if !bytes.Equal(key, k) { returnnil } return v }
func(b *Bucket) Put(key []byte, value []byte) error { // Move cursor to correct position. c := b.Cursor() k, _, flags := c.seek(key)
// Return an error if there is an existing key with a bucket value. if bytes.Equal(key, k) && (flags&bucketLeafFlag) != 0 { return ErrIncompatibleValue }