boltdb
网上关于boltdb的文章有很多,特别是微信公众号上,例如:
boltdb源码分析系列-事务-腾讯云开发者社区-腾讯云 (tencent.com)
这些文章都写的挺好,但不一定覆盖了我所关注的几个点,下面我把我关注的几个点就来下来。
node page bucket tx db的关系
- 磁盘数据mmap到page内存区域,也可以理解为就是磁盘数据
- page需要一段连续的内存
- node封装的B+树节点数据结构
- bucket一个B+树数据结构。可以理解成一个表
- tx 读事务或读写事务
- bucket是内存结构每个tx中都会生成一个
- 会将tx中涉及到(读取过、修改过)的nodes都记录在bucket中
- 读写事务最终写入磁盘时是需要重新申请新的page的,即不会修改原有的page
- db整个数据库文件
- db中的freelist记录了db文件中空闲的页(即已经可以释放掉的页)
tx.commit
在boltdb的 commit中才会执行b+树的rebalance操作,执行完后再进行写入磁盘的操作。也就是说在一个事务中涉及到的多次写操作,会最终在commit的时候同意执行写入磁盘spill操作。
func (tx *Tx) Commit() error { _assert(!tx.managed, "managed tx commit not allowed") if tx.db == nil { return ErrTxClosed } else if !tx.writable { return ErrTxNotWritable } // TODO(benbjohnson): Use vectorized I/O to write out dirty pages. // Rebalance nodes which have had deletions. var startTime = time.Now() tx.root.rebalance() if tx.stats.Rebalance > 0 { tx.stats.RebalanceTime += time.Since(startTime) } // spill data onto dirty pages. startTime = time.Now() if err := tx.root.spill(); err != nil { tx.rollback() return err }
也正因为txn中可能有多个key插入,所以split就可能会进行多次
func (n *node) split(pageSize int) []*node { var nodes []*node node := n for { // Split node into two. a, b := node.splitTwo(pageSize) nodes = append(nodes, a) // If we can't split then exit the loop. if b == nil { break } // Set node to b so it gets split on the next iteration. node = b } return nodes } node.go
数据写入到磁盘的时候,是从下层节点往上层节点写的
// spill writes the nodes to dirty pages and splits nodes as it goes. // Returns an error if dirty pages cannot be allocated. func (n *node) spill() error { var tx = n.bucket.tx if n.spilled { return nil } // Spill child nodes first. Child nodes can materialize sibling nodes in // the case of split-merge so we cannot use a range loop. We have to check // the children size on every loop iteration. sort.Sort(n.children) for i := 0; i < len(n.children); i++ { if err := n.children[i].spill(); err != nil { return err } } // We no longer need the child list because it's only used for spill tracking. n.children = nil // Split nodes into appropriate sizes. The first node will always be n. var nodes = n.split(tx.db.pageSize) node.go
数据较大如何处理?直接将构造一个大的page将数据存储进去。与此同时,原先node关联的page可以释放掉了。因为整个是一个append only模式,原先的page在新事务生成,且没有其他读事务访问后就可以释放掉了。
for _, node := range nodes { // Add node's page to the freelist if it's not new. if node.pgid > 0 { tx.db.freelist.free(tx.meta.txid, tx.page(node.pgid)) node.pgid = 0 } // Allocate contiguous space for the node. p, err := tx.allocate((node.size() / tx.db.pageSize) + 1) if err != nil { return err } node.go
哪些node需要rebalance呢,size < 25% page_size或者中间节点小于2个key,叶子节点小于1个key。
func (n *node) rebalance() { if !n.unbalanced { return } n.unbalanced = false // Update statistics. n.bucket.tx.stats.Rebalance++ // Ignore if node is above threshold (25%) and has enough keys. var threshold = n.bucket.tx.db.pageSize / 4 if n.size() > threshold && len(n.inodes) > n.minKeys() { return } node.go
bucket中读到了node,就将node加入到bucket中,读到了就意味着这些node可能就会发生改变。它是在cursor移动的时候加入到bucket中的。
func (c *Cursor) node() *node { _assert(len(c.stack) > 0, "accessing a node with a zero-length cursor stack") // If the top of the stack is a leaf node then just return it. if ref := &c.stack[len(c.stack)-1]; ref.node != nil && ref.isLeaf() { return ref.node } // Start from root and traverse down the hierarchy. var n = c.stack[0].node if n == nil { n = c.bucket.node(c.stack[0].page.id, nil) } for _, ref := range c.stack[:len(c.stack)-1] { _assert(!n.isLeaf, "expected branch node") n = n.childAt(int(ref.index)) } _assert(n.isLeaf, "expected leaf node") return n }
// node creates a node from a page and associates it with a given parent. func (b *Bucket) node(pgid pgid, parent *node) *node { _assert(b.nodes != nil, "nodes map expected") // Retrieve node if it's already been created. if n := b.nodes[pgid]; n != nil { return n } // Otherwise create a node and cache it. n := &node{bucket: b, parent: parent} if parent == nil { b.rootNode = n } else { parent.children = append(parent.children, n) } // Use the inline page if this is an inline bucket. var p = b.page if p == nil { p = b.tx.page(pgid) } // Read the page into the node and cache it. n.read(p) b.nodes[pgid] = n // Update statistics. b.tx.stats.NodeCount++
freelist
它表示的是磁盘中已经释放的页
结构
- ids 所有空闲页
- pending {txid, pageids[]}即将释放的txid以及其关联的pageid
- cache map索引
->pending 释放实际
- tx.commit时会将事务中涉及到的老的node对应的page都放到pending中
- node.spill中将关联的旧node(node与page对应)放到freelist的pending中
pending->release释放时机
tx的commit阶段会将事务涉及的原先老page放到freelist的pending中。
func (f *freelist) free(txid txid, p *page) { if p.id <= 1 { panic(fmt.Sprintf("cannot free page 0 or 1: %d", p.id)) } // Free page and all its overflow pages. var ids = f.pending[txid] for id := p.id; id <= p.id+pgid(p.overflow); id++ { // Verify that page is not already free. if f.cache[id] { panic(fmt.Sprintf("page %d already freed", id)) } // Add to the freelist and cache. ids = append(ids, id) f.cache[id] = true } f.pending[txid] = ids }
db.beginRWTx 开启读写事务的时候会尝试将过期的page释放掉
func (f *freelist) release(txid txid) { m := make(pgids, 0) for tid, ids := range f.pending { if tid <= txid { // Move transaction's pending pages to the available freelist. // Don't remove from the cache since the page is still free. m = append(m, ids...) delete(f.pending, tid) } } sort.Sort(m) f.ids = pgids(f.ids).merge(m) }