日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
加鎖了還有并發(fā)問題?Redis分布式鎖,真的用對了?

加鎖了還有并發(fā)問題?Redis分布式鎖,真的用對了?

作者:丑胖俠二師兄 2021-10-25 09:50:57

新聞

前端

分布式

Redis 新接手的項目,偶爾會出現(xiàn)賬不平的問題。之前的技術(shù)老大臨走時給的解釋是:排查了,沒找到原因,之后太忙就沒再解決,可能是框架的原因……

成都創(chuàng)新互聯(lián)公司堅持“要么做到,要么別承諾”的工作理念,服務(wù)領(lǐng)域包括:做網(wǎng)站、網(wǎng)站制作、企業(yè)官網(wǎng)、英文網(wǎng)站、手機端網(wǎng)站、網(wǎng)站推廣等服務(wù),滿足客戶于互聯(lián)網(wǎng)時代的遂平網(wǎng)站設(shè)計、移動媒體設(shè)計的需求,幫助企業(yè)找到有效的互聯(lián)網(wǎng)解決方案。努力成為您成熟可靠的網(wǎng)絡(luò)建設(shè)合作伙伴!

新接手的項目,偶爾會出現(xiàn)賬不平的問題。之前的技術(shù)老大臨走時給的解釋是:排查了,沒找到原因,之后太忙就沒再解決,可能是框架的原因……

既然項目交付到手中,這樣的問題是必須要解決的。梳理了所有賬務(wù)處理邏輯,最終找到了原因:數(shù)據(jù)庫并發(fā)操作熱點賬戶導(dǎo)致。就這這個問題,來聊一聊分布式系統(tǒng)下基于Redis的分布式鎖。順便也分解一下問題形成原因及解決方案。

原因分析

系統(tǒng)并發(fā)量并不高,存在熱點賬戶,但也不至于那么嚴(yán)重。問題的根源在于系統(tǒng)架構(gòu)設(shè)計,人為的制造了并發(fā)。場景是這樣的:商戶批量導(dǎo)入一批數(shù)據(jù),系統(tǒng)會進(jìn)行前置處理,并對賬戶余額進(jìn)行增減。

此時,另外一個定時任務(wù),也會對賬戶進(jìn)行掃描更新。而且對同一賬戶的操作分布到各個系統(tǒng)當(dāng)中,熱點賬戶也就出現(xiàn)了。

針對此問題的解決方案,從架構(gòu)層面可以考慮將賬務(wù)系統(tǒng)進(jìn)行抽離,集中在一個系統(tǒng)中進(jìn)行處理,所有的數(shù)據(jù)庫事務(wù)及執(zhí)行順序由賬務(wù)系統(tǒng)來統(tǒng)籌處理。從技術(shù)方面來講,則可以通過鎖機制來對熱點賬戶進(jìn)行加鎖。

本篇文章就針對熱點賬戶基于分布式鎖的實現(xiàn)方式進(jìn)行詳細(xì)的講解。

鎖的分析

在Java的多線程環(huán)境下,通常有幾類鎖可以使用:

  • JVM內(nèi)存模型級別的鎖,常用的有:synchronized、Lock等;
  • 數(shù)據(jù)庫鎖,比如樂觀鎖,悲觀鎖等;
  • 分布式鎖;

JVM內(nèi)存級別的鎖,可以保證單體服務(wù)下線程的安全性,比如多個線程訪問/修改一個全局變量。但當(dāng)系統(tǒng)進(jìn)行集群部署時,JVM級別的本地鎖就無能為力了。

悲觀鎖與樂觀鎖

像上述案例中,熱點賬戶就屬于分布式系統(tǒng)中的共享資源,我們通常會采用數(shù)據(jù)庫鎖分布式鎖來進(jìn)行解決。

數(shù)據(jù)庫鎖,又分為樂觀鎖悲觀鎖。

悲觀鎖是基于數(shù)據(jù)庫(Mysql的InnoDB)提供的排他鎖來實現(xiàn)的。在進(jìn)行事務(wù)操作時,通過select ... for update語句,MySQL會對查詢結(jié)果集中每行數(shù)據(jù)都添加排他鎖,其他線程對該記錄的更新與刪除操作都會阻塞。從而達(dá)到共享資源的順序執(zhí)行(修改);

樂觀鎖是相對悲觀鎖而言的,樂觀鎖假設(shè)數(shù)據(jù)一般情況不會造成沖突,所以在數(shù)據(jù)進(jìn)行提交更新的時候,才會正式對數(shù)據(jù)的沖突與否進(jìn)行檢測。如果沖突則返回給用戶異常信息,讓用戶決定如何去做。樂觀鎖適用于讀多寫少的場景,這樣可以提高程序的吞吐量。在樂觀鎖實現(xiàn)時通常會基于記錄狀態(tài)或添加version版本來進(jìn)行實現(xiàn)。

悲觀鎖失效場景

項目中使用了悲觀鎖,但悲觀鎖卻失效了。這也是使用悲觀鎖時,常見的誤區(qū),下面來分析一下。

正常使用悲觀鎖的流程:

  • 通過select ... for update鎖定記錄;
  • 計算新余額,修改金額并存儲;
  • 執(zhí)行完成釋放鎖;

經(jīng)常犯錯的處理流程:

  • 查詢賬戶余額,計算新余額;
  • 通過select ... for update鎖定記錄;
  • 修改金額并存儲;
  • 執(zhí)行完成釋放鎖;

錯誤的流程中,比如A和B服務(wù)查詢到的余額都是100,A扣減50,B扣減40,然后A鎖定記錄,更新數(shù)據(jù)庫為50;A釋放鎖之后,B鎖定記錄,更新數(shù)據(jù)庫為60。顯然,后者把前者的更新給覆蓋掉了。解決的方案就是擴大鎖的范圍,將鎖提前到計算新余額之前。

通常悲觀鎖對數(shù)據(jù)庫的壓力是非常大的,在實踐中通常會根據(jù)場景使用樂觀鎖或分布式鎖等方式來實現(xiàn)。

下面進(jìn)入正題,講講基于Redis的分布式鎖實現(xiàn)。

Redis分布式鎖實戰(zhàn)演習(xí)

這里以Spring Boot、Redis、Lua腳本為例來演示分布式鎖的實現(xiàn)。為了簡化處理,示例中Redis既承擔(dān)了分布式鎖的功能,也承擔(dān)了數(shù)據(jù)庫的功能。

場景構(gòu)建

集群環(huán)境下,對同一個賬戶的金額進(jìn)行操作,基本步驟:

  • 從數(shù)據(jù)庫讀取用戶金額;
  • 程序修改金額;
  • 再將最新金額存儲到數(shù)據(jù)庫;

下面從最初不加鎖,不同步處理,逐步推演出最終的分布式鎖。

基礎(chǔ)集成及類構(gòu)建

準(zhǔn)備一個不加鎖處理的基礎(chǔ)業(yè)務(wù)環(huán)境。

首先在Spring Boot項目中引入相關(guān)依賴:

  
 
 
 
  1.  
  2.  org.springframework.boot 
  3.  spring-boot-starter-data-redis 
  4.  
  5.  
  6.  org.springframework.boot 
  7.  spring-boot-starter-web 
  8.  

賬戶對應(yīng)實體類UserAccount:

  
 
 
 
  1. public class UserAccount { 
  2.  
  3.  //用戶ID 
  4.  private String userId; 
  5.  //賬戶內(nèi)金額 
  6.  private int amount; 
  7.  
  8.  //添加賬戶金額 
  9.  public void addAmount(int amount) { 
  10.   this.amount = this.amount + amount; 
  11.  } 
  12.  // 省略構(gòu)造方法和getter/setter  

創(chuàng)建一個線程實現(xiàn)類AccountOperationThread:

  
 
 
 
  1. public class AccountOperationThread implements Runnable { 
  2.  
  3.  private final static Logger logger = LoggerFactory.getLogger(AccountOperationThread.class); 
  4.  
  5.  private static final Long RELEASE_SUCCESS = 1L; 
  6.  
  7.  private String userId; 
  8.  
  9.  private RedisTemplate redisTemplate; 
  10.  
  11.  public AccountOperationThread(String userId, RedisTemplate redisTemplate) { 
  12.   this.userId = userId; 
  13.   this.redisTemplate = redisTemplate; 
  14.  } 
  15.  
  16.  @Override 
  17.  public void run() { 
  18.   noLock(); 
  19.  } 
  20.  
  21.  /** 
  22.   * 不加鎖 
  23.   */ 
  24.  private void noLock() { 
  25.   try { 
  26.    Random random = new Random(); 
  27.    // 模擬線程進(jìn)行業(yè)務(wù)處理 
  28.    TimeUnit.MILLISECONDS.sleep(random.nextInt(100) + 1); 
  29.   } catch (InterruptedException e) { 
  30.    e.printStackTrace(); 
  31.   } 
  32.   //模擬數(shù)據(jù)庫中獲取用戶賬號 
  33.   UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId); 
  34.   // 金額+1 
  35.   userAccount.addAmount(1); 
  36.   logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount()); 
  37.   //模擬存回數(shù)據(jù)庫 
  38.   redisTemplate.opsForValue().set(userId, userAccount); 
  39.  } 

其中RedisTemplate的實例化交給了Spring Boot:

  
 
 
 
  1. @Configuration 
  2. public class RedisConfig { 
  3.  
  4.  @Bean 
  5.  public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) { 
  6.   RedisTemplate redisTemplate = new RedisTemplate<>(); 
  7.   redisTemplate.setConnectionFactory(redisConnectionFactory); 
  8.   Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = 
  9.     new Jackson2JsonRedisSerializer<>(Object.class); 
  10.   ObjectMapper objectMapper = new ObjectMapper(); 
  11.   objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); 
  12.   objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); 
  13.   jackson2JsonRedisSerializer.setObjectMapper(objectMapper); 
  14.   // 設(shè)置value的序列化規(guī)則和 key的序列化規(guī)則 
  15.   redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); 
  16.   redisTemplate.setKeySerializer(new StringRedisSerializer()); 
  17.   redisTemplate.afterPropertiesSet(); 
  18.   return redisTemplate; 
  19.  } 
  20. 最后,再準(zhǔn)備一個TestController來進(jìn)行觸發(fā)多線程的運行:

      
     
     
     
    1. @RestController 
    2. public class TestController { 
    3.  
    4.  private final static Logger logger = LoggerFactory.getLogger(TestController.class); 
    5.  
    6.  private static ExecutorService executorService = Executors.newFixedThreadPool(10); 
    7.  
    8.  @Autowired 
    9.  private RedisTemplate redisTemplate; 
    10.  
    11.  @GetMapping("/test") 
    12.  public String test() throws InterruptedException { 
    13.   // 初始化用戶user_001到Redis,賬戶金額為0 
    14.   redisTemplate.opsForValue().set("user_001", new UserAccount("user_001", 0)); 
    15.   // 開啟10個線程進(jìn)行同步測試,每個線程為賬戶增加1元 
    16.   for (int i = 0; i < 10; i++) { 
    17.    logger.info("創(chuàng)建線程i=" + i); 
    18.    executorService.execute(new AccountOperationThread("user_001", redisTemplate)); 
    19.   } 
    20.  
    21.   // 主線程休眠1秒等待線程跑完 
    22.   TimeUnit.MILLISECONDS.sleep(1000); 
    23.   // 查詢Redis中的user_001賬戶 
    24.   UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get("user_001"); 
    25.   logger.info("user id : " + userAccount.getUserId() + " amount : " + userAccount.getAmount()); 
    26.   return "success"; 
    27.  } 

    執(zhí)行上述程序,正常來說10個線程,每個線程加1,結(jié)果應(yīng)該是10。但多執(zhí)行幾次,會發(fā)現(xiàn),結(jié)果變化很大,基本上都要比10小。

      
     
     
     
    1. [pool-1-thread-5] c.s.redis.thread.AccountOperationThread  : pool-1-thread-5 : user id : user_001 amount : 1 
    2. [pool-1-thread-4] c.s.redis.thread.AccountOperationThread  : pool-1-thread-4 : user id : user_001 amount : 1 
    3. [pool-1-thread-3] c.s.redis.thread.AccountOperationThread  : pool-1-thread-3 : user id : user_001 amount : 1 
    4. [pool-1-thread-1] c.s.redis.thread.AccountOperationThread  : pool-1-thread-1 : user id : user_001 amount : 1 
    5. [pool-1-thread-1] c.s.redis.thread.AccountOperationThread  : pool-1-thread-1 : user id : user_001 amount : 2 
    6. [pool-1-thread-2] c.s.redis.thread.AccountOperationThread  : pool-1-thread-2 : user id : user_001 amount : 2 
    7. [pool-1-thread-5] c.s.redis.thread.AccountOperationThread  : pool-1-thread-5 : user id : user_001 amount : 2 
    8. [pool-1-thread-4] c.s.redis.thread.AccountOperationThread  : pool-1-thread-4 : user id : user_001 amount : 3 
    9. [pool-1-thread-1] c.s.redis.thread.AccountOperationThread  : pool-1-thread-1 : user id : user_001 amount : 4 
    10. [pool-1-thread-3] c.s.redis.thread.AccountOperationThread  : pool-1-thread-3 : user id : user_001 amount : 5 
    11. [nio-8080-exec-1] c.s.redis.controller.TestController      : user id : user_001 amount : 5 

    以上述日志為例,前四個線程都將值改為1,也就是后面三個線程都將前面的修改進(jìn)行了覆蓋,導(dǎo)致最終結(jié)果不是10,只有5。這顯然是有問題的。

    Redis同步鎖實現(xiàn)

    針對上面的情況,在同一個JVM當(dāng)中,我們可以通過線程加鎖來完成。但在分布式環(huán)境下,JVM級別的鎖是沒辦法實現(xiàn)的,這里可以采用Redis同步鎖實現(xiàn)。

    基本思路:第一個線程進(jìn)入時,在Redis中進(jìn)記錄,當(dāng)后續(xù)線程過來請求時,判斷Redis是否存在該記錄,如果存在則說明處于鎖定狀態(tài),進(jìn)行等待或返回。如果不存在,則進(jìn)行后續(xù)業(yè)務(wù)處理。

      
     
     
     
    1.  /** 
    2.  * 1.搶占資源時判斷是否被鎖。 
    3.  * 2.如未鎖則搶占成功且加鎖,否則等待鎖釋放。 
    4.  * 3.業(yè)務(wù)完成后釋放鎖,讓給其它線程。 
    5.  * 

       

    6.  * 該方案并未解決同步問題,原因:線程獲得鎖和加鎖的過程,并非原子性操作,可能會導(dǎo)致線程A獲得鎖,還未加鎖時,線程B也獲得了鎖。 
    7.  */ 
    8. private void redisLock() { 
    9.  Random random = new Random(); 
    10.  try { 
    11.   TimeUnit.MILLISECONDS.sleep(random.nextInt(1000) + 1); 
    12.  } catch (InterruptedException e) { 
    13.   e.printStackTrace(); 
    14.  } 
    15.  while (true) { 
    16.   Object lock = redisTemplate.opsForValue().get(userId + ":syn"); 
    17.   if (lock == null) { 
    18.    // 獲得鎖 -> 加鎖 -> 跳出循環(huán) 
    19.    logger.info(Thread.currentThread().getName() + ":獲得鎖"); 
    20.    redisTemplate.opsForValue().set(userId + ":syn", "lock"); 
    21.    break; 
    22.   } 
    23.   try { 
    24.    // 等待500毫秒重試獲得鎖 
    25.    TimeUnit.MILLISECONDS.sleep(500); 
    26.   } catch (InterruptedException e) { 
    27.    e.printStackTrace(); 
    28.   } 
    29.  } 
    30.  try { 
    31.   //模擬數(shù)據(jù)庫中獲取用戶賬號 
    32.   UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId); 
    33.   if (userAccount != null) { 
    34.    //設(shè)置金額 
    35.    userAccount.addAmount(1); 
    36.    logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount()); 
    37.    //模擬存回數(shù)據(jù)庫 
    38.    redisTemplate.opsForValue().set(userId, userAccount); 
    39.   } 
    40.  } finally { 
    41.   //釋放鎖 
    42.   redisTemplate.delete(userId + ":syn"); 
    43.   logger.info(Thread.currentThread().getName() + ":釋放鎖"); 
    44.  } 

    在while代碼塊中,先判斷對應(yīng)用戶ID是否在Redis中存在,如果不存在,則進(jìn)行set加鎖,如果存在,則跳出循環(huán)繼續(xù)等待。

    上述代碼,看起來實現(xiàn)了加鎖的功能,但當(dāng)執(zhí)行程序時,會發(fā)現(xiàn)與未加鎖一樣,依舊存在并發(fā)問題。原因是:獲取鎖和加鎖的操作并不是原子的。比如兩個線程發(fā)現(xiàn)lock都是null,都進(jìn)行了加鎖,此時并發(fā)問題依舊存在。

    Redis原子性同步鎖

    針對上述問題,可將獲取鎖和加鎖的過程原子化處理?;趕pring-boot-data-redis提供的原子化API可以實現(xiàn):

      
     
     
     
    1. // 該方法使用了redis的指令:SETNX key value 
    2. // 1.key不存在,設(shè)置成功返回value,setIfAbsent返回true; 
    3. // 2.key存在,則設(shè)置失敗返回null,setIfAbsent返回false; 
    4. // 3.原子性操作; 
    5. Boolean setIfAbsent(K var1, V var2); 

    上述方法的原子化操作是對Redis的setnx命令的封裝,在Redis中setnx的使用如下實例:

      
     
     
     
    1. redis> SETNX mykey "Hello" 
    2. (integer) 1 
    3. redis> SETNX mykey "World" 
    4. (integer) 0 
    5. redis> GET mykey 
    6. "Hello" 

    第一次,設(shè)置mykey時,并不存在,則返回1,表示設(shè)置成功;第二次設(shè)置mykey時,已經(jīng)存在,則返回0,表示設(shè)置失敗。再次查詢mykey對應(yīng)的值,會發(fā)現(xiàn)依舊是第一次設(shè)置的值。也就是說redis的setnx保證了唯一的key只能被一個服務(wù)設(shè)置成功。

    理解了上述API及底層原理,來看看線程中的實現(xiàn)方法代碼如下:

      
     
     
     
    1. /** 
    2.   * 1.原子操作加鎖 
    3.   * 2.競爭線程循環(huán)重試獲得鎖 
    4.   * 3.業(yè)務(wù)完成釋放鎖 
    5.   */ 
    6.  private void atomicityRedisLock() { 
    7.   //Spring data redis 支持的原子性操作 
    8.   while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", "lock")) { 
    9.    try { 
    10.     // 等待100毫秒重試獲得鎖 
    11.     TimeUnit.MILLISECONDS.sleep(100); 
    12.    } catch (InterruptedException e) { 
    13.     e.printStackTrace(); 
    14.    } 
    15.   } 
    16.   logger.info(Thread.currentThread().getName() + ":獲得鎖"); 
    17.   try { 
    18.    //模擬數(shù)據(jù)庫中獲取用戶賬號 
    19.    UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId); 
    20.    if (userAccount != null) { 
    21.     //設(shè)置金額 
    22.     userAccount.addAmount(1); 
    23.     logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount()); 
    24.     //模擬存回數(shù)據(jù)庫 
    25.     redisTemplate.opsForValue().set(userId, userAccount); 
    26.    } 
    27.   } finally { 
    28.    //釋放鎖 
    29.    redisTemplate.delete(userId + ":syn"); 
    30.    logger.info(Thread.currentThread().getName() + ":釋放鎖"); 
    31.   } 
    32.  } 

    再次執(zhí)行代碼,會發(fā)現(xiàn)結(jié)果正確了,也就是說可以成功的對分布式線程進(jìn)行了加鎖。

    Redis分布式鎖的死鎖

    雖然上述代碼執(zhí)行結(jié)果沒問題,但如果應(yīng)用異常宕機,沒來得及執(zhí)行finally中釋放鎖的方法,那么其他線程則永遠(yuǎn)無法獲得這個鎖。

    此時可采用setIfAbsent的重載方法:

      
     
     
     
    1. Boolean setIfAbsent(K var1, V var2, long var3, TimeUnit var5); 

    基于該方法,可以設(shè)置鎖的過期時間。這樣即便獲得鎖的線程宕機,在Redis中數(shù)據(jù)過期之后,其他線程可正常獲得該鎖。

    示例代碼如下:

      
     
     
     
    1. private void atomicityAndExRedisLock() { 
    2.   try { 
    3.    //Spring data redis 支持的原子性操作,并設(shè)置5秒過期時間 
    4.    while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", 
    5.      System.currentTimeMillis() + 5000, 5000, TimeUnit.MILLISECONDS)) { 
    6.     // 等待100毫秒重試獲得鎖 
    7.     logger.info(Thread.currentThread().getName() + ":嘗試循環(huán)獲取鎖"); 
    8.     TimeUnit.MILLISECONDS.sleep(1000); 
    9.    } 
    10.    logger.info(Thread.currentThread().getName() + ":獲得鎖--------"); 
    11.    // 應(yīng)用在這里宕機,進(jìn)程退出,無法執(zhí)行 finally; 
    12.    Thread.currentThread().interrupt(); 
    13.    // 業(yè)務(wù)邏輯... 
    14.   } catch (InterruptedException e) { 
    15.    e.printStackTrace(); 
    16.   } finally { 
    17.    //釋放鎖 
    18.    if (!Thread.currentThread().isInterrupted()) { 
    19.     redisTemplate.delete(userId + ":syn"); 
    20.     logger.info(Thread.currentThread().getName() + ":釋放鎖"); 
    21.    } 
    22.   } 
    23.  } 

    業(yè)務(wù)超時及守護(hù)線程

    上面添加了Redis所的超時時間,看似解決了問題,但又引入了新的問題。

    比如,正常情況下線程A在5秒內(nèi)可正常處理完業(yè)務(wù),但偶發(fā)會出現(xiàn)超過5秒的情況。如果將超時時間設(shè)置為5秒,線程A獲得了鎖,但業(yè)務(wù)邏輯處理需要6秒。此時,線程A還在正常業(yè)務(wù)邏輯,線程B已經(jīng)獲得了鎖。當(dāng)線程A處理完時,有可能將線程B的鎖給釋放掉。

    在上述場景中有兩個問題點:

    • 第一,線程A和線程B可能會同時在執(zhí)行,存在并發(fā)問題。
    • 第二,線程A可能會把線程B的鎖給釋放掉,導(dǎo)致一系列的惡性循環(huán)。

    當(dāng)然,可以通過在Redis中設(shè)置value值來判斷鎖是屬于線程A還是線程B。但仔細(xì)分析會發(fā)現(xiàn),這個問題的本質(zhì)是因為線程A執(zhí)行業(yè)務(wù)邏輯耗時超出了鎖超時的時間。

    那么就有兩個解決方案了:

    • 第一,將超時時間設(shè)置的足夠長,確保業(yè)務(wù)代碼能夠在鎖釋放之前執(zhí)行完成;
    • 第二,為鎖添加守護(hù)線程,為將要過期釋放但未釋放的鎖增加時間;

    第一種方式需要全行大多數(shù)情況下業(yè)務(wù)邏輯的耗時,進(jìn)行超時時間的設(shè)定。

    第二種方式,可通過如下守護(hù)線程的方式來動態(tài)增加鎖超時時間。

      
     
     
     
    1. public class DaemonThread implements Runnable { 
    2.  private final static Logger logger = LoggerFactory.getLogger(DaemonThread.class); 
    3.  
    4.  // 是否需要守護(hù) 主線程關(guān)閉則結(jié)束守護(hù)線程 
    5.  private volatile boolean daemon = true; 
    6.  // 守護(hù)鎖 
    7.  private String lockKey; 
    8.  
    9.  private RedisTemplate redisTemplate; 
    10.  
    11.  public DaemonThread(String lockKey, RedisTemplate redisTemplate) { 
    12.   this.lockKey = lockKey; 
    13.   this.redisTemplate = redisTemplate; 
    14.  } 
    15.  
    16.  @Override 
    17.  public void run() { 
    18.   try { 
    19.    while (daemon) { 
    20.     long time = redisTemplate.getExpire(lockKey, TimeUnit.MILLISECONDS); 
    21.     // 剩余有效期小于1秒則續(xù)命 
    22.     if (time < 1000) { 
    23.      logger.info("守護(hù)進(jìn)程: " + Thread.currentThread().getName() + " 延長鎖時間 5000 毫秒"); 
    24.      redisTemplate.expire(lockKey, 5000, TimeUnit.MILLISECONDS); 
    25.     } 
    26.     TimeUnit.MILLISECONDS.sleep(300); 
    27.    } 
    28.    logger.info(" 守護(hù)進(jìn)程: " + Thread.currentThread().getName() + "關(guān)閉 "); 
    29.   } catch (InterruptedException e) { 
    30.    e.printStackTrace(); 
    31.   } 
    32.  } 
    33.  
    34.  // 主線程主動調(diào)用結(jié)束 
    35.  public void stop() { 
    36.   daemon = false; 
    37.  } 

    上述線程每隔300毫秒獲取一下Redis中鎖的超時時間,如果小于1秒,則延長5秒。當(dāng)主線程調(diào)用關(guān)閉時,守護(hù)線程也隨之關(guān)閉。

    主線程中相關(guān)代碼實現(xiàn):

      
     
     
     
    1. private void deamonRedisLock() { 
    2.   //守護(hù)線程 
    3.   DaemonThread daemonThread = null; 
    4.   //Spring data redis 支持的原子性操作,并設(shè)置5秒過期時間 
    5.   String uuid = UUID.randomUUID().toString(); 
    6.   String value = Thread.currentThread().getId() + ":" + uuid; 
    7.   try { 
    8.    while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", value, 5000, TimeUnit.MILLISECONDS)) { 
    9.     // 等待100毫秒重試獲得鎖 
    10.     logger.info(Thread.currentThread().getName() + ":嘗試循環(huán)獲取鎖"); 
    11.     TimeUnit.MILLISECONDS.sleep(1000); 
    12.    } 
    13.    logger.info(Thread.currentThread().getName() + ":獲得鎖----"); 
    14.    // 開啟守護(hù)線程 
    15.    daemonThread = new DaemonThread(userId + ":syn", redisTemplate); 
    16.    Thread thread = new Thread(daemonThread); 
    17.    thread.start(); 
    18.    // 業(yè)務(wù)邏輯執(zhí)行10秒... 
    19.    TimeUnit.MILLISECONDS.sleep(10000); 
    20.   } catch (InterruptedException e) { 
    21.    e.printStackTrace(); 
    22.   } finally { 
    23.    //釋放鎖 這里也需要原子操作,今后通過 Redis + Lua 講 
    24.    String result = (String) redisTemplate.opsForValue().get(userId + ":syn"); 
    25.    if (value.equals(result)) { 
    26.     redisTemplate.delete(userId + ":syn"); 
    27.     logger.info(Thread.currentThread().getName() + ":釋放鎖-----"); 
    28.    } 
    29.    //關(guān)閉守護(hù)線程 
    30.    if (daemonThread != null) { 
    31.     daemonThread.stop(); 
    32.    } 
    33.   } 
    34.  } 

    其中在獲得鎖之后,開啟守護(hù)線程,在finally中將守護(hù)線程關(guān)閉。

    基于Lua腳本的實現(xiàn)

    在上述邏輯中,我們是基于spring-boot-data-redis提供的原子化操作來保證鎖判斷和執(zhí)行的原子化的。在非Spring Boot項目中,則可以基于Lua腳本來實現(xiàn)。

    首先定義加鎖和解鎖的Lua腳本及對應(yīng)的DefaultRedisScript對象,在RedisConfig配置類中添加如下實例化代碼:

      
     
     
     
    1. @Configuration 
    2. public class RedisConfig { 
    3.  
    4.  //lock script 
    5.  private static final String LOCK_SCRIPT = " if redis.call('setnx',KEYS[1],ARGV[1]) == 1 " + 
    6.    " then redis.call('expire',KEYS[1],ARGV[2]) " + 
    7.    " return 1 " + 
    8.    " else return 0 end "; 
    9.  private static final String UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call" + 
    10.    "('del', KEYS[1]) else return 0 end"; 
    11.  
    12.  // ... 省略部分代碼 
    13.   
    14.  @Bean 
    15.  public DefaultRedisScript lockRedisScript() { 
    16.   DefaultRedisScript defaultRedisScript = new DefaultRedisScript<>(); 
    17.   defaultRedisScript.setResultType(Boolean.class); 
    18.   defaultRedisScript.setScriptText(LOCK_SCRIPT); 
    19.   return defaultRedisScript; 
    20.  } 
    21.  
    22.  @Bean 
    23.  public DefaultRedisScript unlockRedisScript() { 
    24.   DefaultRedisScript defaultRedisScript = new DefaultRedisScript<>(); 
    25.   defaultRedisScript.setResultType(Long.class); 
    26.   defaultRedisScript.setScriptText(UNLOCK_SCRIPT); 
    27.   return defaultRedisScript; 
    28.  } 

    再通過在AccountOperationThread類中新建構(gòu)造方法,將上述兩個對象傳入類中(省略此部分演示)。然后,就可以基于RedisTemplate來調(diào)用了,改造之后的代碼實現(xiàn)如下:

      
     
     
     
    1. private void deamonRedisLockWithLua() { 
    2.  //守護(hù)線程 
    3.  DaemonThread daemonThread = null; 
    4.  //Spring data redis 支持的原子性操作,并設(shè)置5秒過期時間 
    5.  String uuid = UUID.randomUUID().toString(); 
    6.  String value = Thread.currentThread().getId() + ":" + uuid; 
    7.  try { 
    8.   while (!redisTemplate.execute(lockRedisScript, Collections.singletonList(userId + ":syn"), value, 5)) { 
    9.    // 等待1000毫秒重試獲得鎖 
    10.    logger.info(Thread.currentThread().getName() + ":嘗試循環(huán)獲取鎖"); 
    11.    TimeUnit.MILLISECONDS.sleep(1000); 
    12.   } 
    13.   logger.info(Thread.currentThread().getName() + ":獲得鎖----"); 
    14.   // 開啟守護(hù)線程 
    15.   daemonThread = new DaemonThread(userId + ":syn", redisTemplate); 
    16.   Thread thread = new Thread(daemonThread); 
    17.   thread.start(); 
    18.   // 業(yè)務(wù)邏輯執(zhí)行10秒... 
    19.   TimeUnit.MILLISECONDS.sleep(10000); 
    20.  } catch (InterruptedException e) { 
    21.   logger.error("異常", e); 
    22.  } finally { 
    23.   //使用Lua腳本:先判斷是否是自己設(shè)置的鎖,再執(zhí)行刪除 
    24.   // key存在,當(dāng)前值=期望值時,刪除key;key存在,當(dāng)前值!=期望值時,返回0; 
    25.   Long result = redisTemplate.execute(unlockRedisScript, Collections.singletonList(userId + ":syn"), value); 
    26.   logger.info("redis解鎖:{}", RELEASE_SUCCESS.equals(result)); 
    27.   if (RELEASE_SUCCESS.equals(result)) { 
    28.    if (daemonThread != null) { 
    29.     //關(guān)閉守護(hù)線程 
    30.     daemonThread.stop(); 
    31.     logger.info(Thread.currentThread().getName() + ":釋放鎖---"); 
    32.    } 
    33.   } 
    34.  } 

    其中while循環(huán)中加鎖和finally中的釋放鎖都是基于Lua腳本來實現(xiàn)了。

    Redis鎖的其他因素

    除了上述實例,在使用Redis分布式鎖時,還可以考慮以下情況及方案。

    Redis鎖的不可重入

    當(dāng)線程在持有鎖的情況下再次請求加鎖,如果一個鎖支持一個線程多次加鎖,那么這個鎖就是可重入的。如果一個不可重入鎖被再次加鎖,由于該鎖已經(jīng)被持有,再次加鎖會失敗。Redis可通過對鎖進(jìn)行重入計數(shù),加鎖時加 1,解鎖時減 1,當(dāng)計數(shù)歸 0時釋放鎖。

    可重入鎖雖然高效但會增加代碼的復(fù)雜性,這里就不舉例說明了。

    等待鎖釋放

    有的業(yè)務(wù)場景,發(fā)現(xiàn)被鎖則直接返回。但有的場景下,客戶端需要等待鎖釋放然后去搶鎖。上述示例就屬于后者。針對等待鎖釋放也有兩種方案:

    • 客戶端輪訓(xùn):當(dāng)未獲得鎖時,等待一段時間再重新獲取,直到成功。上述示例就是基于這種方式實現(xiàn)的。這種方式的缺點也很明顯,比較耗費服務(wù)器資源,當(dāng)并發(fā)量大時會影響服務(wù)器的效率。
    • 使用Redis的訂閱發(fā)布功能:當(dāng)獲取鎖失敗時,訂閱鎖釋放消息,獲取鎖成功后釋放時,發(fā)送釋放消息。

    集群中的主備切換和腦裂

    在Redis包含主從同步的集群部署方式中,如果主節(jié)點掛掉,從節(jié)點提升為主節(jié)點。如果客戶端A在主節(jié)點加鎖成功,指令還未同步到從節(jié)點,此時主節(jié)點掛掉,從節(jié)點升為主節(jié)點,新的主節(jié)點中沒有鎖的數(shù)據(jù)。這種情況下,客戶端B就可能加鎖成功,從而出現(xiàn)并發(fā)的場景。

    當(dāng)集群發(fā)生腦裂時,Redis master節(jié)點跟slave 節(jié)點和 sentinel 集群處于不同的網(wǎng)絡(luò)分區(qū)。sentinel集群無法感知到master的存在,會將 slave 節(jié)點提升為 master 節(jié)點,此時就會存在兩個不同的 master 節(jié)點。從而也會導(dǎo)致并發(fā)問題的出現(xiàn)。Redis Cluster集群部署方式同理。

    小結(jié)

    通過生產(chǎn)環(huán)境中的一個問題,排查原因,尋找解決方案,到最終對基于Redis分布式的深入研究,這便是學(xué)習(xí)的過程。

    同時,每當(dāng)面試或被問題如何解決分布式共享資源時,我們會脫口而出”基于Redis實現(xiàn)分布式鎖“,但通過本文的學(xué)習(xí)會發(fā)現(xiàn),Redis分布式鎖并不是萬能的,而且在使用的過程中還需要注意超時、死鎖、誤解鎖、集群選主/腦裂等問題。

    Redis以高性能著稱,但在實現(xiàn)分布式鎖的過程中還是存在一些問題。因此,基于Redis的分布式鎖可以極大的緩解并發(fā)問題,但要完全防止并發(fā),還是得從數(shù)據(jù)庫層面入手。


    網(wǎng)站欄目:加鎖了還有并發(fā)問題?Redis分布式鎖,真的用對了?
    轉(zhuǎn)載注明:http://www.5511xx.com/article/cocgphs.html