Golang

关注公众号 jb51net

关闭
首页 > 脚本专栏 > Golang > Go限流与控制并发数

Go并发限制之限流与控制并发数的实践指南

作者:XMYX-0

在 Go 并发编程中,一个非常现实的问题是:goroutine很轻量,但不是无限资源,当你随手 go func() 起上成千上万的协程时,系统可能不会立刻崩,但会在某个瞬间出现问题,所以一个核心能力就是:如何控制并发数量与访问速率(限流),本文小编为大家详细介绍一下

在 Go 并发编程中,一个非常现实的问题是:goroutine 很轻量,但不是无限资源

当你随手 go func() 起上成千上万的协程时,系统可能不会立刻崩,但会在某个瞬间出现:

所以一个核心能力就是:如何控制并发数量与访问速率(限流)

这篇文章我们从工程视角,把 Go 的并发控制体系讲透。

核心概念

并发控制本质解决两个问题:

控制“同时做多少事”(并发数)

典型场景:

目标:避免 goroutine 无限增长

控制“单位时间做多少事”(限流)

典型场景:

目标:避免瞬时流量过大

本质是什么?

从设计角度看,Go 的并发控制本质是三种思想:

一句话总结:

并发控制 = 用“阻塞或排队”换取“系统稳定性”

基础使用示例

用 channel 控制最大并发数(最经典方式)

package main

import (
	"fmt"
	"time"
)

// 信号量控制并发数
func worker(id int, sem chan struct{}) {
	// 等待信号量 获取令牌(没有空间会阻塞)
	sem <- struct{}{}
	// 执行任务
	fmt.Printf("worker %d start\n", id)
	time.Sleep(time.Second)
	fmt.Printf("worker %d done\n", id)
	// 释放信号量 释放令牌(有空间会唤醒等待的goroutine)
	<-sem
}
func main() {
	// 信号量大小为3,表示最多只能有3个goroutine同时执行
	sem := make(chan struct{}, 3)
	// 启动10个goroutine
	for i := 0; i < 10; i++ {
		// 每个goroutine都会等待信号量,直到有可用资源
		go worker(i, sem)
	}
	// 主goroutine等待5秒,以便观察输出
	time.Sleep(time.Second * 5)
}

小结

进阶使用示例

Worker Pool(生产级常见模型)

package main
import (
	"fmt"
	"time"
)
// 定义任务结构体
type Task struct {
	ID int // 任务ID
}
// 定义 worker 函数
func worker(id int, tasks <-chan Task, results chan<- int) {
	// 循环处理任务
	for task := range tasks {
		// 处理任务逻辑
		fmt.Printf("worker %d 进程 task %d\n", id, task.ID)
		time.Sleep(time.Second)
		// 返回结果
		results <- task.ID * 2
	}
}
// 主函数
func main() {
	// 创建任务和结果通道, 缓冲区大小为10
	taskChan := make(chan Task, 10)
	resultChan := make(chan int, 10)
	// 启动固定数量 worker
	for i := 0; i < 3; i++ {
		go worker(i, taskChan, resultChan)
	}
	// 投递任务
	for i := 0; i < 10; i++ {
		taskChan <- Task{ID: i}
	}
	// 关闭任务通道,让 worker 知道没有更多的任务了
	close(taskChan)
	// 收集结果
	for i := 0; i < 10; i++ {
		fmt.Println("result:", <-resultChan)
	}
}

留个思考:
如何撑爆缓冲区???

输出:

worker 2 进程 task 0
worker 0 进程 task 1
worker 1 进程 task 2
worker 1 进程 task 3
result: 4
worker 0 进程 task 4
result: 2
result: 0
worker 2 进程 task 5
worker 2 进程 task 6
result: 10
result: 8
worker 1 进程 task 8
result: 6
worker 0 进程 task 7
worker 1 进程 task 9
result: 16
result: 14
result: 12
result: 18

小结

context + 并发控制(超时/取消)

package main

import (
	"context"
	"fmt"
	"time"
)

// worker 协程执行函数
func worker(ctx context.Context, id int, sem chan struct{}) { // 等待信号量可用
	// 等待信号量可用,或者被取消
	select {
	case sem <- struct{}{}:
	case <-ctx.Done():
		fmt.Println("cancel worker", id)
		return
	}
	// 执行任务
	defer func() { <-sem }()

	fmt.Println("start worker", id)
	// 模拟执行任务
	select {
	case <-time.After(2 * time.Second):
		fmt.Println("done worker", id)
	case <-ctx.Done():
		fmt.Println("timeout worker", id)
	}
}

func main() {
	// 创建带超时的上下文
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	// 延迟取消,等待所有协程执行完毕
	defer cancel()

	sem := make(chan struct{}, 2)

	for i := 0; i < 5; i++ {
		go worker(ctx, i, sem)
	}

	time.Sleep(5 * time.Second)
}

输出:

start worker 4
start worker 0
done worker 4
start worker 1
done worker 0
start worker 2
cancel worker 3
timeout worker 1
timeout worker 2

小结

Token Bucket 简化限流模型

package main

import (
	"fmt"
	"time"
)

// rateLimiter 返回一个通道,每隔500毫秒向该通道发送时间戳。
func rateLimiter() <-chan time.Time {
	return time.Tick(500 * time.Millisecond)
}

func main() {
	// 创建一个速率限制器,每隔500毫秒允许一个请求。
	limiter := rateLimiter()
	// 模拟10个请求,每个请求等待速率限制器允许。
	for i := 0; i < 10; i++ {
		<-limiter // 拿令牌
		fmt.Println("request", i, time.Now())
	}
}

输出:

request 0 2026-04-22 22:10:09.784786939 +0800 CST m=+0.500643508
request 1 2026-04-22 22:10:10.284295056 +0800 CST m=+1.000151673
request 2 2026-04-22 22:10:10.784965989 +0800 CST m=+1.500822557
request 3 2026-04-22 22:10:11.284293288 +0800 CST m=+2.000149905
request 4 2026-04-22 22:10:11.784967432 +0800 CST m=+2.500823999
request 5 2026-04-22 22:10:12.284286155 +0800 CST m=+3.000142774
request 6 2026-04-22 22:10:12.784972686 +0800 CST m=+3.500829260
request 7 2026-04-22 22:10:13.284287308 +0800 CST m=+4.000143926
request 8 2026-04-22 22:10:13.78496704 +0800 CST m=+4.500823608
request 9 2026-04-22 22:10:14.28429124 +0800 CST m=+5.000147855

小结

常见错误与坑(重点)

坑一:goroutine 泄漏(最隐蔽)

错误代码

func worker(done chan bool) {
	for {
		// 永久阻塞
	}
}

为什么错

正确写法

func worker(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		default:
			// do work
		}
	}
}

坑二:channel 无缓冲导致死锁

错误代码

sem := make(chan struct{})

sem <- struct{}{} // 直接阻塞死锁

原因

正确写法

sem := make(chan struct{}, 3)
sem <- struct{}{}

坑三:忘记释放信号量

错误代码

sem <- struct{}{}

if err != nil {
	return // 没释放
}

<-sem

原因

正确写法

sem <- struct{}{}
defer func() { <-sem }()

底层原理解析(核心)

Go 并发控制核心依赖三类机制:

channel 本质

channel 是一个:

行为机制:

本质:生产者-消费者队列 + 调度器唤醒

semaphore(信号量)

chan struct{}{}

等价于:

行为:

本质:资源计数器

context 取消机制

内部结构:

触发机制:

本质:广播式取消信号

为什么 Go 用 channel 做并发控制?

Go 的设计哲学:

Do not communicate by sharing memory; share memory by communicating.

因此:

并发控制被抽象为“通信问题”

对比与扩展

在 Go 的并发控制中,有几个看起来很像,但本质差异很大的实现方式,很容易在工程中用错。

channel 缓冲控制 vs 无缓冲 channel 控制

这两种写法经常被混用,但行为完全不同。

无缓冲 channel:同步阻塞模型

sem := make(chan struct{})

go func() {
	sem <- struct{}{} // 发送必须等待接收
}()

特点:

更像“同步点”,而不是限流工具

带缓冲 channel:并发控制模型

sem := make(chan struct{}, 3)

sem <- struct{}{} // 超过3会阻塞

特点:

工程中标准并发控制方案

ticker 限流 vs token bucket 思想

很多人会把 time.Ticker 当限流工具,但它和真正限流模型有差异。

ticker:固定节奏触发

for range time.Tick(time.Second) {
	fmt.Println("do request")
}

特点:

更像“节拍器”

token bucket:可突发限流模型(思想层面)

核心思想:

特点:

ticker 是“定时器”,token bucket 是“资源池”

worker pool vs goroutine 直接并发

这是最容易写错的一点。

直接 goroutine 并发(风险模型)

for i := 0; i < 10000; i++ {
	go doTask(i)
}

问题:

适合“低频 + 小规模任务”

worker pool(稳定模型)

for i := 0; i < 10; i++ {
	go worker(tasks)
}

特点:

适合“生产级任务处理”

小结

这三组对比的核心差异可以归纳为一句话:

一句话升级理解

并发控制的本质不是“限制 goroutine”,而是:

在系统可承受范围内,把“执行权”变成一种可调度资源

思考与升华(加分项)

如果抽象 Go 并发控制,本质只有三件事:

可以用一个极简模型表示:

producer → queue → worker pool → limiter → consumer

甚至可以自己实现一个简化版本:

package main

import "fmt"

// 模拟并发处理任务,限制同时处理的数量为5
func handle(task int) {
	fmt.Println(task)
}
func main() {
	// 限制并发数量为5
	sem := make(chan struct{}, 5)
	// 任务队列
	tasks := make(chan int, 10)
	// 启动任务生产者
	go func() {
		// 生产任务
		for i := 0; i < cap(tasks); i++ {
			tasks <- i
		}
		// 关闭任务队列
		close(tasks)
	}()
	// 启动任务消费者
	for task := range tasks {
		sem <- struct{}{}
		go func() {
			defer func() { <-sem }()

			handle(task)
		}()
	}
}

本质总结

Go 的并发控制,不是“控制 goroutine”,而是:

控制资源流动的节奏与边界

点睛总结

真正的并发能力,不是“能起多少 goroutine”,而是“能稳住多少流量”。

以上就是Go并发限制之限流与控制并发数的实践指南的详细内容,更多关于Go限流与控制并发数的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文