Redis内存数据库示例分析
作者:上后左爱
Redis本身的内容比较复杂。如果一上来就钻研某个细节点(比如连接池、数据结构),虽然可以直接了解这一点的详细源码内容,甚至能尽快解决一些突发问题,但是容易淹没在无尽的细节中,无法从整体上把握Redis。
Redis dict字典
这是 Redis 最底层的结构,比如默认有 16 个 DB,每个 DB 下面都有一个 Dict
// 1. The dictionary is exposed as an interface. The basic implementations are
// simple_dict and sync_dict; users can later provide their own Dict
// implementation to suit their needs. ForEach accepts an anonymous function.

// Dict is the key-value store abstraction.
type Dict interface {
	// Get returns the value bound to key and whether the key exists.
	Get(key string) (val interface{}, exists bool)
	// Len returns the number of entries. It must return int so that
	// implementations such as SimpleDict (whose Len returns int) satisfy
	// this interface.
	Len() int
	// Put stores val under key and reports a count as result.
	Put(key string, val interface{}) (result int)
	// PutIfAbsent stores val only when key is not present yet.
	PutIfAbsent(key string, val interface{}) (result int)
	// PutIfExists stores val only when key is already present.
	PutIfExists(key string, val interface{}) (result int)
	// Remove deletes key and reports a count as result.
	Remove(key string) (result int)
	// ForEach passes entries to consumer, which is a function; traversal
	// continues while consumer returns true.
	ForEach(consumer Consumer)
	// Keys returns the keys held by the dict.
	Keys() []string
	// RandomKeys returns limit keys.
	RandomKeys(limit int) []string
	// RandomDistinctKeys returns multiple keys without duplicates.
	RandomDistinctKeys(limit int) []string
	// Clear removes every entry.
	Clear()
}

// Consumer is used to traverse a dict; if it returns false the traversal stops.
type Consumer func(key string, val interface{}) bool
package dict // SimpleDict wraps a map, it is not thread safe type SimpleDict struct { m map[string]interface{} } // MakeSimple makes a new map func MakeSimple() *SimpleDict { return &SimpleDict{ m: make(map[string]interface{}), } } // Get returns the binding value and whether the key is exist func (dict *SimpleDict) Get(key string) (val interface{}, exists bool) { val, ok := dict.m[key] return val, ok } // Len returns the number of dict func (dict *SimpleDict) Len() int { if dict.m == nil { panic("m is nil") } return len(dict.m) } // Put puts key value into dict and returns the number of new inserted key-value func (dict *SimpleDict) Put(key string, val interface{}) (result int) { _, existed := dict.m[key] dict.m[key] = val if existed { return 0 } return 1 } // PutIfAbsent puts value if the key is not exists and returns the number of updated key-value func (dict *SimpleDict) PutIfAbsent(key string, val interface{}) (result int) { _, existed := dict.m[key] if existed { return 0 } dict.m[key] = val return 1 } // PutIfExists puts value if the key is exist and returns the number of inserted key-value func (dict *SimpleDict) PutIfExists(key string, val interface{}) (result int) { _, existed := dict.m[key] if existed { dict.m[key] = val return 1 } return 0 } // Remove removes the key and return the number of deleted key-value func (dict *SimpleDict) Remove(key string) (result int) { _, existed := dict.m[key] delete(dict.m, key) if existed { return 1 } return 0 } // Keys returns all keys in dict func (dict *SimpleDict) Keys() []string { result := make([]string, len(dict.m)) i := 0 for k := range dict.m { result[i] = k } return result } // ForEach traversal the dict func (dict *SimpleDict) ForEach(consumer Consumer) { for k, v := range dict.m { if !consumer(k, v) { break } } } // RandomKeys randomly returns keys of the given number, may contain duplicated key func (dict *SimpleDict) RandomKeys(limit int) []string { result := make([]string, limit) for i := 0; i < limit; 
i++ { for k := range dict.m { result[i] = k break } } return result } // RandomDistinctKeys randomly returns keys of the given number, won't contain duplicated key func (dict *SimpleDict) RandomDistinctKeys(limit int) []string { size := limit if size > len(dict.m) { size = len(dict.m) } result := make([]string, size) i := 0 for k := range dict.m { if i == limit { break } result[i] = k i++ } return result } // Clear removes all keys in dict func (dict *SimpleDict) Clear() { *dict = *MakeSimple() }
Redis的DB实现
针对其中 Command 的实现,ping、set、get 等指令都有具体的实现方法,采用策略模式,不同的指令有自己的 Executor 执行器
// 指令与结构体之间的关系 var cmdTable = make(map[string]*command) type command struct { executor ExecFunc arity int // allow number of args, arity < 0 means len(args) >= -arity } // RegisterCommand registers a new command // arity means allowed number of cmdArgs, arity < 0 means len(args) >= -arity. // for example: the arity of `get` is 2, `mget` is -2 func RegisterCommand(name string, executor ExecFunc, arity int) { name = strings.ToLower(name) cmdTable[name] = &command{ executor: executor, arity: arity, } }
// Package database is a memory database with redis compatible interface package database import ( "go-redis/datastruct/dict" "go-redis/interface/database" "go-redis/interface/resp" "go-redis/resp/reply" "strings" ) // DB stores data and execute user's commands type DB struct { index int // key -> DataEntity data dict.Dict // 底层的实现可以进行切换 } // ExecFunc is interface for command executor, redis 的 指令实现, 所有的实现 协程这种形式 // args don't include cmd line type ExecFunc func(db *DB, args [][]byte) resp.Reply // CmdLine is alias for [][]byte, represents a command line type CmdLine = [][]byte // makeDB create DB instance func makeDB() *DB { db := &DB{ data: dict.MakeSyncDict(), // 使用的 sync 的 dict } return db } // Exec executes command within one database func (db *DB) Exec(c resp.Connection, cmdLine [][]byte) resp.Reply { cmdName := strings.ToLower(string(cmdLine[0])) cmd, ok := cmdTable[cmdName] if !ok { return reply.MakeErrReply("ERR unknown command '" + cmdName + "'") } if !validateArity(cmd.arity, cmdLine) { return reply.MakeArgNumErrReply(cmdName) } fun := cmd.executor return fun(db, cmdLine[1:]) } func validateArity(arity int, cmdArgs [][]byte) bool { argNum := len(cmdArgs) if arity >= 0 { return argNum == arity } return argNum >= -arity } /* ---- data Access ----- */ // GetEntity returns DataEntity bind to given key func (db *DB) GetEntity(key string) (*database.DataEntity, bool) { raw, ok := db.data.Get(key) if !ok { return nil, false } entity, _ := raw.(*database.DataEntity) return entity, true } // PutEntity a DataEntity into DB func (db *DB) PutEntity(key string, entity *database.DataEntity) int { return db.data.Put(key, entity) } // PutIfExists edit an existing DataEntity func (db *DB) PutIfExists(key string, entity *database.DataEntity) int { return db.data.PutIfExists(key, entity) } // PutIfAbsent insert an DataEntity only if the key not exists func (db *DB) PutIfAbsent(key string, entity *database.DataEntity) int { return db.data.PutIfAbsent(key, entity) } // Remove 
the given key from db func (db *DB) Remove(key string) { db.data.Remove(key) } // Removes the given keys from db func (db *DB) Removes(keys ...string) (deleted int) { deleted = 0 for _, key := range keys { _, exists := db.data.Get(key) if exists { db.Remove(key) deleted++ } } return deleted } // Flush clean database func (db *DB) Flush() { db.data.Clear() }
具体的实现器
// Ping Executor func Ping(db *DB, args [][]byte) resp.Reply { if len(args) == 0 { return &reply.PongReply{} } else if len(args) == 1 { return reply.MakeStatusReply(string(args[0])) } else { return reply.MakeErrReply("ERR wrong number of arguments for 'ping' command") } } // 初始化 方法 func init() { RegisterCommand("ping", Ping, -1) }
Redis持久化Aof
AOF 则以协议文本的方式,将所有对数据库进行过写入的命令(及其参数)记录到 AOF 文件,以此达到记录数据库状态的目的.
package aof

import (
	"go-redis/config"
	databaseface "go-redis/interface/database"
	"go-redis/lib/logger"
	"go-redis/lib/utils"
	"go-redis/resp/connection"
	"go-redis/resp/parser"
	"go-redis/resp/reply"
	"io"
	"os"
	"strconv"
)

// CmdLine is alias for [][]byte, represents a command line
type CmdLine = [][]byte

const (
	// aofQueueSize is the buffer size of the channel feeding handleAof.
	aofQueueSize = 1 << 16
)

// payload is one queued command together with the db index it targets.
type payload struct {
	cmdLine CmdLine
	dbIndex int
}

// AofHandler receives msgs from a channel and writes them to the AOF file.
type AofHandler struct {
	db          databaseface.Database
	aofChan     chan *payload
	aofFile     *os.File
	aofFilename string
	currentDB   int // db index last written to the file by handleAof
}

// NewAOFHandler creates a new aof.AofHandler.
// It replays the existing AOF file into db, opens the file for appending
// (creating it if needed), and starts the goroutine that drains aofChan.
// It returns an error when the file cannot be opened.
func NewAOFHandler(db databaseface.Database) (*AofHandler, error) {
	handler := &AofHandler{}
	handler.aofFilename = config.Properties.AppendFilename
	handler.db = db
	// Load the AOF into memory first. aofChan is still nil here, so AddAof
	// drops anything handed to it during the replay.
	// NOTE(review): presumably this keeps replayed commands from being
	// appended again — confirm against the callers of AddAof.
	handler.LoadAof()
	aofFile, err := os.OpenFile(handler.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
	if err != nil {
		return nil, err
	}
	handler.aofFile = aofFile
	handler.aofChan = make(chan *payload, aofQueueSize)
	go handler.handleAof()
	return handler, nil
}

// AddAof queues a command for the AOF writer. It does nothing unless
// config.Properties.AppendOnly is set and aofChan has been created. The send
// blocks when the channel buffer is full.
func (handler *AofHandler) AddAof(dbIndex int, cmdLine CmdLine) {
	if config.Properties.AppendOnly && handler.aofChan != nil {
		handler.aofChan <- &payload{
			cmdLine: cmdLine,
			dbIndex: dbIndex,
		}
	}
}

// LoadAof reads the AOF file and replays its commands against the database.
// A file that cannot be opened is logged as a warning and skipped. Payloads
// that fail to parse, are empty, or are not multi bulk replies are logged
// and skipped.
func (handler *AofHandler) LoadAof() {
	file, err := os.Open(handler.aofFilename)
	if err != nil {
		logger.Warn(err)
		return
	}
	defer func() {
		// Report the close failure instead of silently dropping it.
		if closeErr := file.Close(); closeErr != nil {
			logger.Warn(closeErr)
		}
	}()
	ch := parser.ParseStream(file)
	fakeConn := &connection.Connection{} // only used for save dbIndex
	// Recover data by executing every parsed command.
	for p := range ch {
		if p.Err != nil {
			if p.Err == io.EOF {
				break
			}
			logger.Error("parse error: " + p.Err.Error())
			continue
		}
		if p.Data == nil {
			logger.Error("empty payload")
			continue
		}
		r, ok := p.Data.(*reply.MultiBulkReply)
		if !ok {
			logger.Error("require multi bulk reply")
			continue
		}
		ret := handler.db.Exec(fakeConn, r.Args)
		if reply.IsErrorReply(ret) {
			// Log the failing reply itself. The previous code logged the
			// os.Open error, which is always nil at this point.
			logger.Error("exec err", ret)
		}
	}
}

// handleAof listens on the aof channel and writes each command to the file,
// so persistence happens asynchronously from the caller of AddAof. When a
// payload targets a different db than the last one written, a SELECT command
// is written first.
func (handler *AofHandler) handleAof() {
	handler.currentDB = 0
	for p := range handler.aofChan {
		if p.dbIndex != handler.currentDB {
			// select db
			data := reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))).ToBytes()
			_, err := handler.aofFile.Write(data)
			if err != nil {
				logger.Warn(err)
				continue // skip this command
			}
			handler.currentDB = p.dbIndex
		}
		data := reply.MakeMultiBulkReply(p.cmdLine).ToBytes()
		_, err := handler.aofFile.Write(data)
		if err != nil {
			logger.Warn(err)
		}
	}
}
到此这篇关于Redis内存数据库示例分析的文章就介绍到这了,更多相关Redis内存数据库内容请搜索脚本之家以前的文章或继续浏览下面的相关文章,希望大家以后多多支持脚本之家!