新聞中心
1、為什么要使用分布式鎖?
在分布式,微服務(wù)環(huán)境中,我們的服務(wù)被拆分為很多個(gè),并且每一個(gè)服務(wù)可能存在多個(gè)實(shí)例,部署在不同的服務(wù)器上。

創(chuàng)新互聯(lián)專注于岷縣網(wǎng)站建設(shè)服務(wù)及定制,我們擁有豐富的企業(yè)做網(wǎng)站經(jīng)驗(yàn)。 熱誠(chéng)為您提供岷縣營(yíng)銷型網(wǎng)站建設(shè),岷縣網(wǎng)站制作、岷縣網(wǎng)頁(yè)設(shè)計(jì)、岷縣網(wǎng)站官網(wǎng)定制、成都小程序開(kāi)發(fā)服務(wù),打造岷縣網(wǎng)絡(luò)公司原創(chuàng)品牌,更為您提供岷縣網(wǎng)站排名全網(wǎng)營(yíng)銷落地服務(wù)。
此時(shí)JVM中的synchronized和lock鎖,將只能對(duì)自己所在服務(wù)的JVM加鎖,而跨機(jī)器,跨JMV的場(chǎng)景,仍然需要鎖的場(chǎng)景就需要使用到分布式鎖了。
2、為什么要使用Redis實(shí)現(xiàn)分布式鎖?
因?yàn)镽edis的性能很好,并且Redis是單線程的,天生線程安全。
并且Redis的key過(guò)期效果與Zookeeper的臨時(shí)節(jié)點(diǎn)的效果相似,都能實(shí)現(xiàn)鎖超時(shí)自動(dòng)釋放的功能。
而且Redis還可以使用lua腳本來(lái)保證redis多條命令實(shí)現(xiàn)整體的原子性,Redisson就是使用lua腳本的原子性來(lái)實(shí)現(xiàn)分布式鎖的。
3、我們?nèi)绾位赗edisson封裝分布式鎖?
1)、基于RedissonClient實(shí)現(xiàn)手動(dòng)加鎖
2)、基于AOP+Redisson封裝注解版的分布式鎖
3)、將分布式鎖功能封裝成一個(gè)starter, 引入jar包即可實(shí)現(xiàn)分布式鎖
4、代碼實(shí)現(xiàn)
4.1、整合封裝Redisson
我們前面封裝了基于Redis擴(kuò)展了SpringCache,封裝了
redis-cache-spring-boot-starter。
我們的分布式鎖基于這個(gè)模塊實(shí)現(xiàn),下面引入依賴。
引入依賴
itdl-parent
com.itdl
1.0
4.0.0
redis-lock-spring-boot-starter
Redis實(shí)現(xiàn)分布式鎖的自定義starter封裝模塊
${java.version}
${java.version}
com.itdl
redis-cache-spring-boot-starter
org.redisson
redisson
org.springframework.boot
spring-boot-starter-aop
編寫(xiě)RedisLockConfig配置RedissonClient
/**
* Redis實(shí)現(xiàn)分布式鎖的配置(使用Redisson)
*/
@Configuration // 標(biāo)識(shí)為一個(gè)配置項(xiàng),注入Spring容器
@AutoConfigureBefore({CustomRedisConfig.class, CacheNullValuesHandle.class})
@ConditionalOnProperty(value = "redis.enable", havingValue = "true") // 開(kāi)啟redis.enable=true時(shí)生效
@Slf4j
public class RedisLockConfig {
private volatile boolean isCluster = false;
private volatile String redisHostsStr = "";
@Bean
@ConditionalOnMissingBean
public RedissonClient redissonClient(CustomRedisProperties redisProperties){
// 構(gòu)建配置
Config config = buildConfig(redisProperties);
RedissonClient redissonClient = Redisson.create(config);
log.info("==============創(chuàng)建redisClient{}版成功:{}==================", isCluster ? "集群": "單機(jī)", redisHostsStr);
return redissonClient;
}
private Config buildConfig(CustomRedisProperties redisProperties) {
final Config config = new Config();
// 根據(jù)逗號(hào)切割host列表
Set hosts = org.springframework.util.StringUtils.commaDelimitedListToSet(redisProperties.getHost());
if (CollectionUtils.isEmpty(hosts)){
throw new RuntimeException("redis host address cannot be empty");
}
// 只有一個(gè)host, 表示是單機(jī)host
if (hosts.size() == 1){
String hostPort = hosts.stream().findFirst().get();
redisHostsStr = "redis://" + hostPort.trim();
config.useSingleServer()
.setAddress(redisHostsStr)
.setDatabase(redisProperties.getDatabase())
.setPassword(StringUtils.isBlank(redisProperties.getPassword()) ? null : redisProperties.getPassword())
;
isCluster = false;
}else {
// 集群處理
String[] redisHosts = new String[hosts.size()];
int i = 0;
for (String host : hosts) {
String[] split = host.split(":");
if (split.length != 2){
throw new RuntimeException("host or port err");
}
redisHosts[i] = "redis://" + host.trim();
i++;
}
redisHostsStr = String.join(",", redisHosts);
// 配置集群
config.useClusterServers()
.addNodeAddress(redisHosts)
.setPassword(StringUtils.isBlank(redisProperties.getPassword()) ? null : redisProperties.getPassword())
// 解決Not all slots covered! Only 10922 slots are available
.setCheckSlotsCoverage(false);
isCluster = true;
}
return config;
}
} 我們配置時(shí)需要優(yōu)先配置好redis-cache-spring-boot-starter,使用@AutoConfigureBefore({CustomRedisConfig.class, CacheNullValuesHandle.class})
直接使用,不再重復(fù)造輪子,然后我們根據(jù)自定義屬性配置文件CustomRedisProperties來(lái)創(chuàng)建RedissonClient的Bean。
編寫(xiě)META-INF/spring.factories進(jìn)行自動(dòng)配置
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.itdl.lock.config.RedisLockConfig在測(cè)試模塊緩存service添加分布式鎖
@Cacheable(cacheNames = "demo2#3", key = "#id")
public TestEntity getById2(Long id){
// 創(chuàng)建分布式鎖
RLock lock = redissonClient.getLock("demo2_lock");
// 加鎖
lock.lock(10, TimeUnit.SECONDS);
if (id > 1000){
log.info("id={}沒(méi)有查詢到數(shù)據(jù),返回空值", id);
return null;
}
TestEntity testEntity = new TestEntity(new Random().nextLong(), UUID.randomUUID().toString(), new Random().nextInt(20) + 10);
log.info("模擬查詢數(shù)據(jù)庫(kù):{}", testEntity);
// 釋放鎖
lock.unlock();
return testEntity;
}我們這里的@Cacheable沒(méi)有加sync=true, 此時(shí)并發(fā)請(qǐng)求會(huì)存在線程安全問(wèn)題,但是我們?cè)诜椒w局部添加了分布式鎖,因此我們的程序會(huì)按照順序執(zhí)行。
如果我們的參數(shù)被定死了,最終請(qǐng)求會(huì)被先存儲(chǔ)到緩存,所以后續(xù)的查詢就會(huì)走緩存,這能很好的測(cè)試分布式鎖的效果。
編寫(xiě)測(cè)試程序
@SpringBootTest
public class TestRedisLockRunner6 {
@Autowired
private MyTestService myTestService;
// 創(chuàng)建一個(gè)固定線程池
private ExecutorService executorService = Executors.newFixedThreadPool(16);
/**
* 多線程訪問(wèn)請(qǐng)求,測(cè)試切面的線程安全性
*/
@Test
public void testMultiMyTestService() throws InterruptedException {
for (int i = 0; i < 100; i++) {
executorService.submit(() -> {
// 每次查詢同一個(gè)參數(shù)
TestEntity t1 = myTestService.getById2(1L);
});
}
// 主線程休息10秒種
Thread.sleep(10000);
}
}我們可以看到,結(jié)果并沒(méi)有符合我們預(yù)期,但是又部分符合我們預(yù)期,為什么呢?
因?yàn)槲覀兊腀Cacheable是存在線程安全問(wèn)題的,因?yàn)樗炔樵兙彺孢@個(gè)操作存在并發(fā)問(wèn)題,查詢時(shí)就同時(shí)有N個(gè)請(qǐng)求進(jìn)入@Cacheable, 并且都查詢沒(méi)有緩存。
然后同時(shí)執(zhí)行方法體,但方法體加了分布式鎖,所以排隊(duì)進(jìn)行處理,因此序號(hào)有序。
但打印數(shù)量不足總數(shù),是因?yàn)檫@一批次沒(méi)有全部到達(dá)@Cacheable,而是執(zhí)行完畢之后才將緩存回填,所以后續(xù)的請(qǐng)求就是走緩存了。
解決方案:我們加上sync=true之后就能實(shí)現(xiàn),只查詢一次數(shù)據(jù)庫(kù),就可以回填緩存了。如果我們?nèi)サ鬇Cacheable注解,則會(huì)每一次都查詢數(shù)據(jù)庫(kù),但是時(shí)按照順序執(zhí)行的。
加上sync=true測(cè)試
效果達(dá)到了我們的預(yù)期,繼續(xù)看一下去掉@Cacheable注解的情況。
去掉@Cacheable注解測(cè)試
我們的分布式鎖功能是沒(méi)有問(wèn)題的,但是每次我們都需要執(zhí)行g(shù)etLock(), lock.lock(), lock.unlock(),是不是很麻煩,能不能一個(gè)注解搞定?
當(dāng)然是可以的。
4.2、封裝注解版分布式鎖
編寫(xiě)@RedisLock注解
/**
* 自定義Redis分布式鎖
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisLock {
/**分布式鎖的名稱,支持el表達(dá)式*/
String lockName() default "";
/**鎖類型 默認(rèn)為可重入鎖*/
LockType lockType() default REENTRANT_LOCK;
/**獲取鎖等待時(shí)間,默認(rèn)60秒*/
long waitTime() default 30000L;
/** 鎖自動(dòng)釋放時(shí)間,默認(rèn)60秒*/
long leaseTime() default 60000L;
/**
* 被加鎖方法執(zhí)行完是否立即釋放鎖
*/
boolean immediatelyUnLock() default true;
/** 時(shí)間單位, 默認(rèn)毫秒*/
TimeUnit timeUnit() default TimeUnit.MILLISECONDS;
}編寫(xiě)分布式鎖切面RedisLockAop
/**
* Redis分布式鎖的切面邏輯實(shí)現(xiàn)
*/
@ConditionalOnProperty(value = "redis.enable", havingValue = "true") // 開(kāi)啟redis.enable=true時(shí)生效
@AutoConfigureBefore(RedisLockConfig.class)
@Aspect
@Configuration
@Slf4j
public class RedisLockAop {
@Resource
private RedissonClient redissonClient;
/**
* 切點(diǎn)
*/
@Pointcut("@annotation(com.itdl.lock.anno.RedisLock)")
public void pointcut(){
}
/**
* 環(huán)繞通知 注解針對(duì)的是方法,這里切點(diǎn)也獲取方法進(jìn)行處理就可以了
*/
@Around("pointcut()")
public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
// 獲取方法
Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
// 獲取方法上的分布式鎖注解
RedisLock redisLock = method.getDeclaredAnnotation(RedisLock.class);
// 獲取注解的參數(shù)
// 鎖名稱
String lockName = redisLock.lockName();
// 鎖類型
LockType lockType = redisLock.lockType();
// 獲取RedissonClient的Lock
RLock lock = getRLock(lockName, lockType, redisLock);
//獲取到鎖后, 開(kāi)始執(zhí)行方法,執(zhí)行完畢后釋放鎖
try {
log.debug("=========>>>獲取鎖成功, 即將執(zhí)行業(yè)務(wù)邏輯:{}", lockName);
Object proceed = joinPoint.proceed();
// 釋放鎖
if (redisLock.immediatelyUnLock()) {
//是否立即釋放鎖
lock.unlock();
}
log.debug("=========>>>獲取鎖成功且執(zhí)行業(yè)務(wù)邏輯成功:{}", lockName);
return proceed;
} catch (Exception e) {
log.error("=========>>>獲取鎖成功但執(zhí)行業(yè)務(wù)邏輯失?。簕}", lockName);
e.printStackTrace();
throw new RedisLockException(LockErrCode.EXEC_BUSINESS_ERR);
}finally {
// 查詢當(dāng)前線程是否保持此鎖定 被鎖定則解鎖
lock.unlock();
log.debug("=========>>>釋放鎖成功:{}", lockName);
}
}
/**
* 根據(jù)鎖名稱和類型創(chuàng)建鎖
* @param lockName 鎖名稱
* @param lockType 鎖類型
* @return 鎖
*/
private RLock getRLock(String lockName, LockType lockType, RedisLock redisLock) throws InterruptedException {
RLock lock;
switch (lockType){
case FAIR_LOCK:
lock = redissonClient.getFairLock(lockName);
break;
case READ_LOCK:
lock = redissonClient.getReadWriteLock(lockName).readLock();
break;
case WRITE_LOCK:
lock = redissonClient.getReadWriteLock(lockName).writeLock();
break;
default:
// 默認(rèn)加可重入鎖,也就是普通的分布式鎖
lock = redissonClient.getLock(lockName);
break;
}
// 首先嘗試獲取鎖,如果在規(guī)定時(shí)間內(nèi)沒(méi)有獲取到鎖,則調(diào)用lock等待鎖,直到獲取鎖為止
if (lock.tryLock()) {
lock.tryLock(redisLock.waitTime(), redisLock.leaseTime(), redisLock.timeUnit());
}else {
// 如果leaseTime>0,規(guī)定時(shí)間內(nèi)獲取鎖,超時(shí)則自動(dòng)釋放鎖
long leaseTime = redisLock.leaseTime();
if (leaseTime > 0) {
lock.lock(redisLock.leaseTime(), redisLock.timeUnit());
} else {
// 自動(dòng)釋放鎖時(shí)間設(shè)置為0或者負(fù)數(shù),則加鎖不設(shè)置超時(shí)時(shí)間
lock.lock();
}
}
return lock;
}
}話不多說(shuō),封裝的邏輯已經(jīng)在注釋中寫(xiě)的很清晰了。
將切面也放入自動(dòng)配置spring.factories中
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.itdl.lock.config.RedisLockConfig,\
com.itdl.lock.anno.RedisLockAop測(cè)試注解版分布式鎖
@RedisLock(lockName = "demo4_lock")
public TestEntity getById4(Long id) throws InterruptedException {
index++;
log.info("current index is : {}", index);
Thread.sleep(new Random().nextInt(10) * 100);
TestEntity testEntity = new TestEntity(new Random().nextLong(), UUID.randomUUID().toString(), new Random().nextInt(20) + 10);
log.info("模擬查詢數(shù)據(jù)庫(kù):{}", testEntity);
return testEntity;
}可以看到,我們就是一個(gè)注解分布式鎖的效果,而分布式鎖與緩存注解通常不會(huì)一起使用,因?yàn)橐话銜?huì)在存在事務(wù)問(wèn)題的地方我們會(huì)使用鎖,在多個(gè)JMV操作同一條數(shù)據(jù)做寫(xiě)操作時(shí)需要加分布式鎖。
編寫(xiě)測(cè)試程序
@SpringBootTest
public class TestRedisLockRunner6 {
@Autowired
private MyTestService myTestService;
// 創(chuàng)建一個(gè)固定線程池
private ExecutorService executorService = Executors.newFixedThreadPool(16);
/**
* 多線程訪問(wèn)請(qǐng)求,測(cè)試切面的線程安全性
*/
@Test
public void testMultiMyTestService() throws InterruptedException {
for (int i = 0; i < 100; i++) {
executorService.submit(() -> {
try {
TestEntity t1 = myTestService.getById4(1L);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
// 主線程休息10秒種
Thread.sleep(60000);
}
}測(cè)試結(jié)果
5、小結(jié)
我們將分布式鎖基于緩存擴(kuò)展了一版,也就是說(shuō)本starter即有分布式緩存功能,又有分布式鎖功能。
而注解版的分布式鎖能夠解決大多數(shù)場(chǎng)景的并核問(wèn)題,小粒度的Lock鎖方式補(bǔ)全其他場(chǎng)景。
將兩者封裝成為一個(gè)starter,我們就可以很方便的使用分布式鎖功能,引入相關(guān)包即可,開(kāi)箱即用。
新聞名稱:Springboot+Redisson封裝分布式鎖Starter
標(biāo)題來(lái)源:http://www.5511xx.com/article/dpjpsso.html


咨詢
建站咨詢
