为了支持多个命令的原子性执行 Redis 提供了事务机制。 Redis 官方文档中称事务带有以下两个重要的保证:
我们在使用事务的过程中可能会遇到两类错误:
在遇到语法错误时 Redis 会中止命令入队并丢弃事务。在遇到运行时错误时 Redis 仅会报错然后继续执行事务中剩下的命令,不会像大多数数据库那样回滚事务。对此,Redis 官方的解释是:
Redis 命令只会因为错误的语法而失败(并且这些问题不能在入队时发现),或是命令用在了错误类型的键上面:这也就是说,从实用性的角度来说,失败的命令是由编程错误造成的,而这些错误应该在开发的过程中被发现,而不应该出现在生产环境中。
因为不需要对回滚进行支持,所以 Redis 的内部可以保持简单且快速。
有种观点认为 Redis 处理事务的做法会产生 bug , 然而需要注意的是, 在通常情况下, 回滚并不能解决编程错误带来的问题。 举个例子, 如果你本来想通过 INCR 命令将键的值加上 1 , 却不小心加上了 2 , 又或者对错误类型的键执行了 INCR , 回滚是没有办法处理这些情况的。鉴于没有任何机制能避免程序员自己造成的错误, 并且这类错误通常不会在生产环境中出现, 所以 Redis 选择了更简单、更快速的无回滚方式来处理事务。
emmmm, 接下来我们尝试在 Godis 中实现具有原子性、隔离性的事务吧。
事务的原子性具有两个特点:1. 事务执行过程不可被其它事务(线程)插入 2. 事务要么完全成功要么完全不执行,不存在部分成功的状态
事务的隔离性是指事务中操作的结果是否对其它并发事务可见。由于KV数据库不存在幻读问题,因此我们需要避免脏读和不可重复度问题。
与 Redis 的单线程引擎不同 godis 的存储引擎是并行的,因此需要设计锁机制来保证执行多条命令执行时的原子性和隔离性。
我们在实现内存数据库一文中提到:
实现一个常规命令需要提供3个函数:
其中的 PrepareFunc 会分析命令行返回要读写的 key, 以 prepareMSet 为例:
// return writtenKeys, readKeys func prepareMSet(args [][]byte) ([]string, []string) { size := len(args) / 2 keys := make([]string, size) for i := 0; i < size; i++ { keys[i] = string(args[2*i]) } return keys, nil }
结合实现内存数据库 中提到的 LockMap 即可完成加锁。由于其它协程无法获得相关 key 的锁所以不可能插入到事务中,所以我们实现了原子性中不可被插入的特性。
事务需要把所有 key 一次性完成加锁, 只有在事务提交或回滚时才能解锁。不能用到一个 key 就加一次锁用完就解锁,这种方法可能导致脏读:
时间 | 事务1 | 事务2 |
---|---|---|
t1 | 锁定key A | |
t2 | 修改key A | |
t3 | 解锁key A | |
t4 | 锁定key A | |
t4 | 读取key A | |
t5 | 解锁key A | |
t6 | 提交 |
如上图所示 t4 时刻, 事务 2 读到了事务 1未提交的数据,出现了脏读异常。
为了在遇到运行时错误时事务可以回滚(原子性),可用的回滚方式有两种:
Incr A
之后变为了2,我们可以再执行一次Set A 1
命令来撤销 incr 命令。出于节省内存的考虑我们最终选择了第二种方案。比如 HSet 命令只需要另一条 HSet 将 field 改回原值即可,若采用保存 value 的方法我们则需要保存整个 HashMap。类似情况的还有 LPushRPop 等命令。
有一些命令可能需要多条命令来回滚,比如回滚 Del 时不仅需要恢复对应的 key-value 还需要恢复 TTL 数据。或者 Del 命令删除了多个 key 时,也需要多条命令进行回滚。综上我们给出 UndoFunc 的定义:
// UndoFunc returns undo logs for the given command line // execute from head to tail when undo type UndoFunc func(db *DB, args [][]byte) []CmdLine
我们以可以回滚任意操作的rollbackGivenKeys
为例进行说明,当然使用rollbackGivenKeys
的成本较高,在可能的情况下尽量实现针对性的 undo log.
func rollbackGivenKeys(db *DB, keys ...string) []CmdLine { var undoCmdLines [][][]byte for _, key := range keys { entity, ok := db.GetEntity(key) if !ok { // 原来不存在 key 删掉 undoCmdLines = append(undoCmdLines, utils.ToCmdLine("DEL", key), ) } else { undoCmdLines = append(undoCmdLines, utils.ToCmdLine("DEL", key), // 先把新 key 删除掉 aof.EntityToCmd(key, entity).Args, // 把 DataEntity 序列化成命令行 toTTLCmd(db, key).Args, ) } } return undoCmdLines }
接下来看一下 EntityToCmd, 非常简单易懂:
func EntityToCmd(key string, entity *database.DataEntity) *protocol.MultiBulkReply { if entity == nil { return nil } var cmd *protocol.MultiBulkReply switch val := entity.Data.(type) { case []byte: cmd = stringToCmd(key, val) case *List.LinkedList: cmd = listToCmd(key, val) case *set.Set: cmd = setToCmd(key, val) case dict.Dict: cmd = hashToCmd(key, val) case *SortedSet.SortedSet: cmd = zSetToCmd(key, val) } return cmd } var hMSetCmd = []byte("HMSET") func hashToCmd(key string, hash dict.Dict) *protocol.MultiBulkReply { args := make([][]byte, 2+hash.Len()*2) args[0] = hMSetCmd args[1] = []byte(key) i := 0 hash.ForEach(func(field string, val interface{}) bool { bytes, _ := val.([]byte) args[2+i*2] = []byte(field) args[3+i*2] = bytes i++ return true }) return protocol.MakeMultiBulkReply(args) }
Redis Watch 命令用于监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被放弃。
实现 Watch 命令的核心是发现 key 是否被改动,我们使用简单可靠的版本号方案:为每个 key 存储一个版本号,版本号变化说明 key 被修改了:
// database/single_db.go func (db *DB) GetVersion(key string) uint32 { entity, ok := db.versionMap.Get(key) if !ok { return 0 } return entity.(uint32) } // database/transaciton.go func Watch(db *DB, conn redis.Connection, args [][]byte) redis.Reply { watching := conn.GetWatching() for _, bkey := range args { key := string(bkey) watching[key] = db.GetVersion(key) // 将当前版本号存在 conn 对象中 } return protocol.MakeOkReply() }
在执行事务前比较版本号:
// database/transaciton.go func isWatchingChanged(db *DB, watching map[string]uint32) bool { for key, ver := range watching { currentVersion := db.GetVersion(key) if ver != currentVersion { return true } } return false }
在了解事务相关机制后,我们可以来看一下事务执行的核心代码 ExecMulti
func (db *DB) ExecMulti(conn redis.Connection, watching map[string]uint32, cmdLines []CmdLine) redis.Reply { // 准备阶段 // 使用 prepareFunc 获取事务要读写的 key writeKeys := make([]string, 0) // may contains duplicate readKeys := make([]string, 0) for _, cmdLine := range cmdLines { cmdName := strings.ToLower(string(cmdLine[0])) cmd := cmdTable[cmdName] prepare := cmd.prepare write, read := prepare(cmdLine[1:]) writeKeys = append(writeKeys, write...) readKeys = append(readKeys, read...) } watchingKeys := make([]string, 0, len(watching)) for key := range watching { watchingKeys = append(watchingKeys, key) } readKeys = append(readKeys, watchingKeys...) // 将要读写的 key 和被 watch 的 key 一起加锁 db.RWLocks(writeKeys, readKeys) defer db.RWUnLocks(writeKeys, readKeys) // 检查被 watch 的 key 是否发生了改变 if isWatchingChanged(db, watching) { // watching keys changed, abort return protocol.MakeEmptyMultiBulkReply() } // 执行阶段 results := make([]redis.Reply, 0, len(cmdLines)) aborted := false undoCmdLines := make([][]CmdLine, 0, len(cmdLines)) for _, cmdLine := range cmdLines { // 在命令执行前再准备 undo log, 这样才能保证例如用 decr 回滚 incr 命令的实现可以正常工作 undoCmdLines = append(undoCmdLines, db.GetUndoLogs(cmdLine)) result := db.execWithLock(cmdLine) if protocol.IsErrorReply(result) { aborted = true // don't rollback failed commands undoCmdLines = undoCmdLines[:len(undoCmdLines)-1] break } results = append(results, result) } // 执行成功 if !aborted { db.addVersion(writeKeys...) return protocol.MakeMultiRawReply(results) } // 事务失败进行回滚 size := len(undoCmdLines) for i := size - 1; i >= 0; i-- { curCmdLines := undoCmdLines[i] if len(curCmdLines) == 0 { continue } for _, cmdLine := range curCmdLines { db.execWithLock(cmdLine) } } return protocol.MakeErrReply("EXECABORT Transaction discarded because of previous errors.") }