关于go平滑重启库overseer实现原理详解
作者:码出钞能力
overseer主要完成了三部分功能:
1、连接的无损关闭,2、连接的平滑重启,3、文件变更的自动重启。
下面依次讲一下:
一、连接的无损关闭
golang官方的net包是不支持连接的无损关闭的,当主监听协程退出时,并不会等待各个实际work协程的处理完成。
以下是golang官方代码:
Go/src/net/http/server.go
func (srv *Server) Serve(l net.Listener) error { if fn := testHookServerServe; fn != nil { fn(srv, l) // call hook with unwrapped listener } origListener := l l = &onceCloseListener{Listener: l} defer l.Close() if err := srv.setupHTTP2_Serve(); err != nil { return err } if !srv.trackListener(&l, true) { return ErrServerClosed } defer srv.trackListener(&l, false) baseCtx := context.Background() if srv.BaseContext != nil { baseCtx = srv.BaseContext(origListener) if baseCtx == nil { panic("BaseContext returned a nil context") } } var tempDelay time.Duration // how long to sleep on accept failure ctx := context.WithValue(baseCtx, ServerContextKey, srv) for { rw, err := l.Accept() if err != nil { if srv.shuttingDown() { return ErrServerClosed } if ne, ok := err.(net.Error); ok && ne.Temporary() { if tempDelay == 0 { tempDelay = 5 * time.Millisecond } else { tempDelay *= 2 } if max := 1 * time.Second; tempDelay > max { tempDelay = max } srv.logf("http: Accept error: %v; retrying in %v", err, tempDelay) time.Sleep(tempDelay) continue } return err } connCtx := ctx if cc := srv.ConnContext; cc != nil { connCtx = cc(connCtx, rw) if connCtx == nil { panic("ConnContext returned nil") } } tempDelay = 0 c := srv.newConn(rw) c.setState(c.rwc, StateNew, runHooks) // before Serve can return go c.serve(connCtx) } }
当监听套接字关闭,l.Accept()退出循环时,并不会等待go c.serve(connCtx)协程的处理完成。
overseer的处理方式是,包装了golang的监听套接字和连接套接字,通过sync.WaitGroup提供了对主协程异步等待work协程处理完成的支持。
overseer代码如下:
overseer-v1.1.6\graceful.go
func (l *overseerListener) Accept() (net.Conn, error) { conn, err := l.Listener.(*net.TCPListener).AcceptTCP() if err != nil { return nil, err } conn.SetKeepAlive(true) // see http.tcpKeepAliveListener conn.SetKeepAlivePeriod(3 * time.Minute) // see http.tcpKeepAliveListener uconn := overseerConn{ Conn: conn, wg: &l.wg, closed: make(chan bool), } go func() { //connection watcher select { case <-l.closeByForce: uconn.Close() case <-uconn.closed: //closed manually } }() l.wg.Add(1) return uconn, nil } //non-blocking trigger close func (l *overseerListener) release(timeout time.Duration) { //stop accepting connections - release fd l.closeError = l.Listener.Close() //start timer, close by force if deadline not met waited := make(chan bool) go func() { l.wg.Wait() waited <- true }() go func() { select { case <-time.After(timeout): close(l.closeByForce) case <-waited: //no need to force close } }() } //blocking wait for close func (l *overseerListener) Close() error { l.wg.Wait() return l.closeError } func (o overseerConn) Close() error { err := o.Conn.Close() if err == nil { o.wg.Done() o.closed <- true } return err }
在(l *overseerListener) Accept函数中,每生成一个work连接,执行l.wg.Add(1),在(o overseerConn) Close函数中,每关闭一个work连接,执行o.wg.Done()。
在异步关闭模式(l *overseerListener) release函数中和在同步关闭模式(l *overseerListener) Close函数中都会调用l.wg.Wait()以等待work协程的处理完成。
监听套接字关闭流程:
1、work进程收到重启信号,或者master进程收到重启信号然后转发到work进程。
2、work进程的信号处理里包含对(l *overseerListener) release的调用。
3、在(l *overseerListener) release里关闭监听套接字,并异步l.wg.Wait()。
4、在官方包net/http/server.go的 (srv *Server) Serve里l.Accept()出错返回,退出监听循环,然后执行defer l.Close(),即(l *overseerListener) Close。
5、在(l *overseerListener) Close里同步执行l.wg.Wait(),等待work连接处理完成。
6、work连接处理完成时,会调用(o overseerConn) Close(),进而调用o.wg.Done()。
7、所有work连接处理完成后,向master进程发送SIGUSR1信号。
8、master进程收到SIGUSR1信号后,将true写入mp.descriptorsReleased管道。
9、master进程的(mp *master) fork里,收到mp.descriptorsReleased后,结束本次fork,进入下一次fork。
二、连接的平滑重启
所谓平滑重启,就是重启不会造成客户端的断连,对客户端无感知,比如原有的排队连接不会被丢弃,所以监听套接字通过master进程在新旧work进程间传递,而不是新启的work进程重新创建监听连接。
监听套接字由master进程创建:
overseer-v1.1.6/proc_master.go
func (mp *master) retreiveFileDescriptors() error { mp.slaveExtraFiles = make([]*os.File, len(mp.Config.Addresses)) for i, addr := range mp.Config.Addresses { a, err := net.ResolveTCPAddr("tcp", addr) if err != nil { return fmt.Errorf("Invalid address %s (%s)", addr, err) } l, err := net.ListenTCP("tcp", a) if err != nil { return err } f, err := l.File() if err != nil { return fmt.Errorf("Failed to retreive fd for: %s (%s)", addr, err) } if err := l.Close(); err != nil { return fmt.Errorf("Failed to close listener for: %s (%s)", addr, err) } mp.slaveExtraFiles[i] = f } return nil }
从mp.Config.Addresses中拿到地址,建立监听连接,最后把文件句柄存入mp.slaveExtraFiles。
在这个过程中调用了(l *TCPListener) Close,但其实对work进程无影响,影响的只是master进程自己不能读写监听套接字。
这里引用下对网络套接字close和shutdown的区别:
close ---- 关闭本进程的socket id,但连接还是开着的,用这个socket id的其它进程还能用这个连接,能读或写这个socket id。
shutdown ---- 则破坏了socket 连接,读的时候可能侦探到EOF结束符,写的时候可能会收到一个SIGPIPE信号,这个信号可能直到socket buffer被填充了才收到,shutdown还有一个关闭方式的参数,0 不能再读,1不能再写,2 读写都不能。
将mp.slaveExtraFiles传递给子进程即work进程:
overseer-v1.1.6/proc_master.go
func (mp *master) fork() error { mp.debugf("starting %s", mp.binPath) cmd := exec.Command(mp.binPath) //mark this new process as the "active" slave process. //this process is assumed to be holding the socket files. mp.slaveCmd = cmd mp.slaveID++ //provide the slave process with some state e := os.Environ() e = append(e, envBinID+"="+hex.EncodeToString(mp.binHash)) e = append(e, envBinPath+"="+mp.binPath) e = append(e, envSlaveID+"="+strconv.Itoa(mp.slaveID)) e = append(e, envIsSlave+"=1") e = append(e, envNumFDs+"="+strconv.Itoa(len(mp.slaveExtraFiles))) cmd.Env = e //inherit master args/stdfiles cmd.Args = os.Args cmd.Stdin = os.Stdin cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr //include socket files cmd.ExtraFiles = mp.slaveExtraFiles if err := cmd.Start(); err != nil { return fmt.Errorf("Failed to start slave process: %s", err) } //was scheduled to restart, notify success if mp.restarting { mp.restartedAt = time.Now() mp.restarting = false mp.restarted <- true } //convert wait into channel cmdwait := make(chan error) go func() { cmdwait <- cmd.Wait() }() //wait.... select { case err := <-cmdwait: //program exited before releasing descriptors //proxy exit code out to master code := 0 if err != nil { code = 1 if exiterr, ok := err.(*exec.ExitError); ok { if status, ok := exiterr.Sys().(syscall.WaitStatus); ok { code = status.ExitStatus() } } } mp.debugf("prog exited with %d", code) //if a restarts are disabled or if it was an //unexpected crash, proxy this exit straight //through to the main process if mp.NoRestart || !mp.restarting { os.Exit(code) } case <-mp.descriptorsReleased: //if descriptors are released, the program //has yielded control of its sockets and //a parallel instance of the program can be //started safely. it should serve state.Listeners //to ensure downtime is kept at <1sec. The previous //cmd.Wait() will still be consumed though the //result will be discarded. } return nil }
通过cmd.ExtraFiles = mp.slaveExtraFiles语句向子进程传递套接字,这个参数最终传递给fork系统调用,传递的fd会被子进程继承。
子进程即work进程处理继承的套接字:
overseer-v1.1.6/proc_slave.go
func (sp *slave) run() error { sp.id = os.Getenv(envSlaveID) sp.debugf("run") sp.state.Enabled = true sp.state.ID = os.Getenv(envBinID) sp.state.StartedAt = time.Now() sp.state.Address = sp.Config.Address sp.state.Addresses = sp.Config.Addresses sp.state.GracefulShutdown = make(chan bool, 1) sp.state.BinPath = os.Getenv(envBinPath) if err := sp.watchParent(); err != nil { return err } if err := sp.initFileDescriptors(); err != nil { return err } sp.watchSignal() //run program with state sp.debugf("start program") sp.Config.Program(sp.state) return nil } func (sp *slave) initFileDescriptors() error { //inspect file descriptors numFDs, err := strconv.Atoi(os.Getenv(envNumFDs)) if err != nil { return fmt.Errorf("invalid %s integer", envNumFDs) } sp.listeners = make([]*overseerListener, numFDs) sp.state.Listeners = make([]net.Listener, numFDs) for i := 0; i < numFDs; i++ { f := os.NewFile(uintptr(3+i), "") l, err := net.FileListener(f) if err != nil { return fmt.Errorf("failed to inherit file descriptor: %d", i) } u := newOverseerListener(l) sp.listeners[i] = u sp.state.Listeners[i] = u } if len(sp.state.Listeners) > 0 { sp.state.Listener = sp.state.Listeners[0] } return nil }
子进程只是重新包装套接字,并没有新建监听连接,包装成u := newOverseerListener(l)类型,这些监听套接字最后传递给sp.Config.Program(sp.state),即用户的启动程序:
overseer-v1.1.6/example/main.go
// convert your 'main()' into a 'prog(state)' // 'prog()' is run in a child process func prog(state overseer.State) { fmt.Printf("app#%s (%s) listening...\n", BuildID, state.ID) http.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { d, _ := time.ParseDuration(r.URL.Query().Get("d")) time.Sleep(d) fmt.Fprintf(w, "app#%s (%s) %v says hello\n", BuildID, state.ID, state.StartedAt) })) http.Serve(state.Listener, nil) fmt.Printf("app#%s (%s) exiting...\n", BuildID, state.ID) } // then create another 'main' which runs the upgrades // 'main()' is run in the initial process func main() { overseer.Run(overseer.Config{ Program: prog, Address: ":5001", Fetcher: &fetcher.File{Path: "my_app_next"}, Debug: true, //display log of overseer actions TerminateTimeout: 10 * time.Minute, }) }
在用户程序中http.Serve(state.Listener, nil)调用:
1、使用的accept方式是包装后的(l *overseerListener) Accept()。
2、defer l.Close()使用也是包装后的(l *overseerListener) Close()。
3、由(l *overseerListener) Accept()创建的work连接也都包装成了overseerConn连接,在关闭时会调用(o overseerConn) Close()
三、文件变更的自动重启
能够自动监视文件变化,有变更时自动触发重启流程。
在master进程启动时检查配置,如果设置了mp.Config.Fetcher则进入fetchLoop:
overseer-v1.1.6/proc_master.go
// fetchLoop is run in a goroutine func (mp *master) fetchLoop() { min := mp.Config.MinFetchInterval time.Sleep(min) for { t0 := time.Now() mp.fetch() //duration fetch of fetch diff := time.Now().Sub(t0) if diff < min { delay := min - diff //ensures at least MinFetchInterval delay. //should be throttled by the fetcher! time.Sleep(delay) } } }
mp.Config.MinFetchInterval默认是1秒,也就是每秒检查一次变更。time.Duration类型,可以设置更小的粒度。
已经支持的fetcher包括:fetcher_file.go、fetcher_github.go、fetcher_http.go、fetcher_s3.go。
以fetcher_file.go为例说明。
1、文件变更的判断:
overseer-v1.1.6/proc_master.go
//tee off to sha1 hash := sha1.New() reader = io.TeeReader(reader, hash) //write to a temp file _, err = io.Copy(tmpBin, reader) if err != nil { mp.warnf("failed to write temp binary: %s", err) return } //compare hash newHash := hash.Sum(nil) if bytes.Equal(mp.binHash, newHash) { mp.debugf("hash match - skip") return }
通过sha1算法实现,比较新旧hash值,并没有关注文件时间戳。
2、验证是可执行文件,且是支持overseer的:
overseer-v1.1.6/proc_master.go
tokenIn := token() cmd := exec.Command(tmpBinPath) cmd.Env = append(os.Environ(), []string{envBinCheck + "=" + tokenIn}...) cmd.Args = os.Args returned := false go func() { time.Sleep(5 * time.Second) if !returned { mp.warnf("sanity check against fetched executable timed-out, check overseer is running") if cmd.Process != nil { cmd.Process.Kill() } } }() tokenOut, err := cmd.CombinedOutput() returned = true if err != nil { mp.warnf("failed to run temp binary: %s (%s) output \"%s\"", err, tmpBinPath, tokenOut) return } if tokenIn != string(tokenOut) { mp.warnf("sanity check failed") return }
这是通过overseer预埋的代码实现的:
overseer-v1.1.6/overseer.go
//sanityCheck returns true if a check was performed func sanityCheck() bool { //sanity check if token := os.Getenv(envBinCheck); token != "" { fmt.Fprint(os.Stdout, token) return true } //legacy sanity check using old env var if token := os.Getenv(envBinCheckLegacy); token != "" { fmt.Fprint(os.Stdout, token) return true } return false }
这段代码在main启动时在overseer.Run里会调用到,传递固定的环境变量,然后命令行输出会原样显示出来即为成功。
3、覆盖旧文件,并触发重启。
overseer-v1.1.6/proc_master.go
//overwrite! if err := overwrite(mp.binPath, tmpBinPath); err != nil { mp.warnf("failed to overwrite binary: %s", err) return } mp.debugf("upgraded binary (%x -> %x)", mp.binHash[:12], newHash[:12]) mp.binHash = newHash //binary successfully replaced if !mp.Config.NoRestartAfterFetch { mp.triggerRestart() }
由(mp *master) triggerRestart进入重启流程:
overseer-v1.1.6/proc_master.go
func (mp *master) triggerRestart() { if mp.restarting { mp.debugf("already graceful restarting") return //skip } else if mp.slaveCmd == nil || mp.restarting { mp.debugf("no slave process") return //skip } mp.debugf("graceful restart triggered") mp.restarting = true mp.awaitingUSR1 = true mp.signalledAt = time.Now() mp.sendSignal(mp.Config.RestartSignal) //ask nicely to terminate select { case <-mp.restarted: //success mp.debugf("restart success") case <-time.After(mp.TerminateTimeout): //times up mr. process, we did ask nicely! mp.debugf("graceful timeout, forcing exit") mp.sendSignal(os.Kill) } }
向子进程发送mp.Config.RestartSignal信号,子进程收到信号后,关闭监听套接字然后向父进程发送SIGUSR1信号:
overseer-v1.1.6/proc_slave.go
if len(sp.listeners) > 0 { //perform graceful shutdown for _, l := range sp.listeners { l.release(sp.Config.TerminateTimeout) } //signal release of held sockets, allows master to start //a new process before this child has actually exited. //early restarts not supported with restarts disabled. if !sp.NoRestart { sp.masterProc.Signal(SIGUSR1) } //listeners should be waiting on connections to close... }
父进程收到SIGUSR1信号后,通知mp.descriptorsReleased管道监听套接字已经关闭:
overseer-v1.1.6/proc_master.go
//**during a restart** a SIGUSR1 signals //to the master process that, the file //descriptors have been released if mp.awaitingUSR1 && s == SIGUSR1 { mp.debugf("signaled, sockets ready") mp.awaitingUSR1 = false mp.descriptorsReleased <- true } else
最终回到(mp *master) fork函数,fork函数一直在等待mp.descriptorsReleased通知或者cmd.Wait子进程退出,收到管道通知后fork退出,进入下一轮fork循环。
overseer-v1.1.6/proc_master.go
func (mp *master) fork() error { //... ... //... ... //... ... //convert wait into channel cmdwait := make(chan error) go func() { cmdwait <- cmd.Wait() }() //wait.... select { case err := <-cmdwait: //program exited before releasing descriptors //proxy exit code out to master code := 0 if err != nil { code = 1 if exiterr, ok := err.(*exec.ExitError); ok { if status, ok := exiterr.Sys().(syscall.WaitStatus); ok { code = status.ExitStatus() } } } mp.debugf("prog exited with %d", code) //if a restarts are disabled or if it was an //unexpected crash, proxy this exit straight //through to the main process if mp.NoRestart || !mp.restarting { os.Exit(code) } case <-mp.descriptorsReleased: //if descriptors are released, the program //has yielded control of its sockets and //a parallel instance of the program can be //started safely. it should serve state.Listeners //to ensure downtime is kept at <1sec. The previous //cmd.Wait() will still be consumed though the //result will be discarded. } return nil }
以上就是关于go平滑重启库overseer实现原理详解的详细内容,更多关于go平滑重启库overseer的资料请关注脚本之家其它相关文章!