Golang

关注公众号 jb51net

关闭
首页 > 脚本专栏 > Golang > golang sql超时控制

golang sql语句超时控制方案及原理

作者:hayson

一般应用程序在执行一条sql语句时,都会给这条sql设置一个超时时间,本文主要介绍了golang sql语句超时控制方案及原理,具有一定的参考价值,感兴趣的可以了解一下

一般应用程序在执行一条sql语句时,都会给这条sql设置一个超时时间,如果到超时时间还未执行完,则直接终止sql,释放资源,返回错误。这里主要讨论一下在golang+mysql的场景下,对sql语句进行超时控制的具体做法、实现原理以及对连接池连接数产生的影响。

基于context实现sql语句的超时控制:

使用context进行超时控制是golang的标准做法,可以说当一个函数第一个参数是ctx context.Context时,这个函数就应该做出承诺,在收到ctx的取消信号时应该提前终止该函数的执行,并释放资源。目前后端应用程序操作数据库时比较常用的做法是使用gorm框架,这个框架主要是起到sql拼接和屏蔽底层数据库差异的作用,本身并没有提供连接池以及mysql的client端驱动程序,连接池默认使用的是database/sql标准库提供的连接池,驱动程序使用的是go-sql-driver/mysql。故要分析如何基于context进行超时控制,需要从这三层进行分析。

对于gorm,想要对一个sql进行超时控制,可以直接使用WithContext()方法,具体如下:

func main() {
  ctx := context.TODO()
  ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
  defer cancel()
  err := db.WithContext(ctx).Exec("select sleep(10)").Error
  if err != nil {
    log.Fatal(err)
  }
}
​
// output
// [3001.379ms] [rows:0] select sleep(10)
// 2023/12/17 13:31:54 context deadline exceeded

这里将ctx的超时时间设置为3s,同时sql语句为sleep 10s,最终在执行时间到3s时,返回了context deadline exceeded错误。

gorm调用WithContext之后,最终会将这个ctx给到database/sql连接池的ExecContext函数之中,

func (db *DB) ExecContext(ctx context.Context, query string, args ...any) (Result, error) {
  var res Result
  var err error
​
  err = db.retry(func(strategy connReuseStrategy) error {
    res, err = db.exec(ctx, query, args, strategy)
    return err
  })
​
  return res, err
}

该函数会从连接池中取出一个连接,然后由数据库驱动层实际执行sql,

func (mc *mysqlConn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
    dargs, err := namedValueToValue(args)
    if err != nil {
       return nil, err
    }
    // 这里是核心代码,将ctx放入监听队列
    if err := mc.watchCancel(ctx); err != nil {
       return nil, err
    }
    defer mc.finish()
​
    return mc.Exec(query, dargs)
}

go-sql-drive/mysql在实际和mysql server端通信之前,会调用watchCancel,监听当前ctx的取消信号,保证sql执行过程中,能够立刻收到取消信号,并做出sql取消的操作。watchCancel函数具体实现如下:

func (mc *mysqlConn) watchCancel(ctx context.Context) error {
    if mc.watching {
       // Reach here if canceled,
       // so the connection is already invalid
       mc.cleanup()
       return nil
    }
    // When ctx is already cancelled, don't watch it.
    if err := ctx.Err(); err != nil {
       return err
    }
    // When ctx is not cancellable, don't watch it.
    if ctx.Done() == nil {
       return nil
    }
    // When watcher is not alive, can't watch it.
    if mc.watcher == nil {
       return nil
    }
​
    // 在正常情况下会走到这里,将ctx放入到一个watcher管道
    mc.watching = true
    mc.watcher <- ctx
    return nil
}

可以看到核心代码是将这个ctx放入到一个管道,那么必定有一段程序是监听这个管道的,实际上是如下代码:

func (mc *mysqlConn) startWatcher() {
    watcher := make(chan context.Context, 1)
    mc.watcher = watcher
    finished := make(chan struct{})
    mc.finished = finished
    go func() {
       for {
          var ctx context.Context
          select {
          case ctx = <-watcher:
          case <-mc.closech:
             return
          }
​
          select {
          // 在这里监听了ctx取消信号,并实际执行cancel操作
          case <-ctx.Done():
             mc.cancel(ctx.Err())
          case <-finished:
          case <-mc.closech:
             return
          }
       }
    }()
}

这个startWatcher函数内部会单独启动一个协程,监听本连接的watcher管道,针对于每一个从管道中取出的ctx,监听其取消信号是否结束,同一个连接上的sql语句肯定是依次执行的,这样依次监听每一个ctx是不会有什么问题的,而这个startWatcher会在连接创建的时候调用,保证后续这个连接上的每个语句添加的ctx都会被监听。

如果真的监听到取消信号,就会调用cancel函数进行取消,

// finish is called when the query has canceled.
func (mc *mysqlConn) cancel(err error) {
    mc.canceled.Set(err)
    mc.cleanup()
}
​
func (mc *mysqlConn) cleanup() {
  if !mc.closed.TrySet(true) {
    return
  }
​
  // Makes cleanup idempotent
  close(mc.closech)
  if mc.netConn == nil {
    return
  }
  // 核心代码如下,关闭了通信所使用的TCP连接
  if err := mc.netConn.Close(); err != nil {
    errLog.Print(err)
  }
}

最终在收到取消信号时,会关闭和mysql server进行通信的TCP连接。

基于DSN中的readTimeout和writeTimeout实现sql语句的超时控制:

还有一种方法是在打开一个db对象的dsn中指定,具体做法如下:

func init() {
    dsn := "root:12345678@tcp(localhost:3306)/test?charset=utf8mb4&parseTime=True&loc=Local&timeout=1500ms&readTimeout=3s&writeTimeout=3s"
    var err error
    db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{})
    if err != nil {
       log.Fatalln(err)
    }
    db.Logger.LogMode(logger.Info)
}
​
func main() {
  ctx := context.TODO()
  err := db.WithContext(ctx).Exec("select sleep(10)").Error
  if err != nil {
    log.Fatal(err)
  }
}
​
// output
// [3002.597ms] [rows:0] select sleep(10)
// 2023/12/17 14:21:11 invalid connection

在dsn中指定readTimeout=3s&writeTimeout=3s,同时执行一个sleep(10),同样可以在第三秒时报错,但报错会有一点点奇怪,invalid connection,看起来好像和超时没有啥关系,这是因为这两个超时时间的含义其实是针对于mysql底层使用的TCP连接而言的,即readTimeout是从TCP连接中读取一个数据包的超时时间,writeTimeout是向一个TCP连接写入一个数据包的超时时间,并且这个超时是基于连接的deadline实现的,所以一旦超时就会认为这个连接是异常的,最终返回这样一个连接异常的报错。

具体的实现原理仍然是在go-sql-driver/mysql中,在创建连接时,会处理这两个timeout,

...
​
// 这就是上面说的启动监听ctx的逻辑
mc.startWatcher()
if err := mc.watchCancel(ctx); err != nil {
    mc.cleanup()
    return nil, err
}
defer mc.finish()
​
mc.buf = newBuffer(mc.netConn)
​
// 在解析了dsn之后,就会将两个timeout赋值给核心连接对象的两个属性
mc.buf.timeout = mc.cfg.ReadTimeout
mc.writeTimeout = mc.cfg.WriteTimeout
​
...

在实际和mysql server进行通信时,会用到这两个属性,

func (b *buffer) readNext(need int) ([]byte, error) {
    if b.length < need {
       // refill
       if err := b.fill(need); err != nil {
          return nil, err
       }
    }
​
    offset := b.idx
    b.idx += need
    b.length -= need
    return b.buf[offset:b.idx], nil
}
​
// fill reads into the buffer until at least _need_ bytes are in it
func (b *buffer) fill(need int) error {
  
  ...go
  
  for {
    // 若timeout>0,则基于这个超时时间,给连接设置一个新的deadline
    if b.timeout > 0 {
      if err := b.nc.SetReadDeadline(time.Now().Add(b.timeout)); err != nil {
        return err
      }
    }
​
    nn, err := b.nc.Read(b.buf[n:])
    n += nn
​
    switch err {
    case nil:
      if n < need {
        continue
      }
      b.length = n
      return nil
​
    case io.EOF:
      if n >= need {
        b.length = n
        return nil
      }
      return io.ErrUnexpectedEOF
​
    default:
      return err
    }
  }
}

在每次需要从mysql server获取数据的时候,都会给这次读操作设置一个deadline,具体的时间就是当前时间+timeout值,这样每次从server端读取数据的时候,一旦超出这个时间,就会报一个io timeout错误,而上游再收到这个错误之后,则会进行如下处理:

data, err = mc.buf.readNext(pktLen)
if err != nil {
    if cerr := mc.canceled.Value(); cerr != nil {
       return nil, cerr
    }
    errLog.Print(err)
    // 关闭当前连接
    mc.Close()
    // 返回invalid connection错误
    return nil, ErrInvalidConn
}

首先将这条连接关闭,之后返回了invalid connection错误,这也就是为什么上面例子中超时的报错是invalid connection。核心需要关注的还是Close方法,这里是超时后续的处理:

func (mc *mysqlConn) Close() (err error) {
    // Makes Close idempotent
    if !mc.closed.IsSet() {
      // 向mysql server发送quit命令,表明自己要退出了
       err = mc.writeCommandPacket(comQuit)
    }
​
    // 调用cleanup,和上面监听ctx取消信号后的操作是一致的
    mc.cleanup()
​
    return
}

首先发送一个quit命令,告知自己需要退出,之后也使用cleanup方法,关闭tcp连接。可以看到这里的超时控制逻辑和基于ctx的对比,基本是一致的,就是目前这种方案还给server端发送了一个quit指令,从外部使用上看似乎加不加这个指令效果都是一样的,只要连接关闭了mysql server端就可以回收自己的资源(不一定能立刻回收,但最终会回收)。我查找了一些资料和mysql的官方文档,并没有找到如果不发送quit指令,直接关闭tcp连接会有什么影响,我也没有研究过mysql的源码,如果有人知道的话,还请不吝赐教。但我想应该没有太大问题,要不然基于ctx的超时控制早就出问题了。

sql语句超时时连接池如何处理:

以上两种sql超时的方案我个人觉得都没有什么问题,底层最终面对超时时所做的操作也基本一致(关闭TCP连接),我个人更喜欢基于ctx的方案,毕竟ctx设计之初就是用来做这件事的,也可以和其他场景下的超时控制保持一致,报错信息也更友好一些。接下来需要考虑的就是一旦底层出现报错,连接被关闭,上层的连接池是如何处理的,

func (db *DB) execDC(ctx context.Context, dc *driverConn, release func(error), query string, args []any) (res Result, err error) {
    defer func() {
       // 核心代码在这里,执行完sql之后需要释放当前连接,释放时会基于err是否为nil做出处理
       release(err)
    }()
    execerCtx, ok := dc.ci.(driver.ExecerContext)
    var execer driver.Execer
    if !ok {
       execer, ok = dc.ci.(driver.Execer)
    }
    if ok {
       var nvdargs []driver.NamedValue
       var resi driver.Result
       withLock(dc, func() {
          nvdargs, err = driverArgsConnLocked(dc.ci, nil, args)
          if err != nil {
             return
          }
          // 驱动层实际进行查询
          resi, err = ctxDriverExec(ctx, execerCtx, execer, query, nvdargs)
       })
       if err != driver.ErrSkip {
          if err != nil {
             return nil, err
          }
          return driverResult{dc, resi}, nil
       }
    }
    ...
}

exec执行完之后,会释放该连接,将其放回连接池,以供其他查询使用,放回连接池时会依据本条sql是否有错误进行处理,release函数时注入进来的,实际上是releaseConn函数,该函数内部调用了putConn函数,

func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
    if !errors.Is(err, driver.ErrBadConn) {
       // 这里判断了一下连接是不是已经不可用了,若已经不可用则将err赋值为ErrBadConn
       if !dc.validateConnection(resetSession) {
          err = driver.ErrBadConn
       }
    }
    db.mu.Lock()
    if !dc.inUse {
       db.mu.Unlock()
       if debugGetPut {
          fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc])
       }
       panic("sql: connection returned that was never out")
    }
    // 若连接已到最大生存时间,也要标记连接已经不可用
    if !errors.Is(err, driver.ErrBadConn) && dc.expired(db.maxLifetime) {
       db.maxLifetimeClosed++
       err = driver.ErrBadConn
    }
    if debugGetPut {
       db.lastPut[dc] = stack()
    }
    dc.inUse = false
    dc.returnedAt = nowFunc()
​
    for _, fn := range dc.onPut {
       fn()
    }
    dc.onPut = nil
    // 若连接不可用,进行如下处理
    if errors.Is(err, driver.ErrBadConn) {
       // 有一个连接被关闭,考虑打开一个新的连接
       db.maybeOpenNewConnections()
       db.mu.Unlock()
       // 关闭该连接
       dc.Close()
       return
    }
    if putConnHook != nil {
       putConnHook(db, dc)
    }
    // sql执行正常,或者有一些错误但连接是正常的,会正常的归还连接
    added := db.putConnDBLocked(dc, nil)
    db.mu.Unlock()
​
    if !added {
       dc.Close()
       return
    }
}

整体而言,在sql执行出现异常时,会判断一下连接是否可用,这个判断也是于驱动层完成,驱动层实现了如下方法:

// IsValid implements driver.Validator interface
// (From Go 1.15)
func (mc *mysqlConn) IsValid() bool {
    return !mc.closed.IsSet()
}

用于告知连接池这个连接是否还正常,而在sql执行超时最后调用的cleanup方法里,首先就是标记这个连接已经不可用了

func (mc *mysqlConn) cleanup() {
  // 标记连接不可用
  if !mc.closed.TrySet(true) {
    return
  }
}

在判断连接异常,或者超出最大生存时间之后,就是调用连接池的Close方法,注意是连接池的Close,不是驱动层的Close,这个Close最终会调用到finalClose。

func (dc *driverConn) finalClose() error {
    var err error
    var openStmt []*driverStmt
    withLock(dc, func() {
       openStmt = make([]*driverStmt, 0, len(dc.openStmt))
       for ds := range dc.openStmt {
          openStmt = append(openStmt, ds)
       }
       dc.openStmt = nil
    })
    for _, ds := range openStmt {
       ds.Close()
    }
    withLock(dc, func() {
       // 这里调用驱动层进行连接的关闭
       dc.finalClosed = true
       err = dc.ci.Close()
       dc.ci = nil
    })
​
    dc.db.mu.Lock()
    // 当前打开连接数减一
    dc.db.numOpen--
    dc.db.maybeOpenNewConnections()
    dc.db.mu.Unlock()
​
    dc.db.numClosed.Add(1)
    return err
}

这个方法主要是做两件事,一是在驱动层实际关闭连接,这主要是针对达到最大生存时间的连接,对于sql执行超时这种本身就已经关闭了的连接是不会再关闭一次的,TrySet会执行不成功,后面TCP链接关闭的操作是不会继续执行的。关闭连接之后,让当前打开的连接数减一,从而保证可以正常打开新的连接。

有可能带来的问题:

综上所述,这两种超时控制的方法实现原理虽然有所区别,但在发现超时后做的事情是一致的,都是关闭该连接,并且让连接池打开连接数量减一,这其实存在着一个问题,因为client端虽然正常调了Close,认为连接已经关闭了,但其实mysql server端在非sleep状态下是感知不到连接关闭的消息的,一种具体的情况就是比如mysql的某个连接正在执行一个耗时的查询,但是这时到了超时时间,client主动关闭了连接,但是mysql server端是不会立刻终止查询并关闭连接的,show processlist时,仍然能看到连接中的sql还在正常执行。其实观察TCP连接的状态也能看到这一现象,在双方正常通信时,状态为

tcp6 0 0 ::1.3306 ::1.61351 ESTABLISHED
tcp6 0 0 ::1.61351 ::1.3306 ESTABLISHED

在client端主动调用Close之后,server端由于要执行当前sql会一直保持在CLOSE_WAIT状态,client端进入FIN_WAIT_2状态,直到server端在sql执行完成后才会进行后续的挥手过程,才能真正关闭连接。

tcp6 5 0 ::1.3306 ::1.61351 CLOSE_WAIT
tcp6 0 0 ::1.61351 ::1.3306 FIN_WAIT_2

问题就在于server端连接还未关闭,但连接池那边连接数已经减一了,后续可以创建新的连接了,这就导致mysql server端的连接数是会高于连接池的最大连接数的,如果超时的sql很多,很有可能导致连接数超出连接池最大连接数限制达到mysql server端的最大连接数,后续新的连接将无法建立,直接返回too many connectios错误,如果这些连接执行的sql又真的很慢,或者发生死锁,可能会出现mysql较长时间直接拒绝服务的情况。这表明在生产环境下尽量不要将mysql的max connectios参数设置的和数据库最大连接数比较接近,还是要留出一定的余量,避免在出现很多sql超时时这部分泄露的连接直接将mysql连接数打满,导致数据库出现不可用。

到此这篇关于golang sql语句超时控制方案及原理的文章就介绍到这了,更多相关golang sql超时控制内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文