Redis中的原子操作(3)-使用Redis实现分布式锁

Redis 中的分布式锁如何使用分布式锁

为了保证我们在线服务的并发性和安全性,我们的服务一般会摒弃单一应用,使用两者都是高度可扩展的分布式架构。

对可变共享资源的访问只能由一个线程或进程同时访问。这时候,我们需要做一个标记。如果当前有线程或进程在操作共享变量,我们做一个标记来标记当前资源正在被操作,其他线程或进程不能操作。当前操作完成后,删除该标记,以便其他线程或进程可以申请对共享变量的操作。以上标记确保共享变量只能由一个线程或同时持有。

使用Redis实现分布式锁

说说如何使用Redis实现分布式锁

Redis中的分布式锁一般使用set key value px nx或者SETNX+Lua来实现。

因为SETNX命令需要设置过期时间,所以Redis中单个命令的执行是原子性的,组合命令需要使用Lua来保证原子性。

看看如何实现

使用set key value px nx来实现

因为这个命令可以同时设置key值和过期时间,而且Redis中的单个命令都是原子的,所以加锁的时候可以用这个命令。

func (r *Redis) TryLock(ctx context.Context, key, value string, expire time.Duration) (isGetLock bool, err error) {
	// 使用 set nx
	res, err := r.Do(ctx, "set", key, value, "px", expire.Milliseconds(), "nx").Result()
	if err != nil {
		return false, err
	}
	if res == "OK" {
		return true, nil
	}
	return false, nil
}

SETNX+Lua 实现

如果使用SETNX命令,该命令不能设置过期时间,需要配合命令使用。

因为使用了两个命令,所以两个命令的组合不能保证原子性。在一些并发量比较大的情况下,需要使用Lua脚本来保证命令的原子性。

func tryLockScript() string {
	script := `
		local key = KEYS[1]
		local value = ARGV[1] 
		local expireTime = ARGV[2] 
		local isSuccess = redis.call('SETNX', key, value)
		if isSuccess == 1 then
			redis.call('EXPIRE', key, expireTime)
			return "OK"
		end
		return "unLock"    `
	return script
}
func (r *Redis) TryLock(ctx context.Context, key, value string, expire time.Duration) (isGetLock bool, err error) {
	// 使用 Lua + SETNX
	res, err := r.Eval(ctx, tryLockScript(), []string{key}, value, expire.Seconds()).Result()
	if err != nil {
		return false, err
	}
	if res == "OK" {
		return true, nil
	}
	return false, nil
}

除了上面两个锁命令的区别外,还需要注意解锁时不要误删除其他线程持有的锁。

p>

为什么会这样?我们这里来分析一下

吃个栗子

1、线程1获得锁,锁过期时间为1s;

2、线程1完成业务操作,耗时1.5s。此时线程1的锁已经到了过期时间自动释放,该锁已经被其他线程获取;

3、但是线程1不知道,然后释放锁。这时候会误释放其他线程的锁。

面对这种情况,其实很好处理

1、设置值唯一;

p>

2、每次删除锁,首先判断value的值是否正确。如果不一样,说明锁已经被另一个线程获取了;

看代码实现

var UnLockErr = errors.New("未解锁成功")
func unLockScript() string {
	script := `
		local value = ARGV[1] 
		local key = KEYS[1]
		local keyValue = redis.call('GET', key)
		if tostring(keyValue) == tostring(value) then
			return redis.call('DEL', key)
		else
			return 0
		end
    `
	return script
}
func (r *Redis) Unlock(ctx context.Context, key, value string) (bool, error) {
	res, err := r.Eval(ctx, unLockScript(), []string{key}, value).Result()
	if err != nil {
		return false, err
	}
	return res.(int64) != 0, nil
}

代码可以参考锁

上面这种锁最大的缺点就是只对一个节点起作用,即使Redis保证高可用,如果节点因为某种原因释放主从切换,锁会丢失:

1、在Redis节点上获得锁;

2、但是锁定的key还没有同步到从节点;

3、失败,发生故障转移,从节点升级为节点;

4、导致锁丢失。

这种情况怎么处理,我们来说说算法

使用和实现分布式锁

在Redis的分布式环境中,我们假设有N个Redis。这些节点之间是完全独立的,不存在主从复制或其他集群协调机制。我们确保使用与 Redis 单实例下相同的方法在 N 个实例上获取和释放锁。现在假设有 5 个 Redis 节点,我们需要在 5 个服务器上运行这些 Redis 实例,这样它们就不会同时全部宕机。

要获得锁,客户端应该执行以下操作:

1、获取当前的 Unix 时间,以毫秒为单位。

2、依次尝试从 5 个实例获取锁,使用相同的键和唯一值(例如 UUID)。在向 Redis 请求锁时,客户端应设置网络连接和响应超时,该超时时间应小于锁的过期时间。例如,如果你的锁在 10 秒后自动过期,那么超时应该在 5-50 毫秒之间。这样可以避免出现服务端Redis已经挂掉,客户端还在等待响应结果的情况。如果服务器在规定时间内没有响应,客户端应该尽快尝试请求另一个Redis实例获取锁;

3、客户端用当前时间减去开始获取锁的时间(步骤1记录时间)得到获取锁的时间。当且仅当从大多数(N/2+1,这里是3个节点)Redis节点获取锁,并且使用的时间小于锁过期时间,才会成功获取锁;

4、如果获取了锁,则key的实际有效时间等于有效时间减去获取锁所用的时间(第三步计算的结果);

5、如果因为某些原因,获取锁失败(至少有N/2+1个Redis实例没有获取锁或者获取锁的时间已经超过有效时间) ,客户端应该解锁所有 Redis 实例(即使某些 Redis 实例根本没有)。如果加锁不成功,会阻止部分节点获取锁,但客户端没有响应,下一段时间不能重新获取锁。

根据官方推荐,该算法在go版本中实现,这里是具体实现过程

项目地址

// LockContext locks m. In case it returns an error on failure, you may retry to acquire the lock by calling this method again.
func (m *Mutex) LockContext(ctx context.Context) error {
	if ctx == nil {
		ctx = context.Background()
	}
	value, err := m.genValueFunc()
	if err != nil {
		return err
	}
	
	for i := 0; i < m.tries; i++ {
		if i != 0 {
			select {
			case <-ctx.Done():
				// Exit early if the context is done.
				return ErrFailed
			case = m.quorum && now.Before(until) {
			m.value = value
			m.until = until
			return nil
		}
		_, err = func() (int, error) {
			ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))
			defer cancel()
			return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
				// 解锁函数
				return m.release(ctx, pool, value)
			})
		}()
		if i == m.tries-1 && err != nil {
			return err
		}
	}
	return ErrFailed
}
// 遍历所有的节点,并且在每个节点中执行传入的函数
func (m *Mutex) actOnPoolsAsync(actFn func(redis.Pool) (bool, error)) (int, error) {
	type result struct {
		Status bool
		Err    error
	}
	ch := make(chan result)
	// 执行传入的函数
	for _, pool := range m.pools {
		go func(pool redis.Pool) {
			r := result{}
			r.Status, r.Err = actFn(pool)
			ch <- r
		}(pool)
	}
	n := 0
	var err error
	// 计算执行成功的节点数目
	for range m.pools {
		r := <-ch
		if r.Status {
			n++
		} else if r.Err != nil {
			err = multierror.Append(err, r.Err)
		}
	}
	return n, err
}
// 手动解锁的lua脚本
var deleteScript = redis.NewScript(1, `
	if redis.call("GET", KEYS[1]) == ARGV[1] then
		return redis.call("DEL", KEYS[1])
	else
		return 0
	end
`)
// 手动解锁
func (m *Mutex) release(ctx context.Context, pool redis.Pool, value string) (bool, error) {
	conn, err := pool.Get(ctx)
	if err != nil {
		return false, err
	}
	defer conn.Close()
	status, err := conn.Eval(deleteScript, m.name, value)
	if err != nil {
		return false, err
	}
	return status != int64(0), nil
}

分析想法

1、遍历所有节点,然后尝试在所有节点上进行加锁操作;

2、 收集成功锁定节点的数量,如果没有达到指定数量,则释放刚刚添加的锁;

缺点请参考

怎么做

更新锁

Redis中分布式锁的另一个问题是锁的更新。当锁的到期时间到了,但业务的执行时间还没有完成,就需要更新锁了。租约已续订

续租流程

1、客户端成功锁定后,可以启动一个定时任务,每隔一段时间,检查一下业务是否完成,未完成,增加key的过期时间;

2、这里判断业务是否完成的依据是:

看续租的执行情况

// Extend resets the mutex's expiry and returns the status of expiry extension.
func (m *Mutex) Extend() (bool, error) {
	return m.ExtendContext(nil)
}
// ExtendContext resets the mutex's expiry and returns the status of expiry extension.
func (m *Mutex) ExtendContext(ctx context.Context) (bool, error) {
	start := time.Now() 
	// 尝试在所有的节点中加锁
	n, err := m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
		return m.touch(ctx, pool, m.value, int(m.expiry/time.Millisecond))
	})
	if n < m.quorum {
		return false, err
	}
	// 判断下锁的过期时间
	now := time.Now()
	until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor)))
	if now.Before(until) {
		m.until = until
		return true, nil
	}
	return false, ErrExtendFailed
}
var touchScript = redis.NewScript(1, `
	// 需要先比较下当前的value值
	if redis.call("GET", KEYS[1]) == ARGV[1] then
		return redis.call("PEXPIRE", KEYS[1], ARGV[2])
	else
		return 0
	end
`)
func (m *Mutex) touch(ctx context.Context, pool redis.Pool, value string, expiry int) (bool, error) {
	conn, err := pool.Get(ctx)
	if err != nil {
		return false, err
	}
	defer conn.Close()
	status, err := conn.Eval(touchScript, m.name, value, expiry)
	if err != nil {
		return false, err
	}
	return status != int64(0), nil
}

1、锁的更新需要客户端监控操作,启动一个定时器,在固定的时间调用更新函数来更新锁;

2、每次续租时都需要匹配当前值,因为锁可能已经被当前线程释放,当前持有者可能是另一个线程;

p>

看一下SETEX的源码

SETEX可以保证只在key不存在的情况下才设置key的值,那么我们看看源码中是如何实现的

// https://github.com/redis/redis/blob/7.0/src/t_string.c#L78
// setGenericCommand()函数是以下命令: SET, SETEX, PSETEX, SETNX.的最底层实现
void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
    ...
    found = (lookupKeyWrite(c->db,key) != NULL);
    // 这里是 SETEX 实现的重点
	// 如果nx,并且在数据库中找到了这个值就返回
	// 如果是 xx,并且在数据库中没有找到键值就会返回
	
	// 因为 Redis 中的命令执行都是单线程操作的
	// 所以命令中判断如果存在就返回,能够保证正确性,不会出现并发访问的问题
    if ((flags & OBJ_SET_NX && found) ||
        (flags & OBJ_SET_XX && !found))
    {
        if (!(flags & OBJ_SET_GET)) {
            addReply(c, abort_reply ? abort_reply : shared.null[c->resp]);
        }
        return;
    }
    ...
}

1、该命令的实现增加了key值存在的判断,保证NX只在key不存在的时候设置key的值;

2、因为Redis中总是有一个线程来处理命令的执行,所以单个命令可以保证原子性,不会出现并发问题。

为什么Redis可以用于分布式锁

分布式锁需要满足的特性

那么Redis是如何支持上述特性的呢?

1、Redis中命令的执行是单线程的。虽然在.0版本中引入了多线程来处理IO任务,但是命令的执行仍然是单线程的。 ;

2、单线程的特性可以保证命令执行不存在并发问题,也可以保证命令执行的原子性;

3、Redis提供SETNX等命令,可以保证同时只有一个请求成功执行,提供互斥保证;

4、Redis还提供了超时释放命令,可以实现锁的超时释放,避免死锁的发生;

5、高可用,在主从切换和数据丢失的情况下,Redis引入了一种算法,保证了Redis中大部分主节点正常运行,锁可以正常运行;

6、Redis本身并没有提供更新锁的操作,但是一些第三方实现实现了。 Redis中锁的更新类似于java实现的,go实现的。当然,自己实现也不难。实现过程参见上文。

一般来说,Redis 提供了对分布式锁的一些特性的支持。使用 Redis 实现分布式锁是一个不错的选择。

如何选择分布式锁

1、如果业务规模小,qps小,使用Redis,etcd实现分布式锁不会有问题。这取决于公司的基础设施。如果你有现成的Redis、etcd,可以直接使用;

2、Redis 存在一定的安全隐患。如果性要求很高,那么Redis可能不适合,etcd可能更适合;

3、如果系统qps很大,但是可以容忍一些错误,那么Redis可能更适合嘛,毕竟etcd或者后面往往都是吞吐比较低,延迟比较高。

总结

1、在分布式场景中,分布式锁的使用是我们经常遇到的场景;

2、使用Redis实现锁是一个不错的选择。 Redis中单个命令的执行是原子性的,组合命令的原子性也可以借助Lua轻松实现;

3、在分布式场景主从切换、数据同步不及时的情况下,引入redis来处理分布式锁;

4、根据描述,它很重,有安全性,但我们可以根据自己的业务场景来判断;

5、需要注意的是,在设置分布式锁的时候,需要设置值的唯一性,并且每次主动删除锁的时候,都需要匹配值的正确性避免意外删除其他线程的锁;

参考

【Redis核心技术与实战】

【Redis设计与实现】

【Redis学习笔记】

[Redis 分布式锁]

[怎么做]

[etcd实现]分布式锁]#分布式锁

[Redis中的原子操作(3)-使用Redis实现分布式锁]原子操作(3)-使用Redis实现分布式)锁/

© 版权声明
THE END
喜欢就支持一下吧
点赞50赞赏 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称表情代码图片

    暂无评论内容