GO实现Redis:GO实现内存数据库(3)

  • 实现Redis的database层(核心层:处理命令并返回)
  • https://github.com/csgopher/go-redis
  • 本文涉及以下文件:
    dict:定义字典的一些方法
    sync_dict:实现dict
    db:分数据库
    command:定义指令
    ping,keys,string:指令的具体处理逻辑
    database:单机版数据库

datastruct/dict/dict.go

type (
	// Consumer is the callback handed to Dict.ForEach. It receives one
	// key/value pair per call and returns true to keep traversing or false
	// to stop the traversal.
	Consumer func(key string, val interface{}) bool

	// Dict is the key/value container abstraction used by the database
	// layer. Any data structure can back it as long as it provides these
	// methods.
	Dict interface {
		// Get returns the value stored under key and whether key exists.
		Get(key string) (val interface{}, exists bool)
		// Len returns the number of stored keys.
		Len() int
		// Put stores val under key unconditionally and returns an integer
		// status in result.
		Put(key string, val interface{}) (result int)
		// PutIfAbsent stores val only when key is not present yet and
		// returns an integer status in result.
		PutIfAbsent(key string, val interface{}) (result int)
		// PutIfExists stores val only when key is already present and
		// returns an integer status in result.
		PutIfExists(key string, val interface{}) (result int)
		// Remove deletes key and returns an integer status in result.
		Remove(key string) (result int)
		// ForEach passes key/value pairs to consumer.
		ForEach(consumer Consumer)
		// Keys returns the stored keys.
		Keys() []string
		// RandomKeys returns up to limit keys; by its name the result may
		// contain repeats (contrast with RandomDistinctKeys).
		RandomKeys(limit int) []string
		// RandomDistinctKeys returns up to limit keys without repeats.
		RandomDistinctKeys(limit int) []string
		// Clear removes every entry.
		Clear()
	}
)

Dict接口:Redis数据结构的接口。这里我们使用sync.Map作为字典的实现,如果想用别的数据结构,换一个实现即可
Consumer:遍历字典所有的键值对,返回值是布尔,true继续遍历,false停止遍历

datastruct/dict/sync_dict.go

type SyncDict struct {    m sync.Map }  func MakeSyncDict() *SyncDict {    return &SyncDict{} }  func (dict *SyncDict) Get(key string) (val interface{}, exists bool) {    val, ok := dict.m.Load(key)    return val, ok }  func (dict *SyncDict) Len() int {    length := 0    dict.m.Range(func(k, v interface{}) bool {       length++       return true    })    return length }  func (dict *SyncDict) Put(key string, val interface{}) (result int) {    _, existed := dict.m.Load(key)    dict.m.Store(key, val)    if existed {       return 0    }    return 1 }  func (dict *SyncDict) PutIfAbsent(key string, val interface{}) (result int) {    _, existed := dict.m.Load(key)    if existed {       return 0    }    dict.m.Store(key, val)    return 1 }  func (dict *SyncDict) PutIfExists(key string, val interface{}) (result int) {    _, existed := dict.m.Load(key)    if existed {       dict.m.Store(key, val)       return 1    }    return 0 }  func (dict *SyncDict) Remove(key string) (result int) {    _, existed := dict.m.Load(key)    dict.m.Delete(key)    if existed {       return 1    }    return 0 }  func (dict *SyncDict) ForEach(consumer Consumer) {    dict.m.Range(func(key, value interface{}) bool {       consumer(key.(string), value)       return true    }) }  func (dict *SyncDict) Keys() []string {    result := make([]string, dict.Len())    i := 0    dict.m.Range(func(key, value interface{}) bool {       result[i] = key.(string)       i++       return true    })    return result }  func (dict *SyncDict) RandomKeys(limit int) []string {    result := make([]string, limit)    for i := 0; i < limit; i++ {       dict.m.Range(func(key, value interface{}) bool {          result[i] = key.(string)          return false       })    }    return result }  func (dict *SyncDict) RandomDistinctKeys(limit int) []string {    result := make([]string, limit)    i := 0    dict.m.Range(func(key, value interface{}) bool {       result[i] = key.(string)       i++       if i == limit {          return 
false       }       return true    })    return result }  func (dict *SyncDict) Clear() {    *dict = *MakeSyncDict() } 

使用sync.Map实现Dict接口

database/db.go

type DB struct { 	index int 	data  dict.Dict }  type ExecFunc func(db *DB, args [][]byte) resp.Reply  type CmdLine = [][]byte  func makeDB() *DB { 	db := &DB{ 		data: dict.MakeSyncDict(), 	} 	return db }  func (db *DB) Exec(c resp.Connection, cmdLine [][]byte) resp.Reply { 	cmdName := strings.ToLower(string(cmdLine[0])) 	cmd, ok := cmdTable[cmdName] 	if !ok { 		return reply.MakeErrReply("ERR unknown command '" + cmdName + "'") 	} 	if !validateArity(cmd.arity, cmdLine) { 		return reply.MakeArgNumErrReply(cmdName) 	} 	fun := cmd.executor 	return fun(db, cmdLine[1:]) // 把 set k v 中的set切掉 }  func validateArity(arity int, cmdArgs [][]byte) bool { 	argNum := len(cmdArgs) 	if arity >= 0 { 		return argNum == arity 	} 	return argNum >= -arity }  func (db *DB) GetEntity(key string) (*database.DataEntity, bool) { 	raw, ok := db.data.Get(key) 	if !ok { 		return nil, false 	} 	entity, _ := raw.(*database.DataEntity) 	return entity, true }  func (db *DB) PutEntity(key string, entity *database.DataEntity) int { 	return db.data.Put(key, entity) }  func (db *DB) PutIfExists(key string, entity *database.DataEntity) int { 	return db.data.PutIfExists(key, entity) }  func (db *DB) PutIfAbsent(key string, entity *database.DataEntity) int { 	return db.data.PutIfAbsent(key, entity) }  func (db *DB) Remove(key string) { 	db.data.Remove(key) }  func (db *DB) Removes(keys ...string) (deleted int) { 	deleted = 0 	for _, key := range keys { 		_, exists := db.data.Get(key) 		if exists { 			db.Remove(key) 			deleted++ 		} 	} 	return deleted }  func (db *DB) Flush() { 	db.data.Clear() } 

实现Redis中的分数据库
ExecFunc:所有Redis的指令都写成这样的类型
validateArity方法:

  • 定长:set k v => arity=3;
  • 变长:exists k1 k2 k3 ... => arity=-2,表示参数>=2个

database/command.go

var cmdTable = make(map[string]*command)  type command struct {    executor ExecFunc    arity    int  }  func RegisterCommand(name string, executor ExecFunc, arity int) {    name = strings.ToLower(name)    cmdTable[name] = &command{       executor: executor,       arity:    arity,    } } 

command:每一个command结构体都是一个指令,例如ping,keys等等
arity:参数数量
cmdTable:记录所有指令和command结构体的关系
RegisterCommand:注册指令的实现,在程序启动时由各指令文件的init方法调用,把指令名(统一转为小写)和对应的command写入cmdTable

database/ping.go

func Ping(db *DB, args [][]byte) resp.Reply {     if len(args) == 0 {         return &reply.PongReply{}     } else if len(args) == 1 {         return reply.MakeStatusReply(string(args[0]))     } else {         return reply.MakeErrReply("ERR wrong number of arguments for 'ping' command")     } }  func init() {     RegisterCommand("ping", Ping, 1) } 

init方法:在启动程序时就会调用这个方法,用于初始化

database/keys.go

func execDel(db *DB, args [][]byte) resp.Reply {    keys := make([]string, len(args))    for i, v := range args {       keys[i] = string(v)    }     deleted := db.Removes(keys...)    return reply.MakeIntReply(int64(deleted)) }  func execExists(db *DB, args [][]byte) resp.Reply {    result := int64(0)    for _, arg := range args {       key := string(arg)       _, exists := db.GetEntity(key)       if exists {          result++       }    }    return reply.MakeIntReply(result) }  func execFlushDB(db *DB, args [][]byte) resp.Reply {    db.Flush()    return &reply.OkReply{} }  func execType(db *DB, args [][]byte) resp.Reply {    key := string(args[0])    entity, exists := db.GetEntity(key)    if !exists {       return reply.MakeStatusReply("none")    }    switch entity.Data.(type) {    case []byte:       return reply.MakeStatusReply("string")    }    return &reply.UnknownErrReply{} }  func execRename(db *DB, args [][]byte) resp.Reply {    if len(args) != 2 {       return reply.MakeErrReply("ERR wrong number of arguments for 'rename' command")    }    src := string(args[0])    dest := string(args[1])        entity, ok := db.GetEntity(src)    if !ok {       return reply.MakeErrReply("no such key")    }    db.PutEntity(dest, entity)    db.Remove(src)    return &reply.OkReply{} }  func execRenameNx(db *DB, args [][]byte) resp.Reply {    src := string(args[0])    dest := string(args[1])     _, exist := db.GetEntity(dest)    if exist {       return reply.MakeIntReply(0)    }     entity, ok := db.GetEntity(src)    if !ok {       return reply.MakeErrReply("no such key")    }    db.Removes(src, dest)    db.PutEntity(dest, entity)    return reply.MakeIntReply(1) }  func execKeys(db *DB, args [][]byte) resp.Reply {    pattern := wildcard.CompilePattern(string(args[0]))    result := make([][]byte, 0)    db.data.ForEach(func(key string, val interface{}) bool {       if pattern.IsMatch(key) {          result = append(result, []byte(key))       }       return true    })    return 
reply.MakeMultiBulkReply(result) }  func init() {    RegisterCommand("Del", execDel, -2)    RegisterCommand("Exists", execExists, -2)    RegisterCommand("Keys", execKeys, 2)    RegisterCommand("FlushDB", execFlushDB, -1)    RegisterCommand("Type", execType, 2)    RegisterCommand("Rename", execRename, 3)    RegisterCommand("RenameNx", execRenameNx, 3) } 

keys.go实现以下指令:
execDel:del k1 k2 k3 ...
execExists:exists k1 k2 k3 ...
execFlushDB:flushdb
execType:type k1
execRename:rename k1 k2
execRenameNx:renamenx k1 k2
execKeys:keys(依赖lib包的工具类wildcard.go)

database/string.go

func execGet(db *DB, args [][]byte) resp.Reply {    key := string(args[0])    bytes, err := db.getAsString(key)    if err != nil {       return err    }    if bytes == nil {       return &reply.NullBulkReply{}    }    return reply.MakeBulkReply(bytes) }  func (db *DB) getAsString(key string) ([]byte, reply.ErrorReply) {    entity, ok := db.GetEntity(key)    if !ok {       return nil, nil    }    bytes, ok := entity.Data.([]byte)    if !ok {       return nil, &reply.WrongTypeErrReply{}    }    return bytes, nil }  func execSet(db *DB, args [][]byte) resp.Reply {    key := string(args[0])    value := args[1]    entity := &database.DataEntity{       Data: value,    }    db.PutEntity(key, entity)    return &reply.OkReply{} }  func execSetNX(db *DB, args [][]byte) resp.Reply {    key := string(args[0])    value := args[1]    entity := &database.DataEntity{       Data: value,    }    result := db.PutIfAbsent(key, entity)    return reply.MakeIntReply(int64(result)) }  func execGetSet(db *DB, args [][]byte) resp.Reply {    key := string(args[0])    value := args[1]     entity, exists := db.GetEntity(key)    db.PutEntity(key, &database.DataEntity{Data: value})    if !exists {       return reply.MakeNullBulkReply()    }    old := entity.Data.([]byte)    return reply.MakeBulkReply(old) }  func execStrLen(db *DB, args [][]byte) resp.Reply {    key := string(args[0])    entity, exists := db.GetEntity(key)    if !exists {       return reply.MakeNullBulkReply()    }    old := entity.Data.([]byte)    return reply.MakeIntReply(int64(len(old))) }  func init() {    RegisterCommand("Get", execGet, 2)    RegisterCommand("Set", execSet, -3)    RegisterCommand("SetNx", execSetNX, 3)    RegisterCommand("GetSet", execGetSet, 3)    RegisterCommand("StrLen", execStrLen, 2) } 

string.go实现以下指令:
execGet:get k1
execSet:set k v
execSetNX:setnx k v
execGetSet:getset k v 返回旧值
execStrLen:strlen k

database/database.go

type Database struct {    dbSet []*DB }  func NewDatabase() *Database {    mdb := &Database{}    if config.Properties.Databases == 0 {       config.Properties.Databases = 16    }    mdb.dbSet = make([]*DB, config.Properties.Databases)    for i := range mdb.dbSet {       singleDB := makeDB()       singleDB.index = i       mdb.dbSet[i] = singleDB    }    return mdb }  func (mdb *Database) Exec(c resp.Connection, cmdLine [][]byte) (result resp.Reply) {    defer func() {       if err := recover(); err != nil {          logger.Warn(fmt.Sprintf("error occurs: %vn%s", err, string(debug.Stack())))       }    }()     cmdName := strings.ToLower(string(cmdLine[0]))    if cmdName == "select" {       if len(cmdLine) != 2 {          return reply.MakeArgNumErrReply("select")       }       return execSelect(c, mdb, cmdLine[1:])    }    dbIndex := c.GetDBIndex()    selectedDB := mdb.dbSet[dbIndex]    return selectedDB.Exec(c, cmdLine) }  func execSelect(c resp.Connection, mdb *Database, args [][]byte) resp.Reply {    dbIndex, err := strconv.Atoi(string(args[0]))    if err != nil {       return reply.MakeErrReply("ERR invalid DB index")    }    if dbIndex >= len(mdb.dbSet) {       return reply.MakeErrReply("ERR DB index is out of range")    }    c.SelectDB(dbIndex)    return reply.MakeOkReply() }  func (mdb *Database) Close() { }  func (mdb *Database) AfterClientClose(c resp.Connection) { } 

Database:一组db的集合
Exec:执行切换db指令或者其他指令
execSelect方法:选择db(指令:select 2)

resp/handler/handler.go

import ( 	database2 "go-redis/database" )  func MakeHandler() *RespHandler {    var db database.Database    db = database2.NewDatabase()    return &RespHandler{       db: db,    } } 

修改实现协议层handler的database实现

架构小结

TCP层服务TCP的连接,然后将连接交给RESP协议层的handler,handler监听客户端的连接,将指令解析后发给管道,管道转给database层(database/database.go),核心层根据命令类型执行不同的方法,然后返回。

发表评论

相关文章