1、背景介绍
前两天,现场的同事使用开发的程序测试时,发现日志中报etcdserver: mvcc: database space exceeded
,导致 etcd 无法连接。很奇怪,我们开发的程序只用到了 etcd 做程序的主备,并没有往 etcd 中写入大量的数据,为什么会造成 etcd 空间不足呢?赶紧叫现场的同事查了下 etcd 存储数据的目录以及 etcd 的状态,看看是什么情况。
查看 etcd 状态:
./etcdctl endpoint status --write-out=table --endpoints=localhost:12380
看到这里就很奇怪了,为什么 RAFT APPLYEND INDEX
会这么大呢?这完全是不正常的。
想到程序中有主备,程序启动时,会去 etcd 中 trylock
相应的锁,获取不到时,则会定期去 trylock
,会不会是这里的备节点 定期去 trylock
导致 RAFT APPLYEND INDEX
持续增长从而导致 etcd 空间不足呢?
后面测试了一下,不启动备节点时,RAFT APPLYEND INDEX
是不会增大的。那么问题的原因找到了,问题也就比较好解决。
虽然 etcd 提供了 compact 的能力,但是对于我们这个现象,是治标不治本的,所以最好还是从源头解决问题比较好。当然也可以使用 compact 来压缩 etcd 的 历史数据,但是需要注意的是 compact 时,etcd 的性能是会收到影响的。
2、场景复现
etcd client 版本
go.etcd.io/etcd/client/v3 v3.5.5
etcd server 版本
etcd-v3.5.8-linux-amd64
模拟代码如下:
package main import ( "context" "fmt" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" "time" ) var TTL = 5 var lockName = "/TEST/LOCKER" func main() { config := clientv3.Config{ Endpoints: []string{"192.168.91.66:12379"}, DialTimeout: 5 * time.Second, } // 建立连接 client, err := clientv3.New(config) if err != nil { fmt.Println(err) return } session, err := concurrency.NewSession(client, concurrency.WithTTL(TTL)) if err != nil { fmt.Println("concurrency.NewSession failed, err:", err) return } gMutex := concurrency.NewMutex(session, lockName) ctx, _ := context.WithCancel(context.Background()) if err = gMutex.TryLock(ctx); err == nil { fmt.Println("gMutex.TryLock success") } else { if err = watchLock(gMutex, ctx); err != nil { fmt.Println("get etcd global key failed") return } } // 启动成功,做具体的业务逻辑处理 fmt.Println("todo ..............") select {} } func watchLock(gMutex *concurrency.Mutex, ctx context.Context) (err error) { ticker := time.NewTicker(time.Second * time.Duration(TTL)) for { if err = gMutex.TryLock(ctx); err == nil { // 获取到锁 return nil } select { case <-ctx.Done(): return ctx.Err() case <-ticker.C: continue } } }
将上述代码编译成可执行文件 main.exe、main1.exe 后,先后执行上面两个可执行文件,然后通过下面的命令查看 etcd 中的 RAFT APPLYEND INDEX
,会发现,RAFT APPLYEND INDEX
每隔五秒钟就会增长,长时间运行就会出现 etcdserver: mvcc: database space exceeded
。
3、如何解决
上面我们已经复现了RAFT APPLYEND INDEX
,其实解决起来也比较简单,主要思路就是不要在 for 循环中 使用 trylock 方法。具体代码如下:
package main import ( "context" "fmt" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" "time" ) var TTL = 5 var lockName = "/TEST/LOCKER" func main() { config := clientv3.Config{ Endpoints: []string{"192.168.91.66:12379"}, DialTimeout: 5 * time.Second, } // 建立连接 client, err := clientv3.New(config) if err != nil { fmt.Println(err) return } session, err := concurrency.NewSession(client, concurrency.WithTTL(TTL)) if err != nil { fmt.Println("concurrency.NewSession failed, err:", err) return } gMutex := concurrency.NewMutex(session, lockName) ctx, _ := context.WithCancel(context.Background()) if err = gMutex.TryLock(ctx); err == nil { fmt.Println("gMutex.TryLock success") } else { if err = watchLock(client, gMutex, ctx); err != nil { fmt.Println("get etcd global key failed") return } } // 启动成功,做具体的业务逻辑处理 fmt.Println("todo ..............") select {} } func watchLock(client *clientv3.Client, gMutex *concurrency.Mutex, ctx context.Context) (err error) { watchCh := client.Watch(ctx, lockName, clientv3.WithPrefix()) for { select { case <-ctx.Done(): return ctx.Err() case <-watchCh: if err = gMutex.TryLock(ctx); err == nil { // 获取到锁 return nil } } } }
将上述代码编译成可执行文件 main.exe、main1.exe 后,先后执行上面两个可执行文件,然后通过下面的命令查看 etcd 中的 RAFT APPLYEND INDEX
,不会出现RAFT APPLYEND INDEX
持续增长的现象,也就是从源头解决了问题。
4、TryLock 源码分析
以下是自己的理解,如果有不对的地方,请不吝赐教,十分感谢
那下面一起看看 TryLock
方法里面做了什么操作,会导致 RAFT APPLYEND INDEX
持续增长呢。
TryLock 方法源码如下:
func (m *Mutex) TryLock(ctx context.Context) error { resp, err := m.tryAcquire(ctx) if err != nil { return err } // if no key on prefix / the minimum rev is key, already hold the lock ownerKey := resp.Responses[1].GetResponseRange().Kvs if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev { m.hdr = resp.Header return nil } client := m.s.Client() // Cannot lock, so delete the key // 这里的 client.Delete 会走到 raft 模块,从而使 etcd 的 raft applyed index 增加 1 if _, err := client.Delete(ctx, m.myKey); err != nil { return err } m.myKey = "x00" m.myRev = -1 return ErrLocked }
tryAcquire 方法源码如下:
// 下面主要是使用到了 etcd 中的事务, func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) { s := m.s client := m.s.Client() // m.myKey = /TEST/LOCKER/326989110b4e9304 m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease()) // 这里就是定义一个判断语句,创建 myKey 时的版本号是否 等于 0 cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0) // put self in lock waiters via myKey; oldest waiter holds lock // 往 etcd 中写入 myKey put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease())) // reuse key in case this session already holds the lock // 查询 myKey get := v3.OpGet(m.myKey) // fetch current holder to complete uncontended path with only one RPC getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...) // 这里是重点,判断 cmp 中的条件是否成立,成立则执行 Then 中的语句,否则执行 Else 中的语句 // 这里的语句肯定是成功的,因为我们测试的环境是执行两个不同的 session // 简单的可以理解为两个不同的程序,实际上是 两个不同的会话就会不同 // 所以我们这里的场景是 会执行 v3.OpPut 操作。所以这里会增加一次 revision // 即 etcd 的 raft applyed index 会增加 1 resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit() if err != nil { return nil, err } m.myRev = resp.Header.Revision if !resp.Succeeded { m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision } return resp, nil }
下面这张图是 debug 时,先启动一个可执行文件,然后使用 debug 方式启动的程序,程序执行完 tryAcquire 方法后,截取的一张图,这也作证了上面的分析。304 这个 key 是之前启动程序就存在的 key,下面 30f 的 key 是 debug 期间生成的 key。
大家如果有不清楚的地方,亲自去调试下,看看代码,就会明白上面说的内容了。
5、思考
其实,这并不是难以考虑到的问题,代码中出现这个问题,主要是自己对 etcd 的了解程度不够,不清楚 TryLock
的原理,以为像简单的查询Get
那样,不会导致 revision 的增长,但实际上并不是这样。而是生产中出现了问题才去看为什么会这样,然后再去解决问题,这是一种不太好的方式,希望以后在编码的时候,尽量多考虑考虑,减少问题出现。
想起来前几天看到一篇问题,也是 for 循环中的出现的问题,原文链接,感谢可以去看看 Go坑:time.After可能导致的内存泄露问题分析