Golang

关注公众号 jb51net

关闭
首页 > 脚本专栏 > Golang > Go Zookeeper分布式锁

Golang使用Zookeeper实现分布式锁

作者:awk

分布式锁是一种在分布式系统中用于控制并发访问的机制,ZooKeeper 和 Redis 都是常用的实现分布式锁的工具,本文就来使用Zookeeper实现分布式锁,希望对大家有所帮助

什么是分布式锁

分布式锁是一种在分布式系统中用于控制并发访问的机制。在分布式系统中,多个客户端可能会同时对同一个资源进行访问,这可能导致数据不一致的问题。分布式锁的作用是确保同一时刻只有一个客户端能够对某个资源进行访问,从而避免数据不一致的问题。

分布式锁的实现通常依赖于一些具有分布式特性的技术,如 ZooKeeperRedis、数据库等。这些技术提供了在分布式环境中实现互斥访问的机制,使得多个客户端在竞争同一个资源时能够有序地进行访问。

通过使用分布式锁,可以确保分布式系统中的数据一致性和并发访问的有序性,从而提高系统的可靠性和稳定性。

Zookeeper 与 Redis 的分布式锁对比

ZooKeeper 和 Redis 都是常用的实现分布式锁的工具,但它们在实现方式、特性、适用场景等方面有一些区别。以下是 ZooKeeper 分布式锁与 Redis 分布式锁的比较:

实现方式

特性

适用场景

可靠性

综上所述,ZooKeeper 分布式锁和 Redis 分布式锁各有优缺点,具体选择哪种方式取决于实际业务场景和需求。在需要保证顺序性和公平性的场景下,ZooKeeper 分布式锁可能更适合;而在需要高性能和快速响应的场景下,Redis 分布式锁可能更合适。

为什么 Zookeeper 可以实现分布式锁

ZooKeeper 可以实现分布式锁,主要得益于其以下几个特性:

基于以上特性,ZooKeeper 可以实现分布式锁。具体实现流程如下:

因此,ZooKeeper 通过其临时节点、顺序节点和 Watcher 机制等特性,实现了分布式锁的功能。

使用 Golang 实现 Zookeeper 分布式锁

下面我们通过一个简单的例子来演示如何使用 Golang 实现 ZooKeeper 分布式锁。

创建 zookeeper 客户端连接

import "github.com/go-zookeeper/zk"

func client() *zk.Conn {
    // 默认端口 2181
 c, _, err := zk.Connect([]string{"192.168.2.168"}, time.Second)
 if err != nil {
  panic(err)
 }
 return c
}

创建父节点 - /lock

我们可以在获取锁之前,先创建一个父节点,用于存放锁节点。

type Lock struct {
 c *zk.Conn
}

// 父节点 /lock 不存在的时候进行创建
func NewLock() *Lock {
 c := client()
 e, _, err := c.Exists("/lock")
 if err != nil {
  panic(err)
 }
 if !e {
  _, err := c.Create("/lock", []byte(""), 0, zk.WorldACL(zk.PermAll))
  if err != nil {
   panic(err)
  }
 }

 return &Lock{c: c}
}

获取锁

在 Zookeeper 分布式锁实现中,获取锁的过程实际上就是创建一个临时顺序节点,并判断自己是否是所有临时顺序节点中序号最小的。

获取锁的关键是:

具体创建代码如下:

p, err := l.c.Create("/lock/lock", []byte(""), zk.FlagEphemeral|zk.FlagSequence, zk.WorldACL(zk.PermAll))

其中 zk.FlagEphemeral 表示创建的是临时节点,zk.FlagSequence 表示创建的是顺序节点。

判断当前创建的节点是否是最小节点

具体步骤如下:

childs, _, err := l.c.Children("/lock")
if err != nil {
    return "", err
}

// childs 是无序的,所以需要排序,以便找到当前节点的前一个节点,然后监听前一个节点
sort.Strings(childs)

// 成功获取到锁
p1 := strings.Replace(p, "/lock/", "", 1)
if childs[0] == p1 {
    return p, nil
}

不是最小节点,监听前一个节点

具体步骤如下:

// 监听锁,等待锁释放
// 也就是说,如果当前节点不是最小的节点,那么就监听前一个节点
// 一旦前一个节点被删除,那么就可以获取到锁
index := sort.SearchStrings(childs, p1)
b, _, ev, err := l.c.ExistsW("/lock/" + childs[index-1])
if err != nil {
    return "", err
}

// 在调用 ExistsW 之后,前一个节点已经被删除
if !b {
    return p, nil
}

// 等待前一个节点被删除
<-ev

return p, nil

在调用 ExistsW 的时候,如果前一个节点已经被删除,那么 ExistsW 会立即返回 false,否则我们可以通过 ExistsW 返回的第三个参数 ev 来等待前一个节点被删除。

在 <-ev 处,我们通过 <-ev 来等待前一个节点被删除,一旦前一个节点被删除,ev 会收到一个事件,这个时候我们就可以获取到锁了。

释放锁

如果调用 Lock 可以成功获取到锁,我们会返回当前创建的节点的路径,我们可以通过这个路径来释放锁。

func (l *Lock) Unlock(p string) error {
 return l.c.Delete(p, -1)
}

完整代码

package main

import (
 "github.com/go-zookeeper/zk"
 "sort"
 "strings"
 "time"
)

func client() *zk.Conn {
 c, _, err := zk.Connect([]string{"192.168.2.168"}, time.Second) //*10)
 if err != nil {
  panic(err)
 }
 return c
}

type Lock struct {
 c *zk.Conn
}

func NewLock() *Lock {
 c := client()
 e, _, err := c.Exists("/lock")
 if err != nil {
  panic(err)
 }
 if !e {
  _, err := c.Create("/lock", []byte(""), 0, zk.WorldACL(zk.PermAll))
  if err != nil {
   panic(err)
  }
 }

 return &Lock{c: c}
}

func (l *Lock) Lock() (string, error) {
 p, err := l.c.Create("/lock/lock", []byte(""), zk.FlagEphemeral|zk.FlagSequence, zk.WorldACL(zk.PermAll))
 if err != nil {
  return "", err
 }
 childs, _, err := l.c.Children("/lock")
 if err != nil {
  return "", err
 }

 // childs 是无序的,所以需要排序,以便找到当前节点的前一个节点,然后监听前一个节点
 sort.Strings(childs)

 // 成功获取到锁
 p1 := strings.Replace(p, "/lock/", "", 1)
 if childs[0] == p1 {
  return p, nil
 }

 // 监听锁,等待锁释放
 // 也就是说,如果当前节点不是最小的节点,那么就监听前一个节点
 // 一旦前一个节点被删除,那么就可以获取到锁
 index := sort.SearchStrings(childs, p1)
 b, _, ev, err := l.c.ExistsW("/lock/" + childs[index-1])
 if err != nil {
  return "", err
 }

 // 在调用 ExistsW 之后,前一个节点已经被删除
 if !b {
  return p, nil
 }

 // 等待前一个节点被删除
 <-ev

 return p, nil
}

func (l *Lock) Unlock(p string) error {
 return l.c.Delete(p, -1)
}

测试代码

下面这个例子模拟了分布式的 counter 操作,我们通过 ZooKeeper 分布式锁来保证 counter 的原子性。

当然这个例子只是为了说明 ZooKeeper 分布式锁的使用,实际上下面的功能通过 redis 自身提供的 incr 就可以实现,不需要这么复杂。

package main

import (
 "context"
 "fmt"
 "github.com/redis/go-redis/v9"
 "sync"
)

func main() {
 var count = 1000
 var wg sync.WaitGroup
 wg.Add(count)

 l := NewLock()
    // 创建 redis 客户端连接
 redisClient = redis.NewClient(&redis.Options{
  Addr:     "192.168.2.168:6379",
  Password: "", // no password set
  DB:       0,  // use default DB
 })

 for i := 0; i < count; i++ {
  go func(i1 int) {
   defer wg.Done()

    // 获取 Zookeeper 分布式锁
   p, err := l.Lock()
   if err != nil {
    return
   }
   // 成功获取到了分布式锁:
   // 1. 从 redis 获取 zk_counter 的值
   // 2. 然后对 zk_counter 进行 +1 操作
   // 3. 最后将 zk_counter 的值写回 redis
   cmd := redisClient.Get(context.Background(), "zk_counter")
   i2, _ := cmd.Int()
   i2++
   redisClient.Set(context.Background(), "zk_counter", i2, 0)
   // 释放分布式锁
   err = l.Unlock(p)
   if err != nil {
    println(fmt.Errorf("unlock error: %v", err))
    return
   }
  }(i)
 }

 wg.Wait()

 l.c.Close()
}

我们需要将测试程序放到不同的机器上运行,这样才能模拟分布式环境。

总结

最后,再来回顾一下本文内容:

以上就是Golang使用Zookeeper实现分布式锁的详细内容,更多关于Go Zookeeper分布式锁的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文