Golang

关注公众号 jb51net

关闭
首页 > 脚本专栏 > Golang > Golang Redis持久化AOF

探索Golang实现Redis持久化AOF实例

作者:绍纳 nullbody笔记

这篇文章主要为大家介绍了Golang实现Redis持久化AOF实例探索,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

引言

用11篇文章实现一个可用的Redis服务,姑且叫EasyRedis吧,希望通过文章将Redis掰开撕碎了呈现给大家,而不是仅仅停留在八股文的层面,并且有非常爽的感觉,欢迎持续关注学习。

[x] easyredis之TCP服务

[x] easyredis之网络请求序列化协议(RESP)

[x] easyredis之内存数据库

[x] easyredis之过期时间 (时间轮实现)

[x] easyredis之持久化 (AOF实现)

[ ] easyredis之发布订阅功能

[ ] easyredis之有序集合(跳表实现)

[ ] easyredis之 pipeline 客户端实现

[ ] easyredis之事务(原子性/回滚)

[ ] easyredis之连接池

[ ] easyredis之分布式集群存储

【第五篇】EasyRedis之持久化AOF

AOF全称Append Only File,就是将写相关的命令,追加保存到文件中,当服务器重启以后,将文件中的命令在服务端重放(重新执行恢复数据),实现的一种持久化方式。

本篇通过3个部分讲解AOF的实现:

AOF的写入过程

在核心的数据结构 Engine中新增一个 aof *AOF对象

// 存储引擎,负责数据的CRUD
type Engine struct {
	// *DB
	dbSet []*atomic.Value
	// 时间轮(延迟任务)
	delay *timewheel.Delay
	// Append Only File
	aof *aof.AOF
}

在初始化函数func NewEngine() *Engine中,会基于是否启用AOF日志,决定 aof *aof.AOF的初始化

func NewEngine() *Engine {
	//.....省略....
	// 启用AOF日志
	if conf.GlobalConfig.AppendOnly {
		// 创建*AOF对象
		aof, err := aof.NewAOF(conf.GlobalConfig.AppendFilename, engine, true, conf.GlobalConfig.AppendFsync)
		if err != nil {
			panic(err)
		}
		engine.aof = aof
		// 设定每个db,使用aof写入日志
		engine.aofBindEveryDB()
	}
	return engine
}

因为实际执行redis命令的对象是 *DB,所以会对每个*DB对象设定db.writeAof函数指针

func (e *Engine) aofBindEveryDB() {
	for _, dbSet := range e.dbSet {
		db := dbSet.Load().(*DB)
		db.writeAof = func(redisCommand [][]byte) {
			if conf.GlobalConfig.AppendOnly {
                // 调用e.aof对象方法,保存命令
				e.aof.SaveRedisCommand(db.index, aof.Command(redisCommand))
			}
		}
	}
}

例如,当我们执行 set key value命令的时候,实际会执行 func cmdSet(db *DB, args [][]byte) protocol.Reply

func cmdSet(db *DB, args [][]byte) protocol.Reply {
    //.....省略....
	if result > 0 { // 1 表示存储成功
		//TODO: 过期时间处理
		if ttl != nolimitedTTL { // 设定key过期
			expireTime := time.Now().Add(time.Duration(ttl) * time.Millisecond)
			db.ExpireAt(key, expireTime)
			//写入日志
			db.writeAof(aof.SetCmd([][]byte{args[0], args[1]}...))
			db.writeAof(aof.PExpireAtCmd(string(args[0]), expireTime))
		} else { // 设定key不过期
			db.Persist(key)
			//写入日志
			db.writeAof(aof.SetCmd(args...))
		}
		return protocol.NewOkReply()
	}
	return protocol.NewNullBulkReply()
}

可以看到,会调用上面刚才设定的db.writeAof函数,将当前的命令保存到AOF中。所以我们实际看下 SaveRedisCommand函数中具体在做什么事情。代码路径位于aof/aof.go

func (aof *AOF) SaveRedisCommand(dbIndex int, command Command) {
	// 关闭
	if aof.atomicClose.Load() {
		return
	}
	// 写入文件 & 刷盘
	if aof.aofFsync == FsyncAlways {
		record := aofRecord{
			dbIndex: dbIndex,
			command: command,
		}
		aof.writeAofRecord(record)
		return
	}
	// 写入缓冲
	aof.aofChan <- aofRecord{
		dbIndex: dbIndex,
		command: command,
	}
}

因为AOF的刷盘(Sync)有三种模式:

如果配置的是always模式,会立即执行aof.writeAofRecord(record);否则就将数据先保存在缓冲aof.aofChan中(这里其实又是生产者消费者模型)最后在消费协程中,执行写入

func (aof *AOF) watchChan() {
	for record := range aof.aofChan {
		aof.writeAofRecord(record)
	}
	aof.aofFinished <- struct{}{}
}

所以我们只需要看下 writeAofRecord函数即可,其实就是把命令按照Redis 序列化协议的格式,写入到文件中。给大家看下更直观的演示图:

再看下在 append.aof文件中具体的数据格式:

这里有个很重要点:因为AOF文件是所有的*DB对象复用的文件,写入的redis命令归属于不同的数据库的

举个例子: 比如在0号数据库,我们执行set key value,在3号数据库,我们执行set key value,在日志文件中会记录两条命令,但是这两个命令其实是不同数据库的命令。在恢复命令到数据库的时候,应该在不同的数据库中执行该命令。所以在记录命令的时候,我们还要记录下他的数据库是什么?这样恢复的时候,才能知道命令的数据库的归属问题。

func (aof *AOF) writeAofRecord(record aofRecord) {
	aof.mu.Lock()
	defer aof.mu.Unlock()
	// 因为aof对象是所有数据库对象【复用】写入文件方法,每个数据库的索引不同
	// 所以,每个命令的执行,有个前提就是操作的不同的数据库
	if record.dbIndex != aof.lastDBIndex {
		// 构建select index 命令 & 写入文件
		selectCommand := [][]byte{[]byte("select"), []byte(strconv.Itoa(record.dbIndex))}
		data := protocol.NewMultiBulkReply(selectCommand).ToBytes()
		_, err := aof.aofFile.Write(data)
		if err != nil {
			logger.Warn(err)
			return
		}
		aof.lastDBIndex = record.dbIndex
	}
	// redis命令
	data := protocol.NewMultiBulkReply(record.command).ToBytes()
	_, err := aof.aofFile.Write(data)
	if err != nil {
		logger.Warn(err)
	}
	logger.Debugf("write aof command:%q", data)
	// 每次写入刷盘
	if aof.aofFsync == FsyncAlways {
		aof.aofFile.Sync()
	}
}

AOF的加载过程

在服务启动的时候,将*.aof文件中的命令,在服务端进行重放,效果演示如下:

代码路径位于aof/aof.go

// 构建AOF对象
func NewAOF(aofFileName string, engine abstract.Engine, load bool, fsync string) (*AOF, error) {
	//...省略...
	// 启动加载aof文件
	if load {
		aof.LoadAof(0)
	}
    //...省略...
}

aof.LoadAof(0)函数的本质就是从文件中,按照行读取数据。如果看过之前的文章,这里其实复用了parser.ParseStream(reader)函数,负责从文件解析redis序列化协议格式的命令,最后利用数据库引擎,将命令数据保存到内存中(命令重放)

func (aof *AOF) LoadAof(maxBytes int) {
	// 目的:当加载aof文件的时候,因为需要复用engine对象,内部重放命令的时候会自动写aof日志,加载aof 禁用 SaveRedisCommand的写入
	aof.atomicClose.Store(true)
	deferfunc() {
		aof.atomicClose.Store(false)
	}()
	// 只读打开文件
	file, err := os.Open(aof.aofFileName)
	if err != nil {
		logger.Error(err.Error())
		return
	}
	defer file.Close()
	file.Seek(0, io.SeekStart)
	var reader io.Reader
	if maxBytes > 0 { // 限定读取的字节大小
		reader = io.LimitReader(file, int64(maxBytes))
	} else { // 不限定,直接读取到文件结尾(为止)
		reader = file
	}
	// 文件中保存的格式和网络传输的格式一致
	ch := parser.ParseStream(reader)
	virtualConn := connection.NewVirtualConn()
	for payload := range ch {
		if payload.Err != nil {
			// 文件已经读取到“完成“
			if payload.Err == io.EOF {
				break
			}
			// 读取到非法的格式
			logger.Errorf("LoadAof parser error %+v:", payload.Err)
			continue
		}
		if payload.Reply == nil {
			logger.Error("empty payload data")
			continue
		}
		// 从文件中读取到命令
		reply, ok := payload.Reply.(*protocol.MultiBulkReply)
		if !ok {
			logger.Error("require multi bulk protocol")
			continue
		}
		// 利用数据库引擎,将命令数据保存到内存中(命令重放)
		ret := aof.engine.Exec(virtualConn, reply.RedisCommand)
		// 判断是否执行失败
		if protocol.IsErrReply(ret) {
			logger.Error("exec err ", string(ret.ToBytes()))
		}
		// 判断命令是否是"select"
		if strings.ToLower(string(reply.RedisCommand[0])) == "select" {
			dbIndex, err := strconv.Atoi(string(reply.RedisCommand[1]))
			if err == nil {
				aof.lastDBIndex = dbIndex // 记录下数据恢复过程中,选中的数据库索引
			}
		}
	}
}

AOF的重写过程

代码路径aof/rewrite.go重写的过程就是下面的函数

func (aof *AOF) Rewrite(engine abstract.Engine) {
	//1.对现有的aof文件做一次快照
	snapShot, err := aof.startRewrite()
	if err != nil {
		logger.Errorf("StartRewrite err: %+v", err)
		return
	}
	//2. 将现在的aof文件数据,加在到新(内存)对象中,并重写入新aof文件中
	err = aof.doRewrite(snapShot, engine)
	if err != nil {
		logger.Errorf("doRewrite err: %+v", err)
		return
	}
	//3. 将重写过程中的增量命令写入到新文件中
	err = aof.finishRewrite(snapShot)
	if err != nil {
		logger.Errorf("finishRewrite err: %+v", err)
	}
}

整个的处理思想很重要:如下图

总结

代码的思路应该还是比较清晰,但是细节上的处理非常容易让人大脑宕机。建议还是看下源码,边看边自己敲一下,感受是不一样

项目代码地址: https://github.com/gofish2020/easyredis 

以上就是探索Golang实现Redis持久化AOF实例的详细内容,更多关于Golang Redis持久化AOF的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文