Redis 中的分布式锁如何使用分布式锁
为了保证我们在线服务的并发性和安全性,我们的服务一般会摒弃单一应用,使用两者都是高度可扩展的分布式架构。
对可变共享资源的访问只能由一个线程或进程同时访问。这时候,我们需要做一个标记。如果当前有线程或进程在操作共享变量,我们做一个标记来标记当前资源正在被操作,其他线程或进程不能操作。当前操作完成后,删除该标记,以便其他线程或进程可以申请对共享变量的操作。以上标记确保共享变量只能由一个线程或同时持有。
使用Redis实现分布式锁
说说如何使用Redis实现分布式锁
Redis中的分布式锁一般使用set key value px nx或者SETNX+Lua来实现。
因为SETNX命令需要设置过期时间,所以Redis中单个命令的执行是原子性的,组合命令需要使用Lua来保证原子性。
看看如何实现
使用set key value px nx来实现
因为这个命令可以同时设置key值和过期时间,而且Redis中的单个命令都是原子的,所以加锁的时候可以用这个命令。
func (r *Redis) TryLock(ctx context.Context, key, value string, expire time.Duration) (isGetLock bool, err error) {
// 使用 set nx
res, err := r.Do(ctx, "set", key, value, "px", expire.Milliseconds(), "nx").Result()
if err != nil {
return false, err
}
if res == "OK" {
return true, nil
}
return false, nil
}
SETNX+Lua 实现
如果使用SETNX命令,该命令不能设置过期时间,需要配合命令使用。
因为使用了两个命令,所以两个命令的组合不能保证原子性。在一些并发量比较大的情况下,需要使用Lua脚本来保证命令的原子性。
func tryLockScript() string {
script := `
local key = KEYS[1]
local value = ARGV[1]
local expireTime = ARGV[2]
local isSuccess = redis.call('SETNX', key, value)
if isSuccess == 1 then
redis.call('EXPIRE', key, expireTime)
return "OK"
end
return "unLock" `
return script
}
func (r *Redis) TryLock(ctx context.Context, key, value string, expire time.Duration) (isGetLock bool, err error) {
// 使用 Lua + SETNX
res, err := r.Eval(ctx, tryLockScript(), []string{key}, value, expire.Seconds()).Result()
if err != nil {
return false, err
}
if res == "OK" {
return true, nil
}
return false, nil
}
除了上面两个锁命令的区别外,还需要注意解锁时不要误删除其他线程持有的锁。
p>
为什么会这样?我们这里来分析一下
吃个栗子
1、线程1获得锁,锁过期时间为1s;
2、线程1完成业务操作,耗时1.5s。此时线程1的锁已经到了过期时间自动释放,该锁已经被其他线程获取;
3、但是线程1不知道,然后释放锁。这时候会误释放其他线程的锁。
面对这种情况,其实很好处理
1、设置值唯一;
p>
2、每次删除锁,首先判断value的值是否正确。如果不一样,说明锁已经被另一个线程获取了;
看代码实现
var UnLockErr = errors.New("未解锁成功")
func unLockScript() string {
script := `
local value = ARGV[1]
local key = KEYS[1]
local keyValue = redis.call('GET', key)
if tostring(keyValue) == tostring(value) then
return redis.call('DEL', key)
else
return 0
end
`
return script
}
func (r *Redis) Unlock(ctx context.Context, key, value string) (bool, error) {
res, err := r.Eval(ctx, unLockScript(), []string{key}, value).Result()
if err != nil {
return false, err
}
return res.(int64) != 0, nil
}
代码可以参考锁
上面这种锁最大的缺点就是只对一个节点起作用,即使Redis保证高可用,如果节点因为某种原因释放主从切换,锁会丢失:
1、在Redis节点上获得锁;
2、但是锁定的key还没有同步到从节点;
3、失败,发生故障转移,从节点升级为节点;
4、导致锁丢失。
这种情况怎么处理,我们来说说算法
使用和实现分布式锁
在Redis的分布式环境中,我们假设有N个Redis。这些节点之间是完全独立的,不存在主从复制或其他集群协调机制。我们确保使用与 Redis 单实例下相同的方法在 N 个实例上获取和释放锁。现在假设有 5 个 Redis 节点,我们需要在 5 个服务器上运行这些 Redis 实例,这样它们就不会同时全部宕机。
要获得锁,客户端应该执行以下操作:
1、获取当前的 Unix 时间,以毫秒为单位。
2、依次尝试从 5 个实例获取锁,使用相同的键和唯一值(例如 UUID)。在向 Redis 请求锁时,客户端应设置网络连接和响应超时,该超时时间应小于锁的过期时间。例如,如果你的锁在 10 秒后自动过期,那么超时应该在 5-50 毫秒之间。这样可以避免出现服务端Redis已经挂掉,客户端还在等待响应结果的情况。如果服务器在规定时间内没有响应,客户端应该尽快尝试请求另一个Redis实例获取锁;
3、客户端用当前时间减去开始获取锁的时间(步骤1记录时间)得到获取锁的时间。当且仅当从大多数(N/2+1,这里是3个节点)Redis节点获取锁,并且使用的时间小于锁过期时间,才会成功获取锁;
4、如果获取了锁,则key的实际有效时间等于有效时间减去获取锁所用的时间(第三步计算的结果);
5、如果因为某些原因,获取锁失败(至少有N/2+1个Redis实例没有获取锁或者获取锁的时间已经超过有效时间) ,客户端应该解锁所有 Redis 实例(即使某些 Redis 实例根本没有)。如果加锁不成功,会阻止部分节点获取锁,但客户端没有响应,下一段时间不能重新获取锁。
根据官方推荐,该算法在go版本中实现,这里是具体实现过程
项目地址
// LockContext locks m. In case it returns an error on failure, you may retry to acquire the lock by calling this method again.
func (m *Mutex) LockContext(ctx context.Context) error {
if ctx == nil {
ctx = context.Background()
}
value, err := m.genValueFunc()
if err != nil {
return err
}
for i := 0; i < m.tries; i++ {
if i != 0 {
select {
case <-ctx.Done():
// Exit early if the context is done.
return ErrFailed
case = m.quorum && now.Before(until) {
m.value = value
m.until = until
return nil
}
_, err = func() (int, error) {
ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))
defer cancel()
return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
// 解锁函数
return m.release(ctx, pool, value)
})
}()
if i == m.tries-1 && err != nil {
return err
}
}
return ErrFailed
}
// 遍历所有的节点,并且在每个节点中执行传入的函数
func (m *Mutex) actOnPoolsAsync(actFn func(redis.Pool) (bool, error)) (int, error) {
type result struct {
Status bool
Err error
}
ch := make(chan result)
// 执行传入的函数
for _, pool := range m.pools {
go func(pool redis.Pool) {
r := result{}
r.Status, r.Err = actFn(pool)
ch <- r
}(pool)
}
n := 0
var err error
// 计算执行成功的节点数目
for range m.pools {
r := <-ch
if r.Status {
n++
} else if r.Err != nil {
err = multierror.Append(err, r.Err)
}
}
return n, err
}
// 手动解锁的lua脚本
var deleteScript = redis.NewScript(1, `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
`)
// 手动解锁
func (m *Mutex) release(ctx context.Context, pool redis.Pool, value string) (bool, error) {
conn, err := pool.Get(ctx)
if err != nil {
return false, err
}
defer conn.Close()
status, err := conn.Eval(deleteScript, m.name, value)
if err != nil {
return false, err
}
return status != int64(0), nil
}
分析想法
1、遍历所有节点,然后尝试在所有节点上进行加锁操作;
2、 收集成功锁定节点的数量,如果没有达到指定数量,则释放刚刚添加的锁;
缺点请参考
怎么做
更新锁
Redis中分布式锁的另一个问题是锁的更新。当锁的到期时间到了,但业务的执行时间还没有完成,就需要更新锁了。租约已续订
续租流程
1、客户端成功锁定后,可以启动一个定时任务,每隔一段时间,检查一下业务是否完成,未完成,增加key的过期时间;
2、这里判断业务是否完成的依据是:
看续租的执行情况
// Extend resets the mutex's expiry and returns the status of expiry extension.
func (m *Mutex) Extend() (bool, error) {
return m.ExtendContext(nil)
}
// ExtendContext resets the mutex's expiry and returns the status of expiry extension.
func (m *Mutex) ExtendContext(ctx context.Context) (bool, error) {
start := time.Now()
// 尝试在所有的节点中加锁
n, err := m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
return m.touch(ctx, pool, m.value, int(m.expiry/time.Millisecond))
})
if n < m.quorum {
return false, err
}
// 判断下锁的过期时间
now := time.Now()
until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor)))
if now.Before(until) {
m.until = until
return true, nil
}
return false, ErrExtendFailed
}
var touchScript = redis.NewScript(1, `
// 需要先比较下当前的value值
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("PEXPIRE", KEYS[1], ARGV[2])
else
return 0
end
`)
func (m *Mutex) touch(ctx context.Context, pool redis.Pool, value string, expiry int) (bool, error) {
conn, err := pool.Get(ctx)
if err != nil {
return false, err
}
defer conn.Close()
status, err := conn.Eval(touchScript, m.name, value, expiry)
if err != nil {
return false, err
}
return status != int64(0), nil
}
1、锁的更新需要客户端监控操作,启动一个定时器,在固定的时间调用更新函数来更新锁;
2、每次续租时都需要匹配当前值,因为锁可能已经被当前线程释放,当前持有者可能是另一个线程;
p>
看一下SETEX的源码
SETEX可以保证只在key不存在的情况下才设置key的值,那么我们看看源码中是如何实现的
// https://github.com/redis/redis/blob/7.0/src/t_string.c#L78
// setGenericCommand()函数是以下命令: SET, SETEX, PSETEX, SETNX.的最底层实现
void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
...
found = (lookupKeyWrite(c->db,key) != NULL);
// 这里是 SETEX 实现的重点
// 如果nx,并且在数据库中找到了这个值就返回
// 如果是 xx,并且在数据库中没有找到键值就会返回
// 因为 Redis 中的命令执行都是单线程操作的
// 所以命令中判断如果存在就返回,能够保证正确性,不会出现并发访问的问题
if ((flags & OBJ_SET_NX && found) ||
(flags & OBJ_SET_XX && !found))
{
if (!(flags & OBJ_SET_GET)) {
addReply(c, abort_reply ? abort_reply : shared.null[c->resp]);
}
return;
}
...
}
1、该命令的实现增加了key值存在的判断,保证NX只在key不存在的时候设置key的值;
2、因为Redis中总是有一个线程来处理命令的执行,所以单个命令可以保证原子性,不会出现并发问题。
为什么Redis可以用于分布式锁
分布式锁需要满足的特性
那么Redis是如何支持上述特性的呢?
1、Redis中命令的执行是单线程的。虽然在.0版本中引入了多线程来处理IO任务,但是命令的执行仍然是单线程的。 ;
2、单线程的特性可以保证命令执行不存在并发问题,也可以保证命令执行的原子性;
3、Redis提供SETNX等命令,可以保证同时只有一个请求成功执行,提供互斥保证;
4、Redis还提供了超时释放命令,可以实现锁的超时释放,避免死锁的发生;
5、高可用,在主从切换和数据丢失的情况下,Redis引入了一种算法,保证了Redis中大部分主节点正常运行,锁可以正常运行;
6、Redis本身并没有提供更新锁的操作,但是一些第三方实现实现了。 Redis中锁的更新类似于java实现的,go实现的。当然,自己实现也不难。实现过程参见上文。
一般来说,Redis 提供了对分布式锁的一些特性的支持。使用 Redis 实现分布式锁是一个不错的选择。
如何选择分布式锁
1、如果业务规模小,qps小,使用Redis,etcd实现分布式锁不会有问题。这取决于公司的基础设施。如果你有现成的Redis、etcd,可以直接使用;
2、Redis 存在一定的安全隐患。如果性要求很高,那么Redis可能不适合,etcd可能更适合;
3、如果系统qps很大,但是可以容忍一些错误,那么Redis可能更适合嘛,毕竟etcd或者后面往往都是吞吐比较低,延迟比较高。
总结
1、在分布式场景中,分布式锁的使用是我们经常遇到的场景;
2、使用Redis实现锁是一个不错的选择。 Redis中单个命令的执行是原子性的,组合命令的原子性也可以借助Lua轻松实现;
3、在分布式场景主从切换、数据同步不及时的情况下,引入redis来处理分布式锁;
4、根据描述,它很重,有安全性,但我们可以根据自己的业务场景来判断;
5、需要注意的是,在设置分布式锁的时候,需要设置值的唯一性,并且每次主动删除锁的时候,都需要匹配值的正确性避免意外删除其他线程的锁;
参考
【Redis核心技术与实战】
【Redis设计与实现】
【Redis学习笔记】
[Redis 分布式锁]
[怎么做]
[etcd实现]分布式锁]#分布式锁
[Redis中的原子操作(3)-使用Redis实现分布式锁]原子操作(3)-使用Redis实现分布式)锁/
暂无评论内容