Go+Redis分布式锁
2022年11月23日
注意点
Redis怎么做分布式锁是老生常谈,但是之前没有注意一些细节,存在小隐患。
- 可重入。同线程允许多次加锁。 这个实现需要在
setnx
前先获取对比判断,使用脚本原子性执行,先通过key获取,如果匹配id则获取到锁,如果没有正常加锁setnx
。 - 误删问题。超时是不可预知的,试想,当A获取到锁,进行业务逻辑,再解锁,但是业务逻辑超时了,锁已经自动解除,并且被B获得,那么A解锁就会把B的锁删掉。 所以,需要一个id来保证只解除自己的锁。
代码
package redis
import (
"context"
"math/rand"
"strconv"
"time"
"github.com/redis/go-redis/v9"
)
var (
// 锁自动释放时间
maxTimeout = strconv.Itoa(int(time.Second * 60))
// 重试间隔
retryInterval = 10 * time.Millisecond
// 随机数
rd = rand.New(rand.NewSource(time.Now().UnixNano()))
)
type Mutex struct {
ctx context.Context
key string
id string
}
func NewMutex(ctx context.Context, key string) *Mutex {
return &Mutex{
ctx: ctx,
key: key,
id: strconv.Itoa(int(rd.Int63())),
}
}
// 先判断是否为当前线程重复获取锁,如果是则返回OK。 (可重入锁)
// 否则尝试 SetNX 获取,成功都是返回 OK
const lockScript = `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return "OK"
else
return redis.call("SET", KEYS[1], ARGV[1], "NX", "PX", ARGV[2])
end`
func (m *Mutex) Lock() error {
keys := []string{m.key}
for {
resp, err := rdb.Eval(m.ctx, lockScript, keys, m.id, maxTimeout).Result()
if err != nil && err != redis.Nil {
return err
}
reply, ok := resp.(string)
if ok && reply == "OK" {
return nil
}
time.Sleep(retryInterval)
}
}
// 删除脚本。须匹配id,防止超时后另外线程获取到锁后误删。
const unLockScript = `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end`
func (m *Mutex) UnLock() {
rdb.Eval(m.ctx, unLockScript, []string{m.key}, m.id)
}
测试
测试需要注意的点是,因为做了可重入锁,所以在一个服务器内 NewMutex
是唯一的。
要模拟分布式,需要在每个goroutine
里都重新生成Mutex
,这样他们之间就是 key
相同但是 id
不同。
func BenchmarkMutex(b *testing.B) {
wg := &sync.WaitGroup{}
wg.Add(b.N)
start := time.Now()
for i := 0; i < b.N; i++ {
go func(i int) {
defer wg.Done()
mutex := NewMutex(context.Background(), "test-mutex")
if err := mutex.Lock(); err != nil {
panic(err)
}
defer mutex.UnLock()
time.Sleep(time.Millisecond * 2)
}(i)
}
wg.Wait()
fmt.Println("尝试次数", b.N, "耗时", time.Since(start).Milliseconds(), "ms")
}