Golang实现Redis网络协议实例探究
作者:绍纳 nullbody笔记
引言
用11篇文章实现一个可用的Redis服务,姑且叫EasyRedis吧,希望通过文章将Redis掰开撕碎了呈现给大家,而不是仅仅停留在八股文的层面,并且有非常爽的感觉,欢迎持续关注学习。
[x] easyredis之网络请求序列化协议(RESP)
[ ] easyredis之内存数据库
[ ] easyredis之过期时间 (时间轮实现)
[ ] easyredis之持久化 (AOF实现)
[ ] easyredis之发布订阅功能
[ ] easyredis之有序集合(跳表实现)
[ ] easyredis之 pipeline 客户端实现
[ ] easyredis之事务(原子性/回滚)
[ ] easyredis之连接池
[ ] easyredis之分布式集群存储
EasyRedis之网络请求序列化协议(RESP)
Redis 协议格式
全名叫: Redis serialization protocol (RESP)
官网地址 :
https://redis.io/docs/reference/protocol-spec/#bulk-strings
RESP 定义了5种格式:
简单字符串(Simple String): 服务器用来返回简单的结果,比如"OK"。非二进制安全,且不允许换行。
错误信息(Error): 服务器用来返回简单的错误信息,比如"ERR Invalid Synatx"。非二进制安全,且不允许换行。
整数(Integer): llen、scard 等命令的返回值, 64位有符号整数
字符串(Bulk String): 二进制安全字符串, 比如 get 等命令的返回值
数组(Array, 又称 Multi Bulk Strings): Bulk String 数组,客户端发送指令以及 lrange 等命令响应的格式
5种格式通过第一个字符来区分:
- 简单字符串:以
+开始, 如:+OK\r\n - 错误:以
-开始,如:-ERR Invalid Synatx\r\n - 整数:以
:开始,如::1\r\n - 字符串:以
$开始,如:$3\r\nSET\r\n,3表示字符串set的字节长度为3,后面跟上实际的字符串SET。 有个特例:$-1表示 nil, 比如使用 get 命令查询一个不存在的key时,响应即为$-1 - 数组:以
*开始
比如我们在命令行中常写的命令set key value在redis-cli通过网络发送给服务器端的时候,其实发送的是*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
| *3 | 表示命令由3个部分组成(set key value) |
|---|---|
| \r\n | 分隔符 |
| $3 | 表示 set的字节长度为3 |
| \r\n | 分隔符 |
| SET | 就是set |
| \r\n | 分隔符 |
| $3 | 表示key的字节长度为3 |
| \r\n | 分隔符 |
| key | 就是key |
| \r\n | 分隔符 |
| $5 | 表示value的字节长度为5 |
| \r\n | 分隔符 |
| value | 就是表示value |
| \r\n | 分隔符 |
代码实现
通过上篇文章可以,我们需要实现一个redisHander处理连接,本质就是要用到本篇协议解析规则
func (t *TCPServer) handleConn(conn net.Conn) {
// ...代码省略...
logger.Debugf("accept new conn %s", conn.RemoteAddr().String())
// TODO :处理连接
t.redisHander.Handle(context.Background(), conn)
}
代码路径:redis/handler.go
关键函数Handle如下:代码思路就是启动一个协程parser.ParseStream(conn)负责从conn中按照\r\n为分隔符,读取数据,并保存到chan中;然后在Handle中读取 chan的数据,这里其实又使用到了生产者消费者模型
// 该方法是不同的conn复用的方法,要做的事情就是从conn中读取出符合RESP格式的数据;
// 然后针对消息格式,进行不同的业务处理
func (h *RedisHandler) Handle(ctx context.Context, conn net.Conn) {
h.activeConn.Store(conn, struct{}{})
outChan := parser.ParseStream(conn)
for payload := range outChan {
if payload.Err != nil {
// 网络conn关闭
if payload.Err == io.EOF || payload.Err == io.ErrUnexpectedEOF || strings.Contains(payload.Err.Error(), "use of closed network connection") {
h.activeConn.Delete(conn)
conn.Close()
logger.Warn("client closed:" + conn.RemoteAddr().String())
return
}
// 解析出错 protocol error
errReply := protocal.NewGenericErrReply(payload.Err.Error())
_, err := conn.Write(errReply.ToBytes())
if err != nil {
h.activeConn.Delete(conn)
conn.Close()
logger.Warn("client closed:" + conn.RemoteAddr().String() + " err info: " + err.Error())
return
}
continue
}
if payload.Reply == nil {
logger.Error("empty payload")
continue
}
reply, ok := payload.Reply.(*protocal.MultiBulkReply)
if !ok {
logger.Error("require multi bulk protocol")
continue
}
logger.Debugf("%q", string(reply.ToBytes()))
result := h.engine.Exec(conn, reply.RedisCommand)
if result != nil {
conn.Write(result.ToBytes())
} else {
conn.Write(protocal.NewUnknownErrReply().ToBytes())
}
}
}查看parser.ParseStream(conn)内部代码,可知协议解析逻辑主要在 redis/parser.go文件中的 parse函数中,代码注释很清晰。
// 从r中读取数据,将读取的结果通过 out chan 发送给外部使用(包括:正常的数据包 or 网络错误)
func parse(r io.Reader, out chan<- *Payload) {
// 异常恢复,避免未知异常
deferfunc() {
if err := recover(); err != nil {
logger.Error(err, string(debug.Stack()))
}
}()
reader := bufio.NewReader(r)
for {
// 按照 \n 分隔符读取一行数据
line, err := reader.ReadBytes('\n')
if err != nil { // 一般是 io.EOF错误(说明conn关闭or文件尾部)
out <- &Payload{Err: err}
close(out)
return
}
// 读取到的line中包括 \n 分割符
length := len(line)
// RESP协议是按照 \r\n 分割数据
if length <= 2 || line[length-2] != '\r' { // 说明是空白行,忽略
continue
}
// 去掉尾部 \r\n
line = bytes.TrimSuffix(line, []byte{'\r', '\n'})
// 协议文档 :https://redis.io/docs/reference/protocol-spec/
// The first byte in an RESP-serialized payload always identifies its type. Subsequent bytes constitute the type's contents.
switch line[0] {
case'*': // * 表示数组
err := parseArrays(line, reader, out)
if err != nil {
out <- &Payload{Err: err}
close(out)
return
}
default:
args := bytes.Split(line, []byte{' '})
out <- &Payload{
Reply: protocal.NewMultiBulkReply(args),
}
}
}
}唯一需要强调的一个点RESP协议一直强调 字符串(Bulk String): 二进制安全字符串,在代码中是如何实现的?? 从conn中读取数据,我们是按照\r\n为分隔符号获取一串字节,那如果数据本身就带有\r\n,那肯定就有问题了。 例如字符串原样输出样式为: $5\r\nva\r\nl\r\n $5 表示字符串长度为5【va\r\nl】,所以在读取到5的时候,我们不能继续按照\r\n为分隔符号读取,而是使用 io.ReadFull(reader, body)函数直接读取5个字节
// 基于数字5 读取 5+2 长度的数据,这里的2表示\r\n
body := make([]byte, dataLen+2)
// 注意:这里直接读取指定长度的字节
_, err := io.ReadFull(reader, body)
if err != nil {
return err
}
// 所以最终读取到的是 hello\r\n,去掉\r\n 保存到 lines中
lines = append(lines, body[:len(body)-2])
效果展示
用官方的redis-cli 客户端连接自己的EasyRedis服务,并发送 get easyredis 和 set easyredis 1 命令,基于 Redis序列化协议,我们可以正确的解析出命令,格式为:
*2\r\n$3\r\nget\r\n$9\r\neasyredis\r\n *3\r\n$3\r\nset\r\n$9\r\neasyredis\r\n$1\r\n1\r\n

下篇文章就是在内存数据库中完成对命令的KV存储(敬请期待)
扩展知识
在上篇文章解析conf文件用了到了 NewScanner,本篇文章将使用NewReader从网络连接中读取数据包进行解析
NewReader 和 NewScanner 介绍(来自ChatGPT):
在 Go 语言中,NewReader 和 NewScanner 分别是 bufio 包中的两个函数,用于创建不同类型的读取器。
func NewReader(rd io.Reader) *Reader
NewReader 用于创建一个新的 Reader 对象,该对象实现了 io.Reader 接口,并提供了一些额外的缓冲功能。它会使用默认的缓冲区大小(4096 字节)。
示例:
file, err := os.Open("example.txt")
if err != nil {
log.Fatal(err)
}
defer file.Close()
reader := bufio.NewReader(file)这里创建了一个从文件中读取的 bufio.Reader。
func NewScanner(r io.Reader) *Scanner
NewScanner 用于创建一个新的 Scanner 对象,该对象实现了 io.Scanner 接口,用于方便地从输入源读取数据。Scanner 对象使用默认的 bufio.Reader 进行缓冲。
示例:
file, err := os.Open("example.txt")
if err != nil {
log.Fatal(err)
}
defer file.Close()
scanner := bufio.NewScanner(file)
这里创建了一个从文件中读取的 bufio.Scanner。
NewReader 和 NewScanner 区别:
bufio.NewReader 返回一个 bufio.Reader,该对象实现了 io.Reader 接口,提供了缓冲功能,适用于低层次的字节读取。
bufio.NewScanner 返回一个 bufio.Scanner,该对象实现了 io.Scanner 接口,提供了一些方便的方法来读取文本数据,并且它默认使用 bufio.Reader 进行缓冲,适用于高层次的文本数据读取。
选择使用哪一个取决于你的需求。如果你需要读取字节数据并且想要利用缓冲,可以使用 bufio.NewReader。 如果你要处理文本数据,并且想要方便地使用 Scanner 提供的方法,可以使用 bufio.NewScanner。
项目代码地址: https://github.com/gofish2020/easyredis
以上就是Golang实现Redis网络协议实例探究的详细内容,更多关于Golang Redis网络协议的资料请关注脚本之家其它相关文章!
