// DB represents a collection of buckets persisted to a file on disk.
// All data access is performed through transactions which can be obtained through the DB.
// All the functions on DB will return a ErrDatabaseNotOpen if accessed before Open() is called.
type DB struct {
	path     string
	file     *os.File // the on-disk file that actually stores the data
	lockfile *os.File // windows only

	// dataref is the mmap'ed readonly view of the file; writing through it throws SEGV.
	dataref []byte
	// data is the same mapping viewed as a fixed-size array so pages can be
	// addressed by byte offset.
	data   *[maxMapSize]byte
	datasz int // size of the mmap'ed region
	filesz int // current on disk file size

	// the two metadata pages
	meta0 *meta
	meta1 *meta
}
// 幂等 // The maximum batch size and delay can be adjusted with DB.MaxBatchSize // and DB.MaxBatchDelay, respectively. // // Batch is only useful when there are multiple goroutines calling it. func(db *DB) Batch(fn func(*Tx)error) error { errCh := make(chanerror, 1) db.batchMu.Lock() if (db.batch == nil) || (db.batch != nil && len(db.batch.calls) >= db.MaxBatchSize) { // There is no existing batch, or the existing batch is full; start a new one. db.batch = &batch{ db: db, } // 超时控制 db.batch.timer = time.AfterFunc(db.MaxBatchDelay, db.batch.trigger) } db.batch.calls = append(db.batch.calls, call{fn: fn, err: errCh}) // 数量控制 iflen(db.batch.calls) >= db.MaxBatchSize { // wake up batch, it's ready to run go db.batch.trigger() } db.batchMu.Unlock() err := <-errCh if err == trySolo { err = db.Update(fn) } return err }
// run performs the transactions in the batch and communicates results
// back to DB.Batch.
func (b *batch) run() {
	b.db.batchMu.Lock()
	b.timer.Stop()
	// Make sure no new work is added to this batch, but don't break
	// other batches.
	if b.db.batch == b {
		b.db.batch = nil
	}
	b.db.batchMu.Unlock()

retry:
	for len(b.calls) > 0 {
		var failIdx = -1
		err := b.db.Update(func(tx *Tx) error {
			// Apply every queued call inside one transaction; remember which
			// call failed (if any) so it can be evicted and retried solo.
			for i, c := range b.calls {
				if err := safelyCall(c.fn, tx); err != nil {
					failIdx = i
					return err
				}
			}
			return nil
		})

		if failIdx >= 0 {
			// take the failing transaction out of the batch. it's
			// safe to shorten b.calls here because db.batch no longer
			// points to us, and we hold the mutex anyway.
			c := b.calls[failIdx]
			b.calls[failIdx], b.calls = b.calls[len(b.calls)-1], b.calls[:len(b.calls)-1]
			// tell the submitter re-run it solo, continue with the rest of the batch
			c.err <- trySolo
			continue retry
		}

		// pass success, or bolt internal errors, to all callers
		for _, c := range b.calls {
			c.err <- err
		}
		break retry
	}
}
// allocate returns a contiguous block of memory starting at a given page. func(db *DB) allocate(count int) (*page, error) { // Allocate a temporary buffer for the page. var buf []byte if count == 1 { buf = db.pagePool.Get().([]byte) } else { buf = make([]byte, count*db.pageSize) } // 转成*page p := (*page)(unsafe.Pointer(&buf[0])) p.overflow = uint32(count - 1) // Use pages from the freelist if they are available. // 先从空闲列表中找 if p.id = db.freelist.allocate(count); p.id != 0 { return p, nil } // 找不到的话,就按照事务的pgid来分配 // 表示需要从文件内部扩大 // Resize mmap() if we're at the end. p.id = db.rwtx.meta.pgid // 因此需要判断是否目前所有的页数已经大于了mmap映射出来的空间 // 这儿计算的页面总数是从当前的id后还要计算count+1个 var minsz = int((p.id+pgid(count))+1) * db.pageSize if minsz >= db.datasz { if err := db.mmap(minsz); err != nil { returnnil, fmt.Errorf("mmap allocate error: %s", err) } } // Move the page id high water mark. // 如果不是从freelist中找到的空间的话,更新meta的id,也就意味着是从文件中新扩展的页 db.rwtx.meta.pgid += pgid(count) return p, nil } // grow grows the size of the database to the given sz. func(db *DB) grow(sz int) error { // Ignore if the new size is less than available file size. if sz <= db.filesz { returnnil } // 满足这个条件sz>filesz // If the data is smaller than the alloc size then only allocate what's needed. // Once it goes over the allocation size then allocate in chunks. if db.datasz < db.AllocSize { sz = db.datasz } else { sz += db.AllocSize } // Truncate and fsync to ensure file size metadata is flushed. // https://github.com/boltdb/bolt/issues/284 if !db.NoGrowSync && !db.readOnly { if runtime.GOOS != "windows" { if err := db.file.Truncate(int64(sz)); err != nil { return fmt.Errorf("file resize error: %s", err) } } if err := db.file.Sync(); err != nil { return fmt.Errorf("file sync error: %s", err) } } db.filesz = sz returnnil }
// NOTE(review): this allocate/grow pair is an exact duplicate of the
// definitions that already appear earlier in this file; Go rejects
// redeclared methods, so one copy must be removed when merging.
// Code kept byte-identical below; only comments translated/clarified.

// allocate returns a contiguous block of memory starting at a given page.
func (db *DB) allocate(count int) (*page, error) {
	// Allocate a temporary buffer for the page.
	var buf []byte
	if count == 1 {
		buf = db.pagePool.Get().([]byte)
	} else {
		buf = make([]byte, count*db.pageSize)
	}

	// Reinterpret the buffer as a *page.
	p := (*page)(unsafe.Pointer(&buf[0]))
	p.overflow = uint32(count - 1)

	// Use pages from the freelist if they are available.
	if p.id = db.freelist.allocate(count); p.id != 0 {
		return p, nil
	}

	// Otherwise allocate from the transaction's pgid, i.e. the file itself
	// needs to be extended.
	// Resize mmap() if we're at the end.
	p.id = db.rwtx.meta.pgid
	// Check whether the total page count now exceeds the mmap'ed region;
	// the required size is computed through the end of this run (count pages
	// past the current id, plus one).
	var minsz = int((p.id+pgid(count))+1) * db.pageSize
	if minsz >= db.datasz {
		if err := db.mmap(minsz); err != nil {
			return nil, fmt.Errorf("mmap allocate error: %s", err)
		}
	}

	// Move the page id high water mark.
	// Since the space did not come from the freelist, advancing meta's pgid
	// means these pages are newly extended from the file.
	db.rwtx.meta.pgid += pgid(count)

	return p, nil
}

// grow grows the size of the database to the given sz.
func (db *DB) grow(sz int) error {
	// Ignore if the new size is less than available file size.
	if sz <= db.filesz {
		return nil
	}

	// Here sz > filesz holds.
	// If the data is smaller than the alloc size then only allocate what's needed.
	// Once it goes over the allocation size then allocate in chunks.
	if db.datasz < db.AllocSize {
		sz = db.datasz
	} else {
		sz += db.AllocSize
	}

	// Truncate and fsync to ensure file size metadata is flushed.
	// https://github.com/boltdb/bolt/issues/284
	if !db.NoGrowSync && !db.readOnly {
		if runtime.GOOS != "windows" {
			if err := db.file.Truncate(int64(sz)); err != nil {
				return fmt.Errorf("file resize error: %s", err)
			}
		}
		if err := db.file.Sync(); err != nil {
			return fmt.Errorf("file sync error: %s", err)
		}
	}

	db.filesz = sz
	return nil
}