一文了解分布式锁

概述

分布式锁的CAP理论

在一个分布式系统,最多只能满足一下三项中的两项

  • 一致性(Consistency):在分布式系统中的所有数据备份,在同一时刻是否同样的值(等同于所有节点访问同一份最新的数据副本)
  • 可用性(Availability):保证每个请求不管成功或者失败都有响应
  • 分区容错性(Partition tolerance):系统中任意信息的丢失或失败不会影响系统的继续运作

分布式锁的产生

当在分布式模型下,数据只有一份(或有限制),此时需要利用锁技术来控制某一时刻修改数据的进程数。这种锁即为分布式锁。

为了保证一个方法或属性在高并发情况下的同一时间只能被同一个线程执行,在传统单体应用单机部署的情况下,可以使用并发处理相关的功能进行互斥控制。但是,随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的应用并不能提供分布式锁的能力。为了解决这个问题就需要一种跨机器的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!

分布式锁应该具备的条件

  • 互斥性:在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行;
  • 高可用的获取锁与释放锁;
  • 高性能的获取锁与释放锁;
  • 可重入性:具备可重入特性,具备锁失效机制,防止死锁,即就算一个客户端持有锁的期间崩溃而没
  • 有主动释放锁,也需要保证后续其他客户端能够加锁成功
  • 非阻塞:具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败

分布式锁的应用场景

  • 商品秒杀
  • 抢优惠券

实现方式

image

基于数据库

悲观锁

创建资源锁表

 CREATE TABLE `resource_lock` (
`id` int(4) NOT NULL AUTO_INCREMENT COMMENT '主键',
`resource_name` varchar(64) NOT NULL DEFAULT '' COMMENT '锁定的资源名',
`owner` varchar(64) NOT NULL DEFAULT '' COMMENT '锁拥有者',
`desc` varchar(1024) NOT NULL DEFAULT '备注信息',
`update_time` timestamp NOT NULL DEFAULT '' COMMENT '保存数据时间,自动生成',
PRIMARY KEY (`id`),
UNIQUE KEY `uidx_resource_name` (`resource_name `) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='锁定中的资源';

resource_name必须有唯一索引

使用事务查询更新

 @Transaction
public void lock(String name) {
ResourceLock rlock = exeSql("select * from resource_lock where resource_name = name for update");
if (rlock == null) {
exeSql("insert into resource_lock(reosurce_name,owner,count) values (name, 'ip',0)");
}
}

使用 for update 锁定的资源。如果执行成功,会立即返回,执行插入数据库,后续再执行一些其他业务逻辑,直到事务提交,执行结束;如果执行失败,就会一直阻塞着。

可以在数据库客户端工具上测试出来这个效果,当在一个终端执行了 for update,不提交事务。在另外的终端上执行相同条件的 for update,会一直卡着

虽然也能实现分布式锁的效果,但是会存在性能瓶颈。

优点:

简单易用,好理解,保障数据强一致性。

缺点:

1)在 RR 事务级别,select 的 for update 操作是基于间隙锁(gap lock) 实现的,是一种悲观锁的实现方式,所以存在阻塞问题

2)高并发情况下,大量请求进来,会导致大部分请求进行排队,影响数据库稳定性,也会耗费服务的CPU等资源

当获得锁的客户端等待时间过长时,会提示:

[40001][1205] Lock wait timeout exceeded; try restarting transaction
1

高并发情况下,也会造成占用过多的应用线程,导致业务无法正常响应。

3)如果优先获得锁的线程因为某些原因,一直没有释放掉锁,可能会导致死锁的发生。

4)锁的长时间不释放,会一直占用数据库连接,可能会将数据库连接池撑爆,影响其他服务。

5)MySql数据库会做查询优化,即便使用了索引,优化时发现全表扫效率更高,则可能会将行锁升级为表锁,此时可能就更悲剧了。

6)不支持可重入特性,并且超时等待时间是全局的,不能随便改动。

乐观锁

基于CAS,添加时间戳或版本号

update xx set version=new_version where xx=yy and version=Old_version,通过增加递增的版本号字段实现乐观锁。

不具有互斥性,不会产生锁等待而消耗资源,操作过程中认为不存在并发冲突,只有update version失败后才能觉察到。

抢购、秒杀就是用了这种实现以防止超卖。

创建资源锁表

CREATE TABLE `resource` (
`id` int(4) NOT NULL AUTO_INCREMENT COMMENT '主键',
`resource_name` varchar(64) NOT NULL DEFAULT '' COMMENT '资源名',
`share` varchar(64) NOT NULL DEFAULT '' COMMENT '状态',
`version` int(4) NOT NULL DEFAULT '' COMMENT '版本号',
`desc` varchar(1024) NOT NULL DEFAULT '备注信息',
`update_time` timestamp NOT NULL DEFAULT '' COMMENT '保存数据时间,自动生成',
PRIMARY KEY (`id`),
UNIQUE KEY `uidx_resource_name` (`resource_name `) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='资源';

添加时间戳伪代码实现

 Resrouce resource = exeSql("select * from resource where resource_name = xxx");
boolean succ = exeSql("update resource set version= 'newVersion' ... where resource_name = xxx and version = 'oldVersion'");

if (!succ) {
    // 发起重试
}

优点:

  • 实现简单,复杂度低
  • 保障数据一致性

缺点:

  • 性能低,并且有锁表的风险
  • 可靠性差
  • 非阻塞操作失败后,需要轮询,占用CPU资源
  • 长时间不commit或者是长时间轮询,可能会占用较多的连接资源

基于Zookeeper实现

实现思想

  1. 创建一个目录mylock
  2. 线程A获取锁就在mylock里创建一个顺序临时节点
  3. 查看所有子节点,查看是否有比自己小的兄弟节点,如果没有,就证明自己是最小的线程,
    可以获取锁
  4. 线程B获取所有子节点,判断自己是不是最小的,监听比自己次小的子节点
  5. 线程A处理完,B监听到A结束,判断自己是不是最小的,是获取锁

image

实现使用直接使用Apache的开源库Curator,它是一个ZooKeeper客户端,Curator提供的InterProcessMutex是分布式锁的实现,acquire方法用于获取锁,release方法用于释放锁。

 InterProcessMutex interProcessMutex = new InterProcessMutex(client,"/anyLock"); 
interProcessMutex.acquire();
interProcessMutex.release();

分布式锁核心源码

 private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception 
{
boolean haveTheLock = false;
boolean doDelete = false;
try {
if ( revocable.get() != null ) {
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}

while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ) {
// 获取当前所有节点排序后的集合
List<String> children = getSortedChildren();
// 获取当前节点的名称
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
// 判断当前节点是否是最小的节点
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() ) {
// 获取到锁
haveTheLock = true;
} else {
// 没获取到锁,对当前节点的上一个节点注册一个监听器
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this){
Stat stat = client.checkExists().usingWatcher(watcher).forPath(previousSequencePath);
if ( stat != null ){
if ( millisToWait != null ){
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 ){
doDelete = true; // timed out - delete our node
break;
}
wait(millisToWait);
}else{
wait();
}
}
}
// else it may have been deleted (i.e. lock released). Try to acquire again
}
}
}
catch ( Exception e ) {
doDelete = true;
throw e;
} finally{
if ( doDelete ){
deleteOurPath(ourPath);
}
}
return haveTheLock;
}

Curator实现原理

image

优点:

  • 可靠性非常高
  • 性能较好
  • CAP模型属于CP,基于ZAB一致性算法实现

缺点:

  • 性能并不如Redis(主要原因是在写操作,即获取锁释放锁都需要在Leader上执行,然后同步到follower)
  • 实现复杂度高

基于Redis

实现思想

主要是基于命令:SETNX key value

命令官方文档:https://redis.io/commands/setnx

用法可参考:Redis命令参考

image

具体步骤

  1. 获取锁的时候,使用sentx加锁,并用expire命令设置超时时间,value设为一个随机生成的uuid,释放锁的时候检查是否一致
  2. 获取锁的时候,设置一个获取锁的最大时间,超过这个时间获取锁失败
  3. 释放锁的时候通过uuid判断是不是该锁,若是该锁,执行delete释放

优点:

  • 性能非常高
  • 可靠性较高
  • CAP模型属于AP

缺点:

  • 复杂度较高
  • 无一致性算法,可靠性并不如Zookeeper
  • 锁删除失败 过期时间不好控制
  • 非阻塞,获取失败后,需要轮询不断尝试获取锁,比较消耗性能,占用cpu资源

分布式锁对比


Redis分布式锁实现

以减库存例子为例,访问接口时自动减接口库存

方案一

@Service
public class RedisLockDemo {

    @Autowired
    private StringRedisTemplate redisTemplate;

    public String deduceStock() {
        ValueOperations<String, String> valueOperations = redisTemplate.opsForValue();

        //获取redis中的库存
        int stock = Integer.valueOf(valueOperations.get("stock"));
        if (stock > 0) {
            int newStock = stock - 1;
            valueOperations.set("stock", newStock + "");
            System.out.println("扣减库存成功, 剩余库存:" + newStock);
        } else {
            System.out.println("库存已经为0,不能继续扣减");
        }

        return "success";
    }
}
 
  • 先从Redis中读取stock的值,表示商品的库存
  • 判断商品库存是否大于0,如果大于0,则库存减1,然后再保存到Redis里面去,否则就报错

改进:

上述代码没有加锁,会导致商品超卖,在同一时刻多个进程都对库存进行操作,最后结果上只会执行一次减库存

@Service
public class RedisLockDemo {

    @Autowired
    private StringRedisTemplate redisTemplate;

    public String deduceStock() {
        ValueOperations<String, String> valueOperations = redisTemplate.opsForValue();

        synchronized (this) {
            //获取redis中的库存
            int stock = Integer.valueOf(valueOperations.get("stock"));
            if (stock > 0) {
                int newStock = stock - 1;
                valueOperations.set("stock", newStock + "");
                System.out.println("扣减库存成功, 剩余库存:" + newStock);
            } else {
                System.out.println("库存已经为0,不能继续扣减");
            }
        }

        return "success";
    }
}

再改进:

经过上述改进后,在单体环境下运行没有问题了,但是当不同主机来访问时,仍会发生超卖问题,

所以就需要分布式锁


方案二

分布式锁简单实现

image

代码实现:

@Service
public class RedisLockDemo {
    @Autowired
    private StringRedisTemplate redisTemplate;

    public String deduceStock() {
        ValueOperations<String, String> valueOperations = redisTemplate.opsForValue();
        String lockKey = "product_001";

        //加锁: setnx
        Boolean isSuccess = valueOperations.setIfAbsent(lockKey, "1");
        if(null == isSuccess || isSuccess) {
            System.out.println("服务器繁忙, 请稍后重试");
            return "error";
        }

        //------ 执行业务逻辑 ----start------
        int stock = Integer.valueOf(valueOperations.get("stock"));
        if (stock > 0) {
            int newStock = stock - 1;
            //执行业务操作减库存
            valueOperations.set("stock", newStock + "");
            System.out.println("扣减库存成功, 剩余库存:" + newStock);
        } else {
            System.out.println("库存已经为0,不能继续扣减");
        }
        //------ 执行业务逻辑 ----end------

        //释放锁
        redisTemplate.delete(lockKey);
        return "success";
    }
}
 

其实就是对每一个商品加一把锁,代码里面是product_001

  • 使用setnx对商品进行加锁
  • 如成功说明加锁成功,如失败说明有其他请求抢占了该商品的锁,则当前请求失败退出
  • 加锁成功之后进行扣减库存操作
  • 删除商品锁

改进1:

加锁成功之后,扣减库存的逻辑可能抛异常了,即并不会执行到释放锁的逻辑,那么该商品锁是一直没有释放,会成为死锁的,其他请求完全无法扣减该商品的

使用try…catch…finally防止死锁问题

 @Service
public class RedisLockDemo {
    @Autowired
    private StringRedisTemplate redisTemplate;

    public String deduceStock() {
        ValueOperations<String, String> valueOperations = redisTemplate.opsForValue();
        String lockKey = "product_001";

        try {
            //加锁: setnx
            Boolean isSuccess = valueOperations.setIfAbsent(lockKey, "1");
            if(null == isSuccess || isSuccess) {
                System.out.println("服务器繁忙, 请稍后重试");
                return "error";
            }

            //------ 执行业务逻辑 ----start------
            int stock = Integer.valueOf(valueOperations.get("stock"));
            if (stock > 0) {
                int newStock = stock - 1;
                //执行业务操作减库存
                valueOperations.set("stock", newStock + "");
                System.out.println("扣减库存成功, 剩余库存:" + newStock);
            } else {
                System.out.println("库存已经为0,不能继续扣减");
            }
            //------ 执行业务逻辑 ----end------
        } finally {
            //释放锁
            redisTemplate.delete(lockKey);
        }

        return "success";
    }
}

改进2:

除了抛异常之外,比如程序崩溃、服务器宕机、服务器重启、请求超时被终止、发布、人为kill等都有可能导致释放锁的逻辑没有执行,比如对商品加分布式锁成功之后,在扣减库存的时候服务器正在执行重启,会导致没有执行释放锁。

可以通过对锁设置超时时间来防止死锁的发生,使用Redis的expire命令可以对key进行设置超时时间

image

代码实现:

 @Service
public class RedisLockDemo {
    @Autowired
    private StringRedisTemplate redisTemplate;

    public String deduceStock() {
        ValueOperations<String, String> valueOperations = redisTemplate.opsForValue();
        String lockKey = "product_001";

        try {
            //加锁: setnx
            Boolean isSuccess = valueOperations.setIfAbsent(lockKey, "1");
            //expire增加超时时间
            redisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
            if(null == isSuccess || isSuccess) {
                System.out.println("服务器繁忙, 请稍后重试");
                return "error";
            }

            //------ 执行业务逻辑 ----start------
            int stock = Integer.valueOf(valueOperations.get("stock"));
            if (stock > 0) {
                int newStock = stock - 1;
                //执行业务操作减库存
                valueOperations.set("stock", newStock + "");
                System.out.println("扣减库存成功, 剩余库存:" + newStock);
            } else {
                System.out.println("库存已经为0,不能继续扣减");
            }
            //------ 执行业务逻辑 ----end------
        } finally {
            //释放锁
            redisTemplate.delete(lockKey);
        }

        return "success";
    }
}

改进3:
设置超时时间不是原子操作,那么仍然无法避免死锁问题

image

对此,有两种做法:

lua脚本
set原生命令(Redis 2.6.12版本及以上)

一般是推荐使用set命令,Redis官方在2.6.12版本对set命令增加了NX、EX、PX等参数,即可以将上面的加锁和设置时间放到一条命令上执行,通过set命令即可:

命令官方文档:https://redis.io/commands/set

用法可参考:Redis命令参考

image

代码实现:

@Service
public class RedisLockDemo {
    @Autowired
    private StringRedisTemplate redisTemplate;

    public String deduceStock() {
        ValueOperations<String, String> valueOperations = redisTemplate.opsForValue();
        String lockKey = "product_001";

        try {
            //加锁: setnx 和 expire增加超时时间
            Boolean isSuccess = valueOperations.setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);
            if(null == isSuccess || isSuccess) {
                System.out.println("服务器繁忙, 请稍后重试");
                return "error";
            }

            //------ 执行业务逻辑 ----start------
            int stock = Integer.valueOf(valueOperations.get("stock"));
            if (stock > 0) {
                int newStock = stock - 1;
                //执行业务操作减库存
                valueOperations.set("stock", newStock + "");
                System.out.println("扣减库存成功, 剩余库存:" + newStock);
            } else {
                System.out.println("库存已经为0,不能继续扣减");
            }
            //------ 执行业务逻辑 ----end------
        } finally {
            //释放锁
            redisTemplate.delete(lockKey);
        }

        return "success";
    }
}
 

改进4:

在高并发场景下,超时时间设置不合理问题

image

流程:

  • 进程A加锁之后,扣减库存的时间超过设置的超时时间,这里设置的锁是10秒
  • 在第10秒的时候由于时间到期了所以进程A设置的锁被Redis释放了(T5)
  • 刚好进程B请求进来了,加锁成功(T6)
  • 进程A操作完成(扣减库存)之后,把进程B设置的锁给释放了
  • 刚好进程C请求进来了,加锁成功
  • 进程B操作完成之后,也把进程C设置的锁给释放了
  • 以此类推…

解决方法:

  • 加锁的时候,把值设置为唯一值,比如说UUID这种随机数
  • 释放锁的时候,获取锁的值判断value是不是当前进程设置的唯一值,如果是再去删除

image

代码实现:

@Service
public class RedisLockDemo {
    @Autowired
    private StringRedisTemplate redisTemplate;

    public String deduceStock() {
        ValueOperations<String, String> valueOperations = redisTemplate.opsForValue();
        String lockKey = "product_001";
        String clientId = UUID.randomUUID().toString();

        try {
            //加锁: setnx 和 expire增加超时时间
            Boolean isSuccess = valueOperations.setIfAbsent(lockKey, clientId, 10, TimeUnit.SECONDS);
            if(null == isSuccess || isSuccess) {
                System.out.println("服务器繁忙, 请稍后重试");
                return "error";
            }

            //------ 执行业务逻辑 ----start------
            int stock = Integer.valueOf(valueOperations.get("stock"));
            if (stock > 0) {
                int newStock = stock - 1;
                //执行业务操作减库存
                valueOperations.set("stock", newStock + "");
                System.out.println("扣减库存成功, 剩余库存:" + newStock);
            } else {
                System.out.println("库存已经为0,不能继续扣减");
            }
            //------ 执行业务逻辑 ----end------
        } finally {
            if (clientId.equals(valueOperations.get(lockKey))) {
                //释放锁
                redisTemplate.delete(lockKey);
            }
        }

        return "success";
    }
}

改进5:

上述方法还有问题,那就是del和get并非原子操作

那么删除锁的正确姿势是使用lua脚本,通过redis的eval/evalsha命令来运行:

-- lua删除锁:
-- KEYS和ARGV分别是以集合方式传入的参数,对应上文的Test和uuid。
-- 如果对应的value等于传入的uuid。
if redis.call('get', KEYS[1]) == ARGV[1] 
    then 
	-- 执行删除操作
        return redis.call('del', KEYS[1]) 
    else 
	-- 不成功,返回0
        return 0 
end 

到此,基本上Redis的分布式锁的实现思想如下:

  • 获取锁的时候,使用setnx加锁,并使用expire命令为锁添加一个超时时间,超过该时间则自动释放锁,锁的value值为一个随机生成的UUID,通过此在释放锁的时候进行判断。
  • 获取锁的时候还设置一个获取的超时时间,若超过这个时间则放弃获取锁。
  • 释放锁的时候,通过UUID判断是不是该锁,若是该锁,则执行delete进行锁释放。

改进6:

上述方案解决了删除错锁的问题,但设置超时时间仍然是个棘手的问题

目前大公司的一个方案是这样子的:

  • 在加锁成功之后,启动一个守护线程
  • 守护线程每隔1/3的锁的超时时间就去延迟锁的超时时间,比如说锁设置为30秒,那就是每隔10秒就去延长锁的超时时间,重新设置为30秒
  • 业务代码执行完成,关闭守护线程

在实际操作中,需要注意几点:

  • 只续对的:和释放锁一样,需要判断锁的对象有没有发生变化,否则会造成无论谁加锁,守护线程都会重新设置锁的超时时间
  • 不能动不动就续:守护线程要在合理的时间再去设置锁的超时时间,否则会造成资源的浪费
  • 及时销毁:如果加锁的线程/进程已经处理完业务了,那么守护进程应该被销毁,否则会造成资源的浪费

方案三

上面的方案还得考虑Redis的部署问题。

众所周知,Redis有3种部署方式:

单机模式
Master-Slave + Sentinel(哨兵)选举模式
Redis Cluster(集群)模式

使用 Redis 做分布式锁的缺点在于:如果采用单机部署模式,会存在单点问题,只要 Redis 故障了。加锁就不行了。

采用 Master-Slave 模式/集群模式,

 线程1加了锁去执行业务了
刚好Redis的 master 发生故障挂掉了,此时还没有将数据同步到 slave 上
集群会选举一个新的 master 出来,但是新的 master 上并没有这个锁
线程2可以在新选举产生的 master 上去加锁,然后处理业务

这样的话,就导致了两个线程同时持有了锁,锁就不再具有安全性。

针对这个问题,有两个解决方案:

  • RedLock
  • Zookeeper【推荐】

RedLock

基于以上的考虑,Redis的作者提出了一个RedLock的算法。

这个算法的意思大概是这样的:假设 Redis 的部署模式是 Redis Cluster,总共有 5 个 Master 节点。

通过以下步骤获取一把锁:

  • 获取当前时间戳,单位是毫秒。
  • 轮流尝试在每个 Master 节点上创建锁,过期时间设置较短,一般就几十毫秒。
  • 尝试在大多数节点上建立一个锁,比如 5 个节点就要求是 3 个节点(n / 2 +1)。
  • 客户端计算建立好锁的时间,如果建立锁的时间小于超时时间,就算建立成功了。
  • 要是锁建立失败了,那么就依次删除这个锁。
  • 只要别人建立了一把分布式锁,你就得不断轮询去尝试获取锁。

image


Redission

目前业界对于Redis的分布式锁有了现成的实现方案了,比较出名的是Redisson开源框架。

Redisson 是 Redis 的 Java 实现的客户端,其 API 提供了比较全面的 Redis 命令的支持。

Redission 通过 Netty 支持非阻塞 I/O。

Redisson 封装了锁的实现,让我们像操作我们的本地 Lock一样来使用,除此之外还有对集合、对象、常用缓存框架等做了友好的封装,易于使用。

除此之外,Redisson还实现了分布式锁的自动续期机制、锁的互斥自等待机制、锁的可重入加锁于释放锁的机制,可以说Redisson对分布式锁的实现是实现了一整套机制的。

Redisson 可以便捷的支持多种Redis部署架构:

  • 单机模式
  • Master-Slave + Sentinel(哨兵)选举模式
  • Redis Cluster(集群)模式

引入Redission之后,使用上非常简单,RedissonClient客户端提供了众多的接口实现,支持可重入锁、公平锁、读写锁、锁超时、RedLock等都提供了完整实现。

使用如下:

引入maven

 
    
    org.redisson
    redisson-spring-boot-starter
    3.13.4

增加配置文件

@Configuration
public class RedissonConfig {
 
    @Bean
    public Redisson redisson() {
        Config config = new Config();
        //单机版
        //config.useSingleServer().setAddress("redis://192.168.1.1:8001").setDatabase(0);
 
        //集群版
        config.useClusterServers()
                .addNodeAddress("redis://192.168.1.1:8001")
                .addNodeAddress("redis://192.168.1.1:8002")
                .addNodeAddress("redis://192.168.1.2:8001")
                .addNodeAddress("redis://192.168.1.2:8002")
                .addNodeAddress("redis://192.168.1.3:8001")
                .addNodeAddress("redis://192.168.1.3:8002");
        return (Redisson) Redisson.create(config);
    }
} 

分布式锁实现

 @Service
public class RedisLockDemo {
    @Autowired
    private StringRedisTemplate redisTemplate;
 
    @Autowired
    private Redisson redisson;
 
    public String deduceStock() {
        String lockKey = "lockKey";
        RLock redissonLock = redisson.getLock(lockKey);
 
        try {
            //加锁(超时默认30s), 实现锁续命的功能(后台启动一个timer, 默认每10s检测一次是否持有锁)
            redissonLock.lock();
 
            //------ 执行业务逻辑 ----start------
            int stock = Integer.valueOf(redisTemplate.opsForValue().get("stock"));
            if (stock > 0) {
                int newStock = stock - 1;
                //执行业务操作减库存
                redisTemplate.opsForValue().set("stock", newStock + "");
                System.out.println("扣减库存成功, 剩余库存:" + newStock);
            } else {
                System.out.println("库存已经为0,不能继续扣减");
            }
            //------ 执行业务逻辑 ----end------
        } finally {
            //解锁
            redissonLock.unlock();
        }
        return "success";
    }
}

实现原理

image

image

image


分段锁

怎么在高并发的场景去实现一个高性能的分布式锁呢?

电商网站在大促的时候并发量很大:

(1)若抢购不是同一个商品,则可以增加Redis集群的cluster来实现,因为不是同一个商品,所以通过计算 key 的hash会落到不同的 cluster上;

(2)若抢购的是同一个商品,则计算key的hash值会落同一个cluster上,所以加机器也是没有用的。

针对第二个问题,可以使用库存分段锁的方式去实现。

分段锁

假如产品1有200个库存,可以将这200个库存分为10个段存储(每段20个),每段存储到一个cluster上;将key使用hash计算,使这些key最后落在不同的cluster上。

每个下单请求锁了一个库存分段,然后在业务逻辑里面,就对数据库或者是Redis中的那个分段库存进行操作即可,包括查库存 -> 判断库存是否充足 -> 扣减库存。

具体可以参照 ConcurrentHashMap 的源码去实现,它使用的就是分段锁。

高性能分布式锁具体可参考链接:每秒上千订单场景下的分布式锁高并发优化实践!【石杉的架构笔记

image

总结

  • 追求数据可靠性/强一致性:使用Zookeeper
  • 追求性能:选择Redis,推荐Redisson
  • Redis分布式锁目前最大问题在于:主从模式下/集群模式下,master节点宕机,异步同步数据导致锁丢失问题
  • Redis的RedLock算法具有很大争议性,一般不推荐使用

python代码实现(摘自jiandanokok)

import redis
import uuid
import time


class LockService:
    """
    基于Redis实现的分布式锁
    """
    host = 'localhost'
    port = 6379
    password = ''
    db = 1

    def __init__(self, conn=None):
        """
        如果不传连接池的话,默认读取配置的Redis作为连接池
        :param conn:
        """
        self.conn = conn if conn else self.get_redis_client()

    def get_redis_client(self):
        """
        获取Redis连接
        :return:
        """
        return redis.Redis(
            host=self.host,
            port=self.port,
            password=self.password,
            db=self.db
        )

    def acquire_lock(self, lock_name, acquire_timeout=10, expire_time=30):
        """
        加锁/获取锁

        如果不存在lock_name,则加锁,并且设置过期时间,避免死锁
        如果存在lock_name,则刷新过期时间

        :param lock_name:       锁的名称
        :param acquire_timeout: 加锁/获取锁的超时时间,默认10秒
        :param expire_time:     锁的超时时间,默认30秒
        :return:
        """
        lockname = f'lock:{lock_name}'
        value = str(uuid.uuid4())
        end_time = time.time() + acquire_timeout
        while time.time() < end_time:
            # 如果不存在这个锁则加锁并设置过期时间,避免死锁
            if self.conn.set(lockname, value, ex=expire_time, nx=True):
                return value
            time.sleep(0.1)
        return False

    def release_lock(self, lock_name, value):
        """
        释放锁

        :param lock_name: 锁的名称
        :param value:     锁的值
        :return:
        """
        unlock_script = """
        if redis.call("get",KEYS[1]) == ARGV[1] then
            return redis.call("del",KEYS[1])
        else
            return 0
        end
        """
        lockname = f'lock:{lock_name}'
        unlock = self.conn.register_script(unlock_script)
        result = unlock(keys=[lockname], args=[value])
        if result:
            return True
        else:
            return False
 


Redission分布式锁实现原理

可重入锁原理

通过value值记录重入次数,当重新获取锁时,value+1,释放锁的时候只有当value值为0才真正释放锁

image

可重试原理

redission获取锁时,会设置一个等待时间,在等待时间内,我们尝试获取锁,

第一次获取锁,如果能够拿到,那直接结束,如果拿不到,就要看尝试了获取锁了这么长时间,我们能够接收的等待时间还有没有了?如果还有,我们就尝试订阅这个锁的释放消息,然后一直等待到达到我们的忍耐限度,即我们最长能等的时间,如果一直没收到消息,那就获取锁失败,如果收到了,我们就再次尝试获取锁

看门狗机制

在获取锁成功之后,就会调用 scheduleExpirationRenewal(threadId) 方法开启自动续约,具体是由在 map 中添加业务名称和任务定时器,这个定时器会在一定时间内执行,比如说 10 秒就会自动开启任务,而该定时器中的任务就是不断的重置锁的最大超时时间,使用递归,不断的调用重置锁的时间,这就保证了锁是永久被当前线程持有。

这样就可以保证执行业务之后,才会释放锁。释放锁之后,会取消定时任务。

 

Redission MultiLock解决主从一致问题

先搞清楚什么是主从一致性问题,在集群的 Redis 中会区分出主力机和一般机器,在写 Redis 命令会放到主力机中运行,而主力机和一般机器需要保证数据都是一样的,也就是主从同步数据,在主力机中执行写命令时,突然发生宕机,未来得及将数据同步到其他一般机器中,而且当主力机宕机之后,会选出一台一般机器充当主力机,这时候的主力机没有同步之前的数据,那么其他线程再来写命名的时候就会出现问题了,这出现了主从不一致性。

在多主架构中,每台主机都可以接收写请求,这样即使某一台主机宕机,其他主机仍然可以继续处理写请求。

当某一台主机宕机后,如果在它恢复之前有新的写操作发生,可能会导致数据不一致。通过比较不同主机的数据状态,可以很容易地发现这些不一致的问题。

当宕机的主机恢复后,可以通过与其他主机的数据进行比较,找出差异并进行数据同步,确保所有主机的数据一致。

简单来说,设置多台主力机,每一次写命令都是一式多份,当某一台主力机出现宕机了,主从未来得及同步时,再写命令,同样一式多份,这样充当主力机出现了跟其他主力机不同的结果时,就很容易的发现问题了。

通过设置多台主力机并进行写操作的多份复制,可以有效提高系统的可靠性,并在出现问题时快速发现和解决数据不一致的问题。

image

暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇