Golang实现自己的Redis(pipeline客户端)实例探索
作者:绍纳 nullbody笔记
引言
用11篇文章实现一个可用的Redis服务,姑且叫EasyRedis吧,希望通过文章将Redis掰开撕碎了呈现给大家,而不是仅仅停留在八股文的层面,并且有非常爽的感觉,欢迎持续关注学习。
- [x] easyredis之TCP服务
- [x] easyredis之网络请求序列化协议(RESP)
- [x] easyredis之内存数据库
- [x] easyredis之过期时间 (时间轮实现)
- [x] easyredis之持久化 (AOF实现)
- [x] easyredis之发布订阅功能
- [x] easyredis之有序集合(跳表实现)
- [x] easyredis之 pipeline 客户端实现
- [ ] easyredis之事务(原子性/回滚)
- [ ] easyredis之连接池
- [ ] easyredis之分布式集群存储
EasyRedis之pipeline客户端
网络编程的一个基础知识:用同一个sokcet连接发送多个数据包的时候,我们一般的做法是,发送并立刻接收结果,在没有接收到,是不会继续发送数据包。这种方法简单,但是效率太低。时间都浪费在等待上了...
socket的【发送缓冲区和接收缓冲区】是分离的,也就是发送不用等待接收,接收也不用等待发送。
所以我们可以把我们要发送的多个数据包【数据包1/数据包2...数据包N】复用同一个连接,通过发送缓冲区按顺序都发送给服务端。服务端处理请求的顺序,也是按照【数据包1/数据包2...数据包N】这个顺序处理的。当处理完以后,处理结果将按照【数据包结果1/数据包结果2...数据包结果N】顺序发送给客户端的接收缓冲区。客户端只需要从接收缓冲区中读取数据,并保存到请求数据包上,即可。这样我们就可以将发送和接收分离开来。一个协程只负责发送,一个协程只负责接收,互相不用等待。关键在于保证发送和接收的顺序是相同的设计逻辑图如下:
代码路径redis/client/client.go
整个代码也就是200多行,结合上图非常容易理解
创建客户端
type RedisClent struct { // socket连接 conn net.Conn addr string // 客户端当前状态 connStatus atomic.Int32 // heartbeat ticker time.Ticker // buffer cache waitSend chan *request waitResult chan *request // 有请求正在处理中... working sync.WaitGroup } // 创建redis客户端socket func NewRedisClient(addr string) (*RedisClent, error) { conn, err := net.Dial("tcp", addr) if err != nil { returnnil, err } rc := RedisClent{} rc.conn = conn rc.waitSend = make(chan *request, maxChanSize) rc.waitResult = make(chan *request, maxChanSize) rc.addr = addr return &rc, nil } // 启动 func (rc *RedisClent) Start() error { rc.ticker = *time.NewTicker(heartBeatInterval) // 将waitSend缓冲区进行发送 go rc.execSend() // 获取服务端结果 go rc.execReceive() // 定时发送心跳 go rc.execHeardBeat() rc.connStatus.Store(connRunning) // 启动状态 returnnil }
发送Redis命令
将command [][]byte
保存到缓冲区 rc.waitSend
中
// 将redis命令保存到 waitSend 中 func (rc *RedisClent) Send(command [][]byte) (protocol.Reply, error) { // 已关闭 if rc.connStatus.Load() == connClosed { returnnil, errors.New("client closed") } req := &request{ command: command, wait: wait.Wait{}, } // 单个请求 req.wait.Add(1) // 所有请求 rc.working.Add(1) defer rc.working.Done() // 将数据保存到缓冲中 rc.waitSend <- req // 等待处理结束 if req.wait.WaitWithTimeOut(maxWait) { returnnil, errors.New("time out") } // 出错 if req.err != nil { err := req.err returnnil, err } // 正常 return req.reply, nil }
发送Redis命令到服务端
// 将waitSend缓冲区进行发送 func (rc *RedisClent) execSend() { for req := range rc.waitSend { rc.sendReq(req) } } func (rc *RedisClent) sendReq(req *request) { // 无效请求 if req == nil || len(req.command) == 0 { return } var err error // 网络请求(重试3次) for i := 0; i < 3; i++ { _, err = rc.conn.Write(req.Bytes()) // 发送成功 or 发送错误(除了超时错误和deadline错误)跳出 if err == nil || (!strings.Contains(err.Error(), "timeout") && // only retry timeout !strings.Contains(err.Error(), "deadline exceeded")) { break } } if err == nil { // 发送成功,异步等待结果 rc.waitResult <- req } else { // 发送失败,请求直接失败 req.err = err req.wait.Done() } }
从服务端读取数据
func (rc *RedisClent) execReceive() { ch := parser.ParseStream(rc.conn) for payload := range ch { if payload.Err != nil { if rc.connStatus.Load() == connClosed { // 连接已关闭 return } // 否则,重新连接(可能因为网络抖动临时断开了) rc.reconnect() return } // 说明一切正常 rc.handleResult(payload.Reply) } } func (rc *RedisClent) handleResult(reply protocol.Reply) { // 从rc.waitResult 获取一个等待中的请求,将结果保存进去 req := <-rc.waitResult if req == nil { return } req.reply = reply req.wait.Done() // 通知已经获取到结果 }
断线重连
因为网络抖动可能存在连接断开的情况,所以需要有重连的功能
func (rc *RedisClent) reconnect() { logger.Info("redis client reconnect...") rc.conn.Close() var conn net.Conn // 重连(重试3次) for i := 0; i < 3; i++ { var err error conn, err = net.Dial("tcp", rc.addr) if err != nil { logger.Error("reconnect error: " + err.Error()) time.Sleep(time.Second) continue } else { break } } // 服务端连不上,说明服务可能挂了(or 网络问题 and so on...) if conn == nil { rc.Stop() return } // 这里关闭没问题,因为rc.conn.Close已经关闭,函数Send中保存的请求因为发送不成功,不会写入到waitResult close(rc.waitResult) // 清理 waitResult(因为连接重置,新连接上只能处理新请求,老的请求的数据结果在老连接上,老连接已经关了,新连接上肯定是没有结果的) for req := range rc.waitResult { req.err = errors.New("connect reset") req.wait.Done() } // 新连接(新气象) rc.waitResult = make(chan *request, maxWait) rc.conn = conn // 重新启动接收协程 go rc.execReceive() }
项目代码地址: https://github.com/gofish2020/easyredis
以上就是Golang实现自己的Redis(pipeline客户端)实例探索的详细内容,更多关于Golang Redis pipeline客户端的资料请关注脚本之家其它相关文章!