0%

bblot db

[TOC]

概述

前面我们介绍了boltdb底层在磁盘上数据时如何组织存储(page)的,然后又介绍了磁盘中的数据在内存中又是如何存储(node)的。接着我们又介绍了管理kv数据集合的Bucket对象以及用来遍历Bucket的Cursor对象。最后我们详细的介绍了boltdb中事务是如何实现(Tx)的。到此boltdb中 各个零散的部件我们都一一熟悉了,接下来是时候将他们组织在一起工作了。因而就有了boltdb中最上层的DB对象。本章主要介绍DB对象相关的方法以及其内部实现。

DB结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// DB represents a collection of buckets persisted to a file on disk.
// All data access is performed through transactions which can be obtained through the DB.
// All the functions on DB will return a ErrDatabaseNotOpen if accessed before Open() is called.
type DB struct {
path string
file *os.File // 真实存储数据的磁盘文件
lockfile *os.File // windows only
dataref []byte // mmap'ed readonly, write throws SEGV
// 通过mmap映射进来的地址
data *[maxMapSize]byte
datasz int
filesz int // current on disk file size
// 元数据
meta0 *meta
meta1 *meta
}

对外接口

1
2
// 创建数据库接口
func Open(path string, mode os.FileMode, options *Options) (*DB, error)

db.View()实现分析

View()主要用来执行只读事务。事务的开启、提交、回滚都交由tx控制。

db.Update()实现分析

Update()主要用来执行读写事务。事务的开始、提交、回滚都交由tx内部控制

db.Batch()实现分析

现在对Batch()方法稍作分析,在DB定义的那一节中我们可以看到,一个DB对象拥有一个batch对象,该对象是全局的。当我们使用Batch()方法时,内部会对将传递进去的fn缓存在calls中。

其内部也是调用了Update,只不过是在Update内部遍历之前缓存的calls。

有两种情况会触发调用Update。

  1. 第一种情况是到达了MaxBatchDelay时间,就会触发Update
  2. 第二种情况是len(db.batch.calls) >= db.MaxBatchSize,即缓存的calls个数大于等于MaxBatchSize时,也会触发Update。

Batch的本质是: 将每次写、每次刷盘的操作转变成了多次写、一次刷盘,从而提升性能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// 幂等
// The maximum batch size and delay can be adjusted with DB.MaxBatchSize
// and DB.MaxBatchDelay, respectively.
//
// Batch is only useful when there are multiple goroutines calling it.
func (db *DB) Batch(fn func(*Tx) error) error {
errCh := make(chan error, 1)
db.batchMu.Lock()
if (db.batch == nil) || (db.batch != nil && len(db.batch.calls) >= db.MaxBatchSize) {
// There is no existing batch, or the existing batch is full; start a new one.
db.batch = &batch{
db: db,
}
// 超时控制
db.batch.timer = time.AfterFunc(db.MaxBatchDelay, db.batch.trigger)
}
db.batch.calls = append(db.batch.calls, call{fn: fn, err: errCh})
// 数量控制
if len(db.batch.calls) >= db.MaxBatchSize {
// wake up batch, it's ready to run
go db.batch.trigger()
}
db.batchMu.Unlock()
err := <-errCh
if err == trySolo {
err = db.Update(fn)
}
return err
}

// run performs the transactions in the batch and communicates results
// back to DB.Batch.
func (b *batch) run() {
b.db.batchMu.Lock()
b.timer.Stop()
// Make sure no new work is added to this batch, but don't break
// other batches.
if b.db.batch == b {
b.db.batch = nil
}
b.db.batchMu.Unlock()

retry:
for len(b.calls) > 0 {
var failIdx = -1
err := b.db.Update(func(tx *Tx) error {
// 遍历函数函数调用
for i, c := range b.calls {
if err := safelyCall(c.fn, tx); err != nil {
failIdx = i
return err
}
}
return nil
})

if failIdx >= 0 {
// take the failing transaction out of the batch. it's
// safe to shorten b.calls here because db.batch no longer
// points to us, and we hold the mutex anyway.
c := b.calls[failIdx]
b.calls[failIdx], b.calls = b.calls[len(b.calls)-1], b.calls[:len(b.calls)-1]
// tell the submitter re-run it solo, continue with the rest of the batch
c.err <- trySolo
continue retry
}

// pass success, or bolt internal errors, to all callers
// 失败的单独重试一次
for _, c := range b.calls {
c.err <- err
}
break retry
}
}

db.allocate()和db.grow()分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// allocate returns a contiguous block of memory starting at a given page.
func (db *DB) allocate(count int) (*page, error) {
// Allocate a temporary buffer for the page.
var buf []byte
if count == 1 {
buf = db.pagePool.Get().([]byte)
} else {
buf = make([]byte, count*db.pageSize)
}
// 转成*page
p := (*page)(unsafe.Pointer(&buf[0]))
p.overflow = uint32(count - 1)
// Use pages from the freelist if they are available.
// 先从空闲列表中找
if p.id = db.freelist.allocate(count); p.id != 0 {
return p, nil
}
// 找不到的话,就按照事务的pgid来分配
// 表示需要从文件内部扩大
// Resize mmap() if we're at the end.
p.id = db.rwtx.meta.pgid
// 因此需要判断是否目前所有的页数已经大于了mmap映射出来的空间
// 这儿计算的页面总数是从当前的id后还要计算count+1个
var minsz = int((p.id+pgid(count))+1) * db.pageSize
if minsz >= db.datasz {
if err := db.mmap(minsz); err != nil {
return nil, fmt.Errorf("mmap allocate error: %s", err)
}
}
// Move the page id high water mark.
// 如果不是从freelist中找到的空间的话,更新meta的id,也就意味着是从文件中新扩展的页
db.rwtx.meta.pgid += pgid(count)
return p, nil
}
// grow grows the size of the database to the given sz.
func (db *DB) grow(sz int) error {
// Ignore if the new size is less than available file size.
if sz <= db.filesz {
return nil
}
// 满足这个条件sz>filesz
// If the data is smaller than the alloc size then only allocate what's needed.
// Once it goes over the allocation size then allocate in chunks.
if db.datasz < db.AllocSize {
sz = db.datasz
} else {
sz += db.AllocSize
}
// Truncate and fsync to ensure file size metadata is flushed.
// https://github.com/boltdb/bolt/issues/284
if !db.NoGrowSync && !db.readOnly {
if runtime.GOOS != "windows" {
if err := db.file.Truncate(int64(sz)); err != nil {
return fmt.Errorf("file resize error: %s", err)
}
}
if err := db.file.Sync(); err != nil {
return fmt.Errorf("file sync error: %s", err)
}
}
db.filesz = sz
return nil
}

db.allocate()和db.grow()分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// allocate returns a contiguous block of memory starting at a given page.
func (db *DB) allocate(count int) (*page, error) {
// Allocate a temporary buffer for the page.
var buf []byte
if count == 1 {
buf = db.pagePool.Get().([]byte)
} else {
buf = make([]byte, count*db.pageSize)
}
// 转成*page
p := (*page)(unsafe.Pointer(&buf[0]))
p.overflow = uint32(count - 1)
// Use pages from the freelist if they are available.
// 先从空闲列表中找
if p.id = db.freelist.allocate(count); p.id != 0 {
return p, nil
}
// 找不到的话,就按照事务的pgid来分配
// 表示需要从文件内部扩大
// Resize mmap() if we're at the end.
p.id = db.rwtx.meta.pgid
// 因此需要判断是否目前所有的页数已经大于了mmap映射出来的空间
// 这儿计算的页面总数是从当前的id后还要计算count+1个
var minsz = int((p.id+pgid(count))+1) * db.pageSize
if minsz >= db.datasz {
if err := db.mmap(minsz); err != nil {
return nil, fmt.Errorf("mmap allocate error: %s", err)
}
}
// Move the page id high water mark.
// 如果不是从freelist中找到的空间的话,更新meta的id,也就意味着是从文件中新扩展的页
db.rwtx.meta.pgid += pgid(count)
return p, nil
}
// grow grows the size of the database to the given sz.
func (db *DB) grow(sz int) error {
// Ignore if the new size is less than available file size.
if sz <= db.filesz {
return nil
}
// 满足这个条件sz>filesz
// If the data is smaller than the alloc size then only allocate what's needed.
// Once it goes over the allocation size then allocate in chunks.
if db.datasz < db.AllocSize {
sz = db.datasz
} else {
sz += db.AllocSize
}
// Truncate and fsync to ensure file size metadata is flushed.
// https://github.com/boltdb/bolt/issues/284
if !db.NoGrowSync && !db.readOnly {
if runtime.GOOS != "windows" {
if err := db.file.Truncate(int64(sz)); err != nil {
return fmt.Errorf("file resize error: %s", err)
}
}
if err := db.file.Sync(); err != nil {
return fmt.Errorf("file sync error: %s", err)
}
}
db.filesz = sz
return nil
}