日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢(xún)
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷(xiāo)解決方案
分布式鎖中-基于Zookeeper的實(shí)現(xiàn)

1. Zookeeper概述

Zookeeper(后續(xù)簡(jiǎn)稱(chēng)ZK)是一個(gè)分布式的,開(kāi)放源碼的分布式應(yīng)用程序協(xié)調(diào)服務(wù),通常以集群模式運(yùn)轉(zhuǎn),其協(xié)調(diào)能力可以理解為是基于觀察者設(shè)計(jì)模式來(lái)實(shí)現(xiàn)的;ZK服務(wù)會(huì)使用Znode存儲(chǔ)使用者的數(shù)據(jù),并將這些數(shù)據(jù)以樹(shù)形目錄的形式來(lái)組織管理,支持使用者以觀察者的角色指定自己關(guān)注哪些節(jié)點(diǎn)\數(shù)據(jù)的變更,當(dāng)這些變更發(fā)生時(shí),ZK會(huì)通知其觀察者;為滿(mǎn)足本篇目標(biāo)所需,著重介紹以下幾個(gè)關(guān)鍵特性:

創(chuàng)新互聯(lián)是專(zhuān)業(yè)的鳳岡網(wǎng)站建設(shè)公司,鳳岡接單;提供成都網(wǎng)站建設(shè)、網(wǎng)站建設(shè),網(wǎng)頁(yè)設(shè)計(jì),網(wǎng)站設(shè)計(jì),建網(wǎng)站,PHP網(wǎng)站建設(shè)等專(zhuān)業(yè)做網(wǎng)站服務(wù);采用PHP框架,可快速的進(jìn)行鳳岡網(wǎng)站開(kāi)發(fā)網(wǎng)頁(yè)制作和功能擴(kuò)展;專(zhuān)業(yè)做搜索引擎喜愛(ài)的網(wǎng)站,專(zhuān)業(yè)的做網(wǎng)站團(tuán)隊(duì),希望更多企業(yè)前來(lái)合作!

  • 數(shù)據(jù)組織:數(shù)據(jù)節(jié)點(diǎn)以樹(shù)形目錄(類(lèi)似文件系統(tǒng))組織管理,每一個(gè)節(jié)點(diǎn)中都會(huì)保存數(shù)據(jù)信息和節(jié)點(diǎn)信息。

ZooKeeper's Hierarchical Namespace

  • 集群模式:通常是由3、5個(gè)基數(shù)實(shí)例組成集群,當(dāng)超過(guò)半數(shù)服務(wù)實(shí)例正常工作就能對(duì)外提供服務(wù),既能避免單點(diǎn)故障,又盡量高可用,每個(gè)服務(wù)實(shí)例都有一個(gè)數(shù)據(jù)備份,以實(shí)現(xiàn)數(shù)據(jù)全局一致

ZooKeeper Service

  • 順序更新:更新請(qǐng)求都會(huì)轉(zhuǎn)由leader執(zhí)行,來(lái)自同一客戶(hù)端的更新將按照發(fā)送的順序被寫(xiě)入到ZK,處理寫(xiě)請(qǐng)求創(chuàng)建Znode時(shí),Znode名稱(chēng)后會(huì)被分配一個(gè)全局唯一的遞增編號(hào),可以通過(guò)順序號(hào)推斷請(qǐng)求的順序,利用這個(gè)特性可以實(shí)現(xiàn)高級(jí)協(xié)調(diào)服務(wù)

監(jiān)聽(tīng)機(jī)制:給某個(gè)節(jié)點(diǎn)注冊(cè)監(jiān)聽(tīng)器,該節(jié)點(diǎn)一旦發(fā)生變更(例如更新或者刪除),監(jiān)聽(tīng)者就會(huì)收到一個(gè)Watch Event,可以感知到節(jié)點(diǎn)\數(shù)據(jù)的變更

臨時(shí)節(jié)點(diǎn):session鏈接斷開(kāi)臨時(shí)節(jié)點(diǎn)就沒(méi)了,不能創(chuàng)建子節(jié)點(diǎn)(很關(guān)鍵)

ZK的分布式鎖正是基于以上特性來(lái)實(shí)現(xiàn)的,簡(jiǎn)單來(lái)說(shuō)是:

  • 臨時(shí)節(jié)點(diǎn):用于支撐異常情況下的鎖自動(dòng)釋放能力
  • 順序節(jié)點(diǎn):用于支撐公平鎖獲取鎖和排隊(duì)等待的能力
  • 監(jiān)聽(tīng)機(jī)制:用于支撐搶鎖能力
  • 集群模式:用于支撐鎖服務(wù)的高可用

2. 加解鎖的流程描述

  • 創(chuàng)建一個(gè)永久節(jié)點(diǎn)作為鎖節(jié)點(diǎn)(/lock2)
  • 試圖加鎖的客戶(hù)端在指定鎖名稱(chēng)節(jié)點(diǎn)(/lock2)下,創(chuàng)建臨時(shí)順序子節(jié)點(diǎn)
  • 獲取鎖節(jié)點(diǎn)(/lock2)下所有子節(jié)點(diǎn)
  • 對(duì)所獲取的子節(jié)點(diǎn)按節(jié)點(diǎn)自增序號(hào)從小到大排序
  • 判斷自己是不是第一個(gè)子節(jié)點(diǎn),若是,則獲取鎖
  • 若不是,則監(jiān)聽(tīng)比該節(jié)點(diǎn)小的那個(gè)節(jié)點(diǎn)的刪除事件(這種只監(jiān)聽(tīng)前一個(gè)節(jié)點(diǎn)的方式避免了驚群效應(yīng))
  • 若是阻塞申請(qǐng)鎖,則申請(qǐng)鎖的操作可增加阻塞等待
  • 若監(jiān)聽(tīng)事件生效(說(shuō)明前節(jié)點(diǎn)釋放了,可以嘗試去獲取鎖),則回到第3步重新進(jìn)行判斷,直到獲取到鎖
  • 解鎖時(shí),將第一個(gè)子節(jié)點(diǎn)刪除釋放

3. ZK分布式鎖的能力

可能讀者是單篇閱讀,這里引入上一篇《分布式鎖上-初探》中的一些內(nèi)容,一個(gè)分布式鎖應(yīng)具備這樣一些功能特點(diǎn):

  • 互斥性:在同一時(shí)刻,只有一個(gè)客戶(hù)端能持有鎖
  • 安全性:避免死鎖,如果某個(gè)客戶(hù)端獲得鎖之后處理時(shí)間超過(guò)最大約定時(shí)間,或者持鎖期間發(fā)生了故障導(dǎo)致無(wú)法主動(dòng)釋放鎖,其持有的鎖也能夠被其他機(jī)制正確釋放,并保證后續(xù)其它客戶(hù)端也能加鎖,整個(gè)處理流程繼續(xù)正常執(zhí)行
  • 可用性:也被稱(chēng)作容錯(cuò)性,分布式鎖需要有高可用能力,避免單點(diǎn)故障,當(dāng)提供鎖的服務(wù)節(jié)點(diǎn)故障(宕機(jī))時(shí)不影響服務(wù)運(yùn)行,這里有兩種模式:一種是分布式鎖服務(wù)自身具備集群模式,遇到故障能自動(dòng)切換恢復(fù)工作;另一種是客戶(hù)端向多個(gè)獨(dú)立的鎖服務(wù)發(fā)起請(qǐng)求,當(dāng)某個(gè)鎖服務(wù)故障時(shí)仍然可以從其他鎖服務(wù)讀取到鎖信息(Redlock)
  • 可重入性:對(duì)同一個(gè)鎖,加鎖和解鎖必須是同一個(gè)線程,即不能把其他線程程持有的鎖給釋放了
  • 高效靈活:加鎖、解鎖的速度要快;支持阻塞和非阻塞;支持公平鎖和非公平鎖

基于上文的內(nèi)容,這里簡(jiǎn)單總結(jié)一下ZK的能力矩陣(其它分布式鎖的情況會(huì)在后續(xù)文章中補(bǔ)充):

能力

ZK

MySql

Redis原生

Redlock

ETCD

互斥

安全

鏈接異常,session關(guān)閉后鎖會(huì)自動(dòng)釋放

可用性

相對(duì)還好

可重入

線程可重入

加解鎖速度

居中

阻塞非阻塞

都支持

公平非公平

僅公平鎖

關(guān)于性能不太高的一種說(shuō)法

因?yàn)槊看卧趧?chuàng)建鎖和釋放鎖的過(guò)程中,都要?jiǎng)討B(tài)創(chuàng)建、銷(xiāo)毀臨時(shí)節(jié)點(diǎn)來(lái)實(shí)現(xiàn)鎖功能。ZK中創(chuàng)建和刪除節(jié)點(diǎn)只能通過(guò)Leader服務(wù)器來(lái)執(zhí)行,然后Leader服務(wù)器還需要將數(shù)據(jù)同步到所有的Follower機(jī)器上,這樣頻繁的網(wǎng)絡(luò)通信,性能的短板是非常突出的。在高性能,高并發(fā)的場(chǎng)景下,不建議使用ZooKeeper的分布式鎖。

由于ZooKeeper的高可用特性,在并發(fā)量不是太高的場(chǎng)景,也推薦使用ZK的分布式鎖。

4. InterProcessMutex 使用示例

Zookeeper 客戶(hù)端框架 Curator 提供的 InterProcessMutex 是分布式鎖的一種實(shí)現(xiàn),acquire 方法阻塞|非阻塞獲取鎖,release 方法釋放鎖,另外還提供了可撤銷(xiāo)、可重入功能。

4.1 接口介紹

// 獲取互斥鎖
public void acquire() throws Exception;
// 在給定的時(shí)間內(nèi)獲取互斥鎖
public boolean acquire(long time, TimeUnit unit) throws Exception;
// 釋放鎖處理
public void release() throws Exception;
// 如果當(dāng)前線程獲取了互斥鎖,則返回true
boolean isAcquiredInThisProcess();

4.2 pom依賴(lài)


org.apache.logging.log4j
log4j-core
2.8.2


org.apache.zookeeper
zookeeper
3.5.7


org.apache.curator
curator-framework
4.3.0


org.apache.curator
curator-recipes
4.3.0


org.apache.curator
curator-client
4.3.0

4.3 示例

package com.atguigu.case3;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorLockTest {

public static void main(String[] args) {

// 創(chuàng)建分布式鎖1
InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");

// 創(chuàng)建分布式鎖2
InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");

new Thread(new Runnable() {
@Override
public void run() {
try {
lock1.acquire();
System.out.println("線程1 獲取到鎖");

lock1.acquire();
System.out.println("線程1 再次獲取到鎖");

Thread.sleep(5 * 1000);

lock1.release();
System.out.println("線程1 釋放鎖");

lock1.release();
System.out.println("線程1 再次釋放鎖");

} catch (Exception e) {
e.printStackTrace();
}
}
}).start();

new Thread(new Runnable() {
@Override
public void run() {
try {
lock2.acquire();
System.out.println("線程2 獲取到鎖");

lock2.acquire();
System.out.println("線程2 再次獲取到鎖");

Thread.sleep(5 * 1000);

lock2.release();
System.out.println("線程2 釋放鎖");

lock2.release();
System.out.println("線程2 再次釋放鎖");

} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}

private static CuratorFramework getCuratorFramework() {

ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);

CuratorFramework client = CuratorFrameworkFactory.builder().connectString("xxx:2181,xxx:2181,xxx:2181")
.connectionTimeoutMs(2000)
.sessionTimeoutMs(2000)
.retryPolicy(policy).build();

// 啟動(dòng)客戶(hù)端
client.start();

System.out.println("zookeeper 啟動(dòng)成功");
return client;
}
}

5. DIY一個(gè)閹割版的分布式鎖

通過(guò)這個(gè)實(shí)例對(duì)照第2節(jié)內(nèi)容來(lái)理解加解鎖的流程,以及如何避免驚群效應(yīng)。

package com.rock.case2;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
* zk 分布式鎖 v1版本:
* 完成功能 :
* 1. 避免了驚群效應(yīng)
* 缺失功能:
* 1. 超時(shí)控制
* 2. 讀寫(xiě)鎖
* 3. 重入控制
*/
public class DistributedLock {

private String connectString;
private int sessionTimeout;
private ZooKeeper zk;

private CountDownLatch connectLatch = new CountDownLatch(1);
private CountDownLatch waitLatch = new CountDownLatch(1);

private String waitPath;
private String currentNode;
private String LOCK_ROOT_PATH;

private static String NODE_PREFIX = "w";

public DistributedLock(String connectString, int sessionTimeout, String lockName){
//TODO:數(shù)據(jù)校驗(yàn)
this.connectString = connectString;
this.sessionTimeout = sessionTimeout;
this.LOCK_ROOT_PATH = lockName;
}


public void init() throws IOException, KeeperException, InterruptedException {
// 建聯(lián)
zk = new ZooKeeper(connectString, sessionTimeout, watchedEvent -> {
// connectLatch 連接上zk后 釋放
if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
connectLatch.countDown();
}
});

connectLatch.await();// 等待zk正常連接后

// 判斷鎖名稱(chēng)節(jié)點(diǎn)是否存在
Stat stat = zk.exists(LOCK_ROOT_PATH, false);
if (stat == null) {
// 創(chuàng)建一下鎖名稱(chēng)節(jié)點(diǎn)
try {
zk.create(LOCK_ROOT_PATH, LOCK_ROOT_PATH.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (KeeperException e) {
//并發(fā)創(chuàng)建沖突忽略。
if (!e.code().name().equals("NODEEXISTS")) {
throw e;
}
}
}
}

/**
* 待補(bǔ)充功能:
* 1. 超時(shí)設(shè)置
* 2. 讀寫(xiě)區(qū)分
* 3. 重入控制
*/
public void zklock() throws KeeperException, InterruptedException {
if (!tryLock()) {
waitLock();
zklock();
}
}

/**
*
*/
private void waitLock() throws KeeperException, InterruptedException {
try {
zk.getData(waitPath, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent){
// waitLatch 需要釋放
if (watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
waitLatch.countDown();
}
}
}, new Stat());
// 等待監(jiān)聽(tīng)
waitLatch.await();
} catch (KeeperException.NoNodeException e) {
//如果等待的節(jié)點(diǎn)已經(jīng)被清除了,不等了,再?lài)L試去搶鎖
return;
}

}

private boolean tryLock() throws KeeperException, InterruptedException {

currentNode = zk.create(LOCK_ROOT_PATH + "/" + NODE_PREFIX, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 判斷創(chuàng)建的節(jié)點(diǎn)是否是最小的序號(hào)節(jié)點(diǎn),如果是獲取到鎖;如果不是,監(jiān)聽(tīng)他序號(hào)前一個(gè)節(jié)點(diǎn)
List children = zk.getChildren(LOCK_ROOT_PATH, false);
// 如果children 只有一個(gè)值,那就直接獲取鎖; 如果有多個(gè)節(jié)點(diǎn),需要判斷,誰(shuí)最小
if (children.size() == 1) {
return true;
} else {
String thisNode = currentNode.substring(LOCK_ROOT_PATH.length() + 1);
// 通過(guò)w00000000獲取該節(jié)點(diǎn)在children集合的位置
int index = children.indexOf(thisNode);
if (index == 0) {
//自己就是第一個(gè)節(jié)點(diǎn)
return true;
}
// 需要監(jiān)聽(tīng) 他前一個(gè)節(jié)點(diǎn)變化
waitPath = LOCK_ROOT_PATH + "/" + children.get(index - 1);
}
return false;
}


// 解鎖
public void unZkLock(){
// 刪除節(jié)點(diǎn)
try {
zk.delete(this.currentNode, -1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}

}

本文轉(zhuǎn)載自微信公眾號(hào)「架構(gòu)染色」,可以通過(guò)以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系【架構(gòu)染色】公眾號(hào)作者。


分享標(biāo)題:分布式鎖中-基于Zookeeper的實(shí)現(xiàn)
本文URL:http://www.5511xx.com/article/cdjjped.html