浅谈Go语言中高效并发模式
作者:数据知道
Go语言并发编程提供了丰富的模式,包括基础Goroutine、Channel和Select机制,以及多种高级并发模式,本文就来详细的介绍一下,感兴趣的可以了解一下
一、并发编程基础回顾
Go语言以其轻量级的Goroutine和简洁的并发控制机制(如Channel和Select)闻名,非常适合构建高并发、高吞吐量的系统。
1.1 Goroutine
Go语言中的Goroutine是轻量级线程,由Go运行时调度。启动一个Goroutine非常简单:
go func() {
fmt.Println("Hello from goroutine")
}()
1.2 Channel
Channel是Go语言中用于Goroutine之间通信的机制,支持同步与数据传递:
ch := make(chan int)
go func() { ch <- 42 }()
value := <-ch
fmt.Println(value)
1.3 Select
Select用于在多个Channel操作中进行选择,常用于超时控制、多路复用等场景:
select {
case msg := <-ch1:
fmt.Println("Got from ch1:", msg)
case msg := <-ch2:
fmt.Println("Got from ch2:", msg)
case <-time.After(1 * time.Second):
fmt.Println("Timeout")
}
二、常用并发模式详解
2.1 Worker Pool(工作池模式)
适用场景:任务分发与并行处理,如爬虫、批量任务处理。
案例:使用固定数量的Worker处理任务队列。
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for j := range jobs {
fmt.Printf("Worker %d started job %d\n", id, j)
time.Sleep(time.Second) // 模拟耗时任务
fmt.Printf("Worker %d finished job %d\n", id, j)
results <- j * 2
}
}
func main() {
const numJobs = 5
const numWorkers = 3
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
var wg sync.WaitGroup
// 启动3个worker
for w := 1; w <= numWorkers; w++ {
wg.Add(1)
go worker(w, jobs, results, &wg)
}
// 发送5个任务
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs)
// 等待所有worker完成
wg.Wait()
close(results)
// 收集结果
for result := range results {
fmt.Println("Result:", result)
}
}
2.2 Fan-in / Fan-out(扇入/扇出模式)
适用场景:将多个输入合并到一个输出(Fan-in),或将一个输入分发到多个处理单元(Fan-out)。
案例:
package main
import (
"fmt"
"sync"
"time"
)
func producer(id int, out chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 3; i++ {
fmt.Printf("Producer %d produced %d\n", id, i)
out <- id*10 + i
time.Sleep(200 * time.Millisecond)
}
}
func consumer(in <-chan int, done chan<- bool) {
for value := range in {
fmt.Printf("Consumed: %d\n", value)
}
done <- true
}
func main() {
ch := make(chan int)
done := make(chan bool)
var wg sync.WaitGroup
// 启动3个producer
for i := 1; i <= 3; i++ {
wg.Add(1)
go producer(i, ch, &wg)
}
// 启动1个consumer
go consumer(ch, done)
// 等待所有producer完成
go func() {
wg.Wait()
close(ch)
}()
// 等待consumer完成
<-done
fmt.Println("All done!")
}
2.3 Pipeline(流水线模式)
适用场景:数据流处理,分阶段处理任务,如ETL流程。
案例:模拟数据经过多个阶段的处理。
package main
import (
"fmt"
"sync"
)
func generator(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
func main() {
// 构建pipeline: generator -> square -> print
input := generator(1, 2, 3, 4, 5)
squares := square(input)
// 打印结果
for result := range squares {
fmt.Println(result)
}
}
2.4 Timeout / Cancel(超时与取消模式)
适用场景:控制任务执行时间,防止阻塞或资源浪费。
案例:使用Context实现超时控制。
package main
import (
"context"
"fmt"
"time"
)
func longRunningTask(ctx context.Context) {
select {
case <-time.After(3 * time.Second):
fmt.Println("Task completed")
case <-ctx.Done():
fmt.Println("Task canceled:", ctx.Err())
}
}
func main() {
// 带超时的context
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
go longRunningTask(ctx)
// 等待足够长时间观察结果
time.Sleep(4 * time.Second)
}
2.5 Future / Promise(异步结果模式)
适用场景:异步执行任务并获取结果,常用于HTTP请求、数据库查询等。
案例:模拟异步任务并获取结果。
package main
import (
"fmt"
"time"
)
type Future struct {
result chan int
}
func NewFuture() (*Future, func(int)) {
f := &Future{result: make(chan int)}
return f, func(r int) { f.result <- r }
}
func (f *Future) Get() int {
return <-f.result
}
func asyncCompute(complete func(int)) {
go func() {
time.Sleep(2 * time.Second)
complete(42)
}()
}
func main() {
future, complete := NewFuture()
asyncCompute(complete)
fmt.Println("Waiting for result...")
result := future.Get()
fmt.Println("Result:", result)
}
三、进阶并发模式
3.1 Or-Done 模式
适用场景:监听多个Channel,任意一个完成即退出。
package main
import (
"fmt"
"time"
)
func orDone(channels ...<-chan interface{}) <-chan interface{} {
switch len(channels) {
case 0:
return nil
case 1:
return channels[0]
}
orDone := make(chan interface{})
go func() {
defer close(orDone)
switch len(channels) {
case 2:
select {
case <-channels[0]:
case <-channels[1]:
}
default:
select {
case <-channels[0]:
case <-channels[1]:
case <-channels[2]:
case <-orDone(channels[3:]...):
}
}
}()
return orDone
}
func main() {
sig := make(chan interface{})
time.AfterFunc(2*time.Second, func() { close(sig) })
start := time.Now()
<-orDone(sig, time.After(5*time.Second))
fmt.Printf("Done after %v\n", time.Since(start))
}
3.2 Rate Limiting(限流模式)
适用场景:控制请求频率,防止系统过载。
package main
import (
"fmt"
"time"
)
func main() {
requests := make(chan int, 5)
for i := 1; i <= 5; i++ {
requests <- i
}
close(requests)
// 限制每200ms处理一个请求
limiter := time.Tick(200 * time.Millisecond)
for req := range requests {
<-limiter
fmt.Println("Request", req, time.Now())
}
// 突发限制模式
burstyLimiter := make(chan time.Time, 3)
for i := 0; i < 3; i++ {
burstyLimiter <- time.Now()
}
go func() {
for t := range time.Tick(200 * time.Millisecond) {
burstyLimiter <- t
}
}()
burstyRequests := make(chan int, 5)
for i := 1; i <= 5; i++ {
burstyRequests <- i
}
close(burstyRequests)
for req := range burstyRequests {
<-burstyLimiter
fmt.Println("Bursty request", req, time.Now())
}
}
四、注意点
4.1 使用说明
- 每个示例都是独立的Go程序,可以直接保存为
.go文件运行 - 所有示例都包含了必要的包导入和错误处理
- 每个示例都有详细的注释说明关键步骤
- 可以通过调整参数(如worker数量、超时时间等)观察不同行为
4.2 模式选择对比
| 模式 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
| Worker Pool | 任务并行处理 | 控制并发数,防止资源耗尽 | 需要任务队列管理 |
| Fan-in/Fan-out | 数据合并/分发 | 提高吞吐量 | 需注意Channel关闭时机 |
| Pipeline | 流式处理 | 模块化、易扩展 | 阶段多时延迟高 |
| Timeout/Cancel | 任务超时控制 | 避免阻塞 | 需正确使用Context |
| Future/Promise | 异步结果获取 | 非阻塞编程 | 结果获取需同步 |
| Or-Done | 多事件监听 | 灵活组合 | 实现较复杂 |
| Rate Limiting | 流量控制 | 防止过载 | 需合理设置阈值 |
4.3 选择建议
- 避免共享内存,优先使用Channel通信
- 使用
sync.WaitGroup等待Goroutine完成 - 合理使用
context管理生命周期 - 注意Channel的关闭,避免panic
- 使用
select+default实现非阻塞操作 - 控制Goroutine数量,防止资源泄漏
到此这篇关于浅谈Go语言中高效并发模式 的文章就介绍到这了,更多相关Go语言高效并发模式内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
