最强分布式工具Redisson
什么是Redisson?
一个基于Redis实现的分布式工具,有基本分布式对象和高级又抽象的分布式服务,为每个试图再造分布式轮子的程序员带来了大部分分布式问题的解决办法。
Redisson和Jedis、Lettuce有什么区别?
Redisson和它俩的区别就像一个用鼠标操作图形化界面,一个用命令行操作文件。Redisson是更高层的抽象,Jedis和Lettuce是Redis命令的封装。
Jedis是Redis官方推出的用于通过Java连接Redis客户端的一个工具包,提供了Redis的各种命令支持
Lettuce是一种可扩展的线程安全的 Redis 客户端,通讯框架基于Netty,支持高级的 Redis 特性,比如哨兵,集群,管道,自动重新连接和Redis数据模型。 Spring Boot 2.x 开始 Lettuce 已取代 Jedis 成为首选 Redis 的客户端。
Redisson是架设在Redis基础上,通讯基于Netty的综合的、新型的中间件,企业级开发中使用Redis的最佳范本
Jedis把Redis命令封装好,Lettuce则进一步有了更丰富的Api,也支持集群等模式。但是两者也都点到为止,只给了你操作Redis数据库的脚手架,而Redisson则是基于Redis、Lua和Netty建立起了成熟的分布式解决方案,甚至redis官方都推荐的一种工具集。
分布式锁
分布式锁怎么实现
分布式锁是并发业务下的刚需,虽然实现五花八门:ZooKeeper有Znode顺序节点,数据库有表级锁和乐/悲观锁,Redis有setNx,但是殊途同归,最终还是要回到互斥上来,本篇介绍Redisson,那就以redis为例。
怎么写一个简单的Redis分布式锁?
以Spring Data Redis为例,用RedisTemplate来操作Redis(setIfAbsent已经是setNx + expire的合并命令),如下
| public Boolean tryLock(String key, String value, long timeout, TimeUnit unit) { return redisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit); }
public void unlock(String lockName, String uuid) { if(uuid.equals(redisTemplate.opsForValue().get(lockName)){ redisTemplate.opsForValue().del(lockName); } }
if(tryLock){ }finally{ unlock; } Copy
|
简单1.0版本完成,聪明的小张一眼看出,这是锁没错,但get和del操作非原子性,并发一旦大了,无法保证进程安全。于是小张提议,用Lua脚本
Lua脚本是什么?
Lua脚本是redis已经内置的一种轻量小巧语言,其执行是通过redis的eval/evalsha命令来运行,把操作封装成一个Lua脚本,如论如何都是一次执行的原子操作。
于是2.0版本通过Lua脚本删除
lockDel.lua如下
1 2 3 4 5 6 7 8 9
| if redis.call('get', KEYS[1]) == ARGV[1] then -- 执行删除操作 return redis.call('del', KEYS[1]) else -- 不成功,返回0 return 0 end Copy
|
delete操作时执行Lua命令
1 2 3 4 5 6 7
| DefaultRedisScript<Object> unlockScript = new DefaultRedisScript(); unlockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lockDel.lua")));
redisTemplate.execute(unlockScript, Collections.singletonList(keyName), value); Copy
|
2.0似乎更像一把锁,但好像又缺少了什么,小张一拍脑袋,synchronized和ReentrantLock都很丝滑,因为他们都是可重入锁,一个线程多次拿锁也不会死锁,我们需要可重入。
怎么保证可重入?
重入就是,同一个线程多次获取同一把锁是允许的,不会造成死锁,这一点synchronized偏向锁提供了很好的思路,synchronized的实现重入是在JVM层面,JAVA对象头MARK WORD中便藏有线程ID和计数器来对当前线程做重入判断,避免每次CAS。
当一个线程访问同步块并获取锁时,会在对象头和栈帧中的锁记录里存储偏向的线程ID,以后该线程在进入和退出同步块时不需要进行CAS操作来加锁和解锁,只需简单测试一下对象头的Mark Word里是否存储着指向当前线程的偏向锁。如果测试成功,表示线程已经获得了锁。如果测试失败,则需要再测试一下Mark Word中偏向锁标志是否设置成1:没有则CAS竞争;设置了,则CAS将对象头偏向锁指向当前线程。
再维护一个计数器,同个线程进入则自增1,离开再减1,直到为0才能释放
可重入锁
仿造该方案,我们需改造Lua脚本:
1.需要存储 锁名称lockName、获得该锁的线程id和对应线程的进入次数count
2.加锁
每次线程获取锁时,判断是否已存在该锁
3.解锁
每次线程来解锁时,判断是否已存在该锁
- 存在
- 是否有该线程的id的hash key,有则减1,无则返回解锁失败
- 减1后,判断剩余count是否为0,为0则说明不再需要这把锁,执行del命令删除
解锁的判断,当一把锁不再被需要了,每次解锁一次,count减1,直到为0时,执行删除
加锁 lock.lua
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| local key = KEYS[1]; local threadId = ARGV[1]; local releaseTime = ARGV[2];
-- lockname不存在 if(redis.call('exists', key) == 0) then redis.call('hset', key, threadId, '1'); redis.call('expire', key, releaseTime); return 1; end;
-- 当前线程已id存在 if(redis.call('hexists', key, threadId) == 1) then redis.call('hincrby', key, threadId, '1'); redis.call('expire', key, releaseTime); return 1; end; return 0; Copy
|
解锁 unlock.lua
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| local key = KEYS[1]; local threadId = ARGV[1];
-- lockname、threadId不存在 if (redis.call('hexists', key, threadId) == 0) then return nil; end;
-- 计数器-1 local count = redis.call('hincrby', key, threadId, -1);
-- 删除lock if (count == 0) then redis.call('del', key); return nil; end; Copy
|
代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
|
@Getter @Setter public class RedisLock {
private RedisTemplate redisTemplate; private DefaultRedisScript<Long> lockScript; private DefaultRedisScript<Object> unlockScript;
public RedisLock(RedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; lockScript = new DefaultRedisScript<>(); this.lockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lock.lua"))); this.lockScript.setResultType(Long.class); unlockScript = new DefaultRedisScript<>(); this.unlockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("unlock.lua"))); }
public String tryLock(String lockName, long releaseTime) { String key = UUID.randomUUID().toString();
Long result = (Long) redisTemplate.execute( lockScript, Collections.singletonList(lockName), key + Thread.currentThread().getId(), releaseTime);
if (result != null && result.intValue() == 1) { return key; } else { return null; } }
public void unlock(String lockName, String key) { redisTemplate.execute(unlockScript, Collections.singletonList(lockName), key + Thread.currentThread().getId() ); } } Copy
|
严谨的小张觉得虽然当个普通互斥锁,已经稳稳够用,可是业务里总是又很多特殊情况的,比如A进程在获取到锁的时候,因业务操作时间太长,锁释放了但是业务还在执行,而此刻B进程又可以正常拿到锁做业务操作,两个进程操作就会存在依旧有共享资源的问题。
而且如果负责储存这个分布式锁的Redis节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。
所以我们希望在这种情况时,可以延长锁的releaseTime延迟释放锁来直到完成业务期望结果,这种不断延长锁过期时间来保证业务执行完成的操作就是锁续约。
Redisson分布式锁
依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.13.6</version> </dependency>
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.13.6</version> </dependency> Copy
|
配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @Configuration public class RedissionConfig { @Value("${spring.redis.host}") private String redisHost;
@Value("${spring.redis.password}") private String password;
private int port = 6379;
@Bean public RedissonClient getRedisson() { Config config = new Config(); config.useSingleServer(). setAddress("redis://" + redisHost + ":" + port). setPassword(password); config.setCodec(new JsonJacksonCodec()); return Redisson.create(config); } } Copy
|
启用分布式锁
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Resource private RedissonClient redissonClient;
RLock rLock = redissonClient.getLock(lockName); try { boolean isLocked = rLock.tryLock(expireTime, TimeUnit.MILLISECONDS); if (isLocked) { } } catch (Exception e) { rLock.unlock(); } Copy
|
RLock
RLock是Redisson分布式锁的最核心接口,继承了concurrent包的Lock接口和自己的RLockAsync接口,RLockAsync的返回值都是RFuture,是Redisson执行异步实现的核心逻辑,也是Netty发挥的主要阵地。
RLock如何加锁?
从RLock进入,找到RedissonLock类,找到tryLock方法再递进到干事的tryAcquireOnceAsync方法,这是加锁的主要代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { if (leaseTime != -1L) { return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN); } else { RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e == null) { if (ttlRemaining) { this.scheduleExpirationRenewal(threadId); }
} }); return ttlRemainingFuture; } } Copy
|
此处出现leaseTime时间判断的2个分支,实际上就是加锁时是否设置过期时间,未设置过期时间(-1)时则会有watchDog的锁续约(下文),一个注册了加锁事件的续约任务。我们先来看有过期时间tryLockInnerAsync部分,
evalWriteAsync是eval命令执行lua的入口
1 2 3 4 5
| <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { this.internalLockLeaseTime = unit.toMillis(leaseTime); return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)}); } Copy
|
这里揭开真面目,eval命令执行Lua脚本的地方,此处的Lua脚本展开
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| -- 不存在该key时 if (redis.call('exists', KEYS[1]) == 0) then -- 新增该锁并且hash中该线程id对应的count置1 redis.call('hincrby', KEYS[1], ARGV[2], 1); -- 设置过期时间 redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end;
-- 存在该key 并且 hash中线程id的key也存在 if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then -- 线程重入次数++ redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]); Copy
|
和前面我们写自定义的分布式锁的脚本几乎一致,看来redisson也是一样的实现,具体参数分析:
1 2 3 4 5 6 7
| KEYS[1] = Collections.singletonList(this.getName())
ARGV[1] = this.internalLockLeaseTime
ARGV[2] = this.getLockName(threadId) Copy
|
总共3个参数完成了一段逻辑:
判断该锁是否已经有对应hash表存在,
• 没有对应的hash表:则set该hash表中一个entry的key为锁名称,value为1,之后设置该hash表失效时间为leaseTime
• 存在对应的hash表:则将该lockName的value执行+1操作,也就是计算进入次数,再设置失效时间leaseTime
• 最后返回这把锁的ttl剩余时间
既然如此,那解锁的步骤也肯定有对应的-1操作,再看unlock方法,同样查找方法名,一路到
1 2 3 4
| protected RFuture<Boolean> unlockInnerAsync(long threadId) { return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end;if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.getName(), this.getChannelName()), new Object[]{LockPubSub.unlockMessage, this.internalLockLeaseTime, this.getLockName(threadId)}); } Copy
|
掏出Lua部分
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| -- 不存在key if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil; end; -- 计数器 -1 local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then -- 过期时间重设 redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else -- 删除并发布解锁消息 redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil; Copy
|
该Lua KEYS有2个Arrays.asList(getName(), getChannelName())
name 锁名称 channelName,用于pubSub发布消息的channel名称
ARGV变量有三个LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)
LockPubSub.UNLOCK_MESSAGE,channel发送消息的类别,此处解锁为0 internalLockLeaseTime,watchDog配置的超时时间,默认为30s lockName 这里的lockName指的是uuid和threadId组合的唯一值
步骤如下:
1.如果该锁不存在则返回nil;
2.如果该锁存在则将其线程的hash key计数器-1,
3.计数器counter>0,重置下失效时间,返回0;否则,删除该锁,发布解锁消息unlockMessage,返回1;
其中unLock的时候使用到了Redis发布订阅PubSub完成消息通知。
而订阅的步骤就在RedissonLock的加锁入口的lock方法里
1 2 3 4 5 6 7 8 9 10 11 12
| long threadId = Thread.currentThread().getId(); Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId); if (ttl != null) { RFuture<RedissonLockEntry> future = this.subscribe(threadId); if (interruptibly) { this.commandExecutor.syncSubscriptionInterrupted(future); } else { this.commandExecutor.syncSubscription(future); } Copy
|
当锁被其他线程占用时,通过监听锁的释放通知(在其他线程通过RedissonLock释放锁时,会通过发布订阅pub/sub功能发起通知),等待锁被其他线程释放,也是为了避免自旋的一种常用效率手段。
解锁消息
为了一探究竟通知了什么,通知后又做了什么,进入LockPubSub。
这里只有一个明显的监听方法onMessage,其订阅和信号量的释放都在父类PublishSubscribe,我们只关注监听事件的实际操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| protected void onMessage(RedissonLockEntry value, Long message) { Runnable runnableToExecute; if (message.equals(unlockMessage)) { runnableToExecute = (Runnable)value.getListeners().poll(); if (runnableToExecute != null) { runnableToExecute.run(); } value.getLatch().release(); } else if (message.equals(readUnlockMessage)) { while(true) { runnableToExecute = (Runnable)value.getListeners().poll(); if (runnableToExecute == null) { value.getLatch().release(value.getLatch().getQueueLength()); break; } runnableToExecute.run(); } } } Copy
|
发现一个是默认解锁消息,一个是*读锁解锁消息***,**因为redisson是有提供读写锁的,而读写锁读读情况和读写、写写情况互斥情况不同,我们只看上面的默认解锁消息unlockMessage分支
LockPubSub监听最终执行了2件事
- runnableToExecute.run() 执行监听回调
- value.getLatch().release(); 释放信号量
Redisson通过LockPubSub监听解锁消息,执行监听回调和释放信号量通知等待线程可以重新抢锁。
这时再回来看tryAcquireOnceAsync另一分支
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { if (leaseTime != -1L) { return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN); } else { RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e == null) { if (ttlRemaining) { this.scheduleExpirationRenewal(threadId); }
} }); return ttlRemainingFuture; } } Copy
|
可以看到,无超时时间时,在执行加锁操作后,还执行了一段费解的逻辑
1 2 3 4 5 6 7 8 9
| ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e == null) { if (ttlRemaining) { this.scheduleExpirationRenewal(threadId); }
} }) Copy
|
此处涉及到Netty的Future/Promise-Listener模型(参考Netty中的异步编程),Redisson中几乎全部以这种方式通信(所以说Redisson是基于Netty通信机制实现的),理解这段逻辑可以试着先理解
在 Java 的 Future 中,业务逻辑为一个 Callable 或 Runnable 实现类,该类的 call()或 run()执行完毕意味着业务逻辑的完结,在 Promise 机制中,可以在业务逻辑中人工设置业务逻辑的成功与失败,这样更加方便的监控自己的业务逻辑。
这块代码的表面意义就是,在执行异步加锁的操作后,加锁成功则根据加锁完成返回的ttl是否过期来确认是否执行一段定时任务。
这段定时任务的就是watchDog的核心。
锁续约
查看RedissonLock.this.scheduleExpirationRenewal(threadId)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| private void scheduleExpirationRenewal(long threadId) { RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry(); RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry); if (oldEntry != null) { oldEntry.addThreadId(threadId); } else { entry.addThreadId(threadId); this.renewExpiration(); }
}
private void renewExpiration() { RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName()); if (ee != null) { Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() { public void run(Timeout timeout) throws Exception { RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName()); if (ent != null) { Long threadId = ent.getFirstThreadId(); if (threadId != null) { RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null) { RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e); } else { if (res) { RedissonLock.this.renewExpiration(); }
} }); } } } }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS); ee.setTimeout(task); } } Copy
|
拆分来看,这段连续嵌套且冗长的代码实际上做了几步
• 添加一个netty的Timeout回调任务,每(internalLockLeaseTime / 3)毫秒执行一次,执行的方法是renewExpirationAsync
• renewExpirationAsync重置了锁超时时间,又注册一个监听器,监听回调又执行了renewExpiration
renewExpirationAsync 的Lua如下
1 2 3 4 5 6 7 8 9 10
| protected RFuture<Boolean> renewExpirationAsync(long threadId) { return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)}); }
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0; Copy
|
重新设置了超时时间。
Redisson加这段逻辑的目的是什么?
目的是为了某种场景下保证业务不影响,如任务执行超时但未结束,锁已经释放的问题。
当一个线程持有了一把锁,由于并未设置超时时间leaseTime,Redisson默认配置了30S,开启watchDog,每10S对该锁进行一次续约,维持30S的超时时间,直到任务完成再删除锁。
这就是Redisson的锁续约,也就是WatchDog实现的基本思路。
流程概括
通过整体的介绍,流程简单概括:
- A、B线程争抢一把锁,A获取到后,B阻塞
- B线程阻塞时并非主动CAS,而是PubSub方式订阅该锁的广播消息
- A操作完成释放了锁,B线程收到订阅消息通知
- B被唤醒开始继续抢锁,拿到锁
详细加锁解锁流程总结如下图:
总结
Redisson整体实现分布式加解锁流程的实现稍显复杂,作者Rui Gu对Netty和JUC、Redis研究深入,利用了很多高级特性和语义,值得深入学习,本次介绍也只是单机Redis下锁实现,Redisson也提供了多机情况下的联锁(MultiLock)和官方推荐的红锁(RedLock),下一章再详细介绍。
所以,当你真的需要分布式锁时,不妨先来Redisson里找找。