浅谈Go用于同步和并发控制的几种常见锁
作者:风不归Alkaid
1. 互斥锁(Mutex)
sync.Mutex:这是最基本的互斥锁,用于保护共享资源防止同时访问。
它有两个主要的方法:
Lock():获取锁,如果锁已经被其他Goroutine获取,则等待。Unlock():释放锁。
1.1 示例
创建多个goroutine来增加一个共享变量的值。为了防止并发访问导致的数据竞争,我们将使用sync.Mutex来确保每次只有一个goroutine可以修改变量。
package main
import (
"fmt"
"sync"
"time"
)
// 定义一个共享资源
var counter int = 0
// 创建一个互斥锁
var lock sync.Mutex
func main() {
// 创建一个等待组,以便等待所有goroutine完成
var wg sync.WaitGroup
// 启动多个goroutine来增加计数器
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 10; j++ {
// 在修改共享资源前获取锁
lock.Lock()
// 修改共享资源
counter++
fmt.Printf("Goroutine %d sees counter: %d\n", id, counter)
// 释放锁
lock.Unlock()
// 等待一段时间,模拟处理过程
time.Sleep(time.Millisecond * 10)
}
}(i)
}
// 等待所有goroutine完成
wg.Wait()
// 打印最终的计数器值
fmt.Println("Final counter:", counter)
}
1.2 代码解释
- 共享资源:这里的共享资源是
counter变量,所有goroutine都会尝试修改它。 - 互斥锁:使用
sync.Mutex来保护对counter的访问。在每次修改前,goroutine会调用lock.Lock()来尝试获取锁,完成修改后调用lock.Unlock()释放锁。 - 等待组:使用
sync.WaitGroup来等待所有goroutine完成。每启动一个goroutine,调用wg.Add(1),每个goroutine完成时调用wg.Done()。 - 并发执行:通过
go func(id int)启动goroutine,每个goroutine都尝试多次修改counter,并在控制台输出当前看到的counter值。
2. 读写锁(RWMutex)
sync.RWMutex:这是一个读写互斥锁,允许多个读操作并发,但写操作是互斥的。
主要方法有:
RLock():获取读锁,允许其他Goroutine同时获取读锁。RUnlock():释放读锁。Lock():获取写锁,阻塞其他的读锁和写锁请求。Unlock():释放写锁。
2.1 示例
创建多个goroutine,一些用于读取共享数据,而另一些用于写入共享数据。sync.RWMutex将允许多个读操作并发执行,但写操作将是互斥的,确保了数据的一致性。
package main
import (
"fmt"
"sync"
"time"
)
// 定义一个共享资源
var data int = 0
// 创建一个读写互斥锁
var rwMutex sync.RWMutex
func main() {
// 创建一个等待组,以便等待所有goroutine完成
var wg sync.WaitGroup
// 启动多个读goroutine
for i := 0; i < 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
readData(id)
}(i)
}
// 启动多个写goroutine
for i := 0; i < 2; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
writeData(id)
}(i)
}
// 等待所有goroutine完成
wg.Wait()
}
// 读取数据的函数
func readData(id int) {
for j := 0; j < 5; j++ {
// 获取读锁
rwMutex.RLock()
fmt.Printf("Goroutine %d reads data: %d\n", id, data)
// 释放读锁
rwMutex.RUnlock()
// 等待一段时间,模拟读取过程
time.Sleep(time.Millisecond * 100)
}
}
// 写入数据的函数
func writeData(id int) {
for j := 0; j < 5; j++ {
// 获取写锁
rwMutex.Lock()
// 修改数据
data += id
fmt.Printf("Goroutine %d writes data: %d\n", id, data)
// 释放写锁
rwMutex.Unlock()
// 等待一段时间,模拟写入过程
time.Sleep(time.Millisecond * 100)
}
}
2.2 代码解释
- 共享资源:这里的共享资源是
data变量,所有读goroutine都会读取它,而写goroutine会修改它。 - 读写互斥锁:使用
sync.RWMutex来保护对data的访问。读goroutine在读取前调用rwMutex.RLock()获取读锁,并在读取后调用rwMutex.RUnlock()释放读锁。写goroutine在写入前调用rwMutex.Lock()获取写锁,并在写入后调用rwMutex.Unlock()释放写锁。 - 等待组:使用
sync.WaitGroup来等待所有goroutine完成。每启动一个goroutine,调用wg.Add(1),每个goroutine完成时调用wg.Done()。 - 并发执行:通过
go func(id int)启动goroutine,读goroutine和写goroutine分别执行读取和写入操作。
3. 条件变量(Cond)
sync.Cond:条件变量通常与互斥锁一起使用,用于实现更复杂的同步场景。
它提供了三种方法:
Wait():等待条件满足。Signal():唤醒一个等待中的Goroutine。Broadcast():唤醒所有等待中的Goroutine。
3.1 示例
创建一个生产者-消费者模型,生产者将数据添加到缓冲区中,而消费者从缓冲区中获取数据。我们使用sync.Cond来实现生产者和消费者之间的同步。
package main
import (
"fmt"
"sync"
"time"
)
// 缓冲区容量
const bufferSize = 5
// 缓冲区
var buffer = make([]int, 0, bufferSize)
// 互斥锁
var mutex sync.Mutex
// 条件变量
var cond = sync.NewCond(&mutex)
func main() {
// 创建一个等待组,以便等待所有goroutine完成
var wg sync.WaitGroup
// 启动生产者goroutine
for i := 0; i < 2; i++ {
wg.Add(1)
go producer(&wg, i)
}
// 启动消费者goroutine
for i := 0; i < 3; i++ {
wg.Add(1)
go consumer(&wg, i)
}
// 等待所有goroutine完成
wg.Wait()
}
// 生产者函数
func producer(wg *sync.WaitGroup, id int) {
defer wg.Done()
for j := 0; j < 10; j++ {
time.Sleep(time.Millisecond * 100) // 模拟生产过程
// 获取锁
mutex.Lock()
// 等待缓冲区未满
for len(buffer) == bufferSize {
cond.Wait()
}
// 生产数据
buffer = append(buffer, j)
fmt.Printf("Producer %d produced: %d, buffer: %v\n", id, j, buffer)
// 唤醒消费者
cond.Signal()
// 释放锁
mutex.Unlock()
}
}
// 消费者函数
func consumer(wg *sync.WaitGroup, id int) {
defer wg.Done()
for {
time.Sleep(time.Millisecond * 150) // 模拟消费过程
// 获取锁
mutex.Lock()
// 等待缓冲区非空
for len(buffer) == 0 {
cond.Wait()
}
// 消费数据
data := buffer[0]
buffer = buffer[1:]
fmt.Printf("Consumer %d consumed: %d, buffer: %v\n", id, data, buffer)
// 唤醒生产者
cond.Signal()
// 释放锁
mutex.Unlock()
}
}
3.2 代码解释
- 缓冲区:
buffer是一个用于存放数据的切片,bufferSize定义了缓冲区的容量。 - 互斥锁:
mutex用于保护缓冲区的并发访问。 - 条件变量:
cond是一个条件变量,配合互斥锁使用,用于实现生产者和消费者之间的同步。 - 生产者函数:
producer函数模拟生产数据。当缓冲区满时,生产者会等待条件变量。生产数据后,生产者会发出Signal通知消费者。 - 消费者函数:
consumer函数模拟消费数据。当缓冲区空时,消费者会等待条件变量。消费数据后,消费者会发出Signal通知生产者。 - 等待组:使用
sync.WaitGroup来等待所有生产者和消费者goroutine完成。
4. Once
sync.Once:保证某个操作只执行一次,常用于初始化操作。
主要方法是:
Do(f func()):只执行一次传入的函数,即使从多个Goroutine调用也只会执行一次。
4.1 示例
模拟一个只需初始化一次的资源。无论有多少个goroutine尝试初始化这个资源,sync.Once都确保它们中的某一个只会执行一次初始化操作。
package main
import (
"fmt"
"sync"
"time"
)
// 定义一个全局变量用于存放初始化资源
var resource string
// 定义一个sync.Once变量
var once sync.Once
// 模拟资源初始化的函数
func initialize() {
fmt.Println("Initializing resource...")
resource = "Resource Initialized"
}
func main() {
// 创建一个等待组,以便等待所有goroutine完成
var wg sync.WaitGroup
// 启动多个goroutine,尝试初始化资源
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
useResource(id)
}(i)
}
// 等待所有goroutine完成
wg.Wait()
// 最后打印资源的状态
fmt.Println("Final resource state:", resource)
}
// 使用资源的函数,尝试初始化资源
func useResource(id int) {
// 使用sync.Once的Do方法确保initialize函数只执行一次
once.Do(initialize)
fmt.Printf("Goroutine %d using resource: %s\n", id, resource)
// 模拟资源使用过程
time.Sleep(time.Millisecond * 100)
}
4.2 代码解释
- 全局变量:
resource是一个全局变量,用于存放初始化的资源。 - sync.Once:
once是一个sync.Once变量,用于确保初始化函数initialize只执行一次。 - 初始化函数:
initialize函数模拟初始化资源的操作,只会在第一次调用时执行。 - 等待组:使用
sync.WaitGroup等待所有goroutine完成操作。 - 使用资源的函数:
useResource函数模拟使用资源的过程。它调用once.Do(initialize)确保initialize函数只执行一次。然后,它打印出资源的状态,并模拟使用资源的过程。
5. 原子操作
sync/atomic包提供了底层的原子操作,可以用于实现无锁的并发算法。
这些操作包括:
AddInt32()AddInt64()LoadInt32()LoadInt64()StoreInt32()StoreInt64()CompareAndSwapInt32()CompareAndSwapInt64()
5.1 示例
创建一个简单的程序,该程序使用原子操作来增加、存储和加载一个整数值,并使用CompareAndSwap来实现条件更新。
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var wg sync.WaitGroup
var count int32 = 0
// 启动多个goroutine来增加计数器
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 5; j++ {
// 使用atomic.AddInt32来原子地增加计数器
atomic.AddInt32(&count, 1)
}
}()
}
// 等待所有goroutine完成
wg.Wait()
// 使用atomic.LoadInt32来原子地读取计数器
finalCount := atomic.LoadInt32(&count)
fmt.Println("Final count:", finalCount)
// 尝试使用atomic.CompareAndSwapInt32来条件性地更新计数器
if atomic.CompareAndSwapInt32(&count, finalCount, 100) {
fmt.Println("Count was", finalCount, ", updated count to 100")
} else {
fmt.Println("Failed to update count")
}
// 再次读取并打印计数器的值
updatedCount := atomic.LoadInt32(&count)
fmt.Println("Updated count:", updatedCount)
}
5.2 代码解释
- 变量定义:定义一个
int32类型的变量count用于计数。 - 增加计数器:启动多个goroutine,每个goroutine使用
atomic.AddInt32来原子地增加count的值。这保证了在并发环境下,计数的增加操作是安全的。 - 读取计数器:所有goroutine完成后,使用
atomic.LoadInt32原子地读取count的值。这是读取共享变量的安全方式。 - 条件更新:使用
atomic.CompareAndSwapInt32尝试原子地更新count的值。这个函数只有在当前值等于预期值时才会更新,并返回是否成功。 - 打印最终结果:打印最终的计数值和更新后的计数值。
6. Pool
sync.Pool:用于临时对象的缓存,减少垃圾回收的压力。
主要方法包括:
Get():获取一个对象。Put(x interface{}):放回一个对象。
6.1 示例
如何使用对象池来缓存和重用对象,从而减少垃圾回收的压力。
package main
import (
"fmt"
"sync"
"time"
)
// 定义一个结构体类型,用于示例
type MyObject struct {
ID int
}
// 创建一个全局的sync.Pool对象
var objectPool = sync.Pool{
New: func() interface{} {
return &MyObject{}
},
}
func main() {
var wg sync.WaitGroup
// 启动多个goroutine来获取和放回对象
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
// 从对象池中获取一个对象
obj := objectPool.Get().(*MyObject)
// 模拟对象的使用
obj.ID = id
fmt.Printf("Goroutine %d using object with ID: %d\n", id, obj.ID)
// 模拟工作延迟
time.Sleep(time.Millisecond * 100)
// 重置对象的状态(可选)
obj.ID = 0
// 将对象放回池中
objectPool.Put(obj)
}(i)
}
// 等待所有goroutine完成
wg.Wait()
// 打印对象池的状态
fmt.Println("All goroutines finished, objects are back in the pool.")
}
6.2 代码解释
- 定义结构体:定义一个
MyObject结构体,用于示例。 - 创建对象池:使用
sync.Pool创建一个全局的对象池objectPool。通过设置New字段指定当对象池为空时如何创建新对象。 - 启动多个goroutine:在主函数中,启动10个goroutine,每个goroutine从对象池中获取一个对象,使用后将其放回池中。
- 获取对象:使用
objectPool.Get()从对象池中获取一个对象,并类型断言为*MyObject。 - 使用对象:模拟对象的使用过程,设置对象的
ID字段,并打印信息。 - 模拟延迟:使用
time.Sleep模拟一些处理延迟。 - 重置对象状态:重置对象的状态(这是可选的,但有助于避免状态污染)。
- 放回对象:使用
objectPool.Put(obj)将对象放回对象池中。 - 等待所有goroutine完成:使用
sync.WaitGroup等待所有goroutine完成。 - 打印状态:最后打印消息,表示所有goroutine已完成。
到此这篇关于浅谈Go用于同步和并发控制的几种常见锁的文章就介绍到这了,更多相关Go 同步和并发控制锁内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
