ZK(ZooKeeper)分布式鎖實現
準備
本文會使用到 三臺 獨立服務器,可以自行提前搭建好。
不知道如何搭建的,可以看我之前 ZooKeeper集群 搭建:Zookeeper 集群部署的那些事兒
關于ZooKeeper 一些基礎命令可以看這篇:Zookeeper入門看這篇就夠了
前言
在平時我們對鎖的使用,在針對單個服務,我們可以用 Java 自帶的一些鎖來實現,資源的順序訪問,但是隨著業務的發展,現在基本上公司的服務都是多個,單純的 Lock或者Synchronize 只能解決單個JVM線程的問題,那么針對于單個服務的 Java 的鎖是無法滿足我們業務的需要的,為了解決多個服務跨服務訪問共享資源,于是就有了分布鎖,分布式鎖產生的原因就是集群。
正文
實現分布式鎖的方式有哪些呢?
- 分布式鎖的實現方式主要以(ZooKeeper、Reids、Mysql)這三種為主
今天我們主要講解的是使用 ZooKeeper來實現分布式鎖,ZooKeeper的應用場景主要包含這幾個方面:
- 服務注冊與訂閱(共用節點)
- 分布式通知(監聽ZNode)
- 服務命令(ZNode特性)
- 數據訂閱、發布(Watcher)
- 分布式鎖(臨時節點)
ZooKeeper實現分布式鎖,主要是得益于ZooKeeper 保證了數據的強一致性,鎖的服務可以分為兩大類:
保持獨占
所有試圖來獲取當前鎖的客戶端,最終有且只有一個能夠成功得到當前鎖的鑰匙,通常我們會把 ZooKeeper 上的節點(ZNode)看做一把鎖,通過 create臨時節點的方式來實現,當多個客戶端都去創建一把鎖的時候,那么只有成功創建了那個客戶端才能擁有這把鎖
控制時序
所有試圖獲取鎖的客戶端,都是被順序執行,只是會有一個序號(zxid),我們會有一個節點,例如:/testLock,所有臨時節點都在這個下面去創建,ZK的父節點(/testLock) 維持了一個序號,這個是ZK自帶的屬性,他保證了子節點創建的時序性,從而也形成了每個客戶端的一個 全局時序
ZK鎖機制
在實現ZooKeeper 分布式鎖之前我們有必要了解一下,關于ZooKeeper分布式鎖機制的實現流程和原理,不然各位寶貝,出去面試的時候怎么和面試官侃侃而談~
臨時順序節點
基于ZooKeeper的臨時順序節點 ,ZooKeeper比較適合來實現分布式鎖:
- 順序發號器: ZooKeeper的每一個節點,都是自帶順序生成器:在每個節點下面創建臨時節點,新的子節點后面,會添加一個次序編號,這個生成的編號,會在上一次的編號進行 +1 操作
- 有序遞增: ZooKeeper節點有序遞增,可以保證鎖的公平性,我們只需要在一個持久父節點下,創建對應的臨時順序節點,每個線程在嘗試占用鎖之前,會調用watch,判斷自己當前的序號是不是在當前父節點最小,如果是,那么獲取鎖
- Znode監聽: 每個線程在搶占所之前,會創建屬于當前線程的ZNode節點,在釋放鎖的時候,會刪除創建的ZNode,當我們創建的序號不是最小的時候,會等待watch通知,也就是上一個ZNode的狀態通知,當前一個ZNode刪除的時候,會觸發回調機制,告訴下一個ZNode,你可以獲取鎖開始工作了
- 臨時節點自動刪除: ZooKeeper還有一個好處,當我們客戶端斷開連接之后,我們出創建的臨時節點會進行自動刪除操作,所以我們在使用分布式鎖的時候,一般都是會去創建臨時節點,這樣可以避免因為網絡異常等原因,造成的死鎖。
- 羊群效應: ZooKeeper節點的順序訪問性,后面監聽前面的方式,可以有效的避免 羊群效應,什么是羊群效應:當某一個節點掛掉了,所有的節點都要去監聽,然后做出回應,這樣會給服務器帶來比較大壓力,如果有了臨時順序節點,當一個節點掛掉了,只有它后面的那一個節點才做出反應。
我們現在看一下下面一張圖:
在上圖中, ZooKeeper里面有一把鎖節點 testLock,這個鎖就是 ZooKeeper的一個節點,當兩個客戶端來獲取這把鎖的時候,會對 ZooKeeper進行加鎖的請求,也就是我們所說的 臨時順序節點。
當我們在 /testLock目錄下創建了一個順序臨時節點后,ZK會自動對這個臨時節點維護 一個節點序號,并且這個節點是遞增的,比如我們 clientA 創建了一個臨時順序節點,ZK內部會生成一個序號:/lock0000000001,那么 clientB 也生成了一個臨時順序節點,ZK會生成一個序號為 /lock0000000002,在這里數字都是依次遞增的,從1開始遞增,ZK內部會維護這個順序。
下圖所示:
這時候,ClientA會進行監聽判斷,在父節點下,我是不是最小的,如果是的話,那么俺就可以加鎖了,因為我是最小的,其他的都比我大。我自己可以進行加鎖,你已經是一個成熟的臨時節點了,要學會自己加鎖。咳,那么ZK是怎么進行判斷的呢?寶貝,您往下看:
這個是 cleintA已經加鎖完成了,這個時候 clientB也要過來加鎖,那么他也要在 /testLock,創建一個屬于自己的臨時節點,那么這個時候他的序號就會變成 /lock0000000002,如下圖所示:
這個時候就會出現我們前面所講的,clientB 在加鎖的時候會判斷,自己是不是最小的,一看在當前父節點下不是最小的,啊~我還挺大的,還有比我小的!!!
加鎖失敗呀,咳咳,這個時候呢,clientB 就會去偷窺clientA,氣氛逐漸曖昧起來,啊不是,是按照順序去監聽前一個節點(clientA),是否完成工作了,如果完成了,clientB才可以進行加鎖工作,寶貝,你往下看圖片:
clientA 加鎖成功后,會進行自己的業務處理,當 clientA 處理完工作后,說我完事了,下一個,那么 clientA 是怎么完事的呢,他多長時間?不是,具體流程是怎樣的?小農你不對勁,說什么呢!!!真羞澀
上面我們不是說了,當 clientB 加鎖失敗后,會給前一個節點(clientA)加上一個監聽,當clientA被刪除以后,就表示有人釋放了鎖,這個時候就會通知 clientB重新去獲取鎖。
這個時候clientB重新獲取鎖的時候,發現自己就是當前父節點下面最小的那個,于是clientB就開始加鎖,開始工作等一系列操作,當clientB 完事以后,釋放鎖,也說了一句,下一個。
如下圖所示:
當然除了 clientA、clientB還有C\D\E等,這字母看著好奇怪又好熟悉,原理都是一樣的,都是最小節點進行解鎖,如果不是,監聽前一個節點是否釋放,如果釋放了,再次嘗試加鎖。如果前一節節點釋放了,自己就是最小了,就排到前面去了,有點類似于 銀行取號 的操作。
代碼實現
使用ZooKeeper 創建臨時順序節點來實現分布式鎖,大體的流程就是 先創建一個持久父節點,在當前節點下,創建臨時順序節點,找出最小的序列號,獲取分布式鎖,程序業務完成之后釋放鎖,通知下一個節點進行操作,使用的是watch來監控節點的變化,然后依次下一個最小序列節點進行操作。
首先我們需要創建一個持久父類節點:我這里是 /mxn
WatchCallBack
- import org.apache.zookeeper.*;
- import org.apache.zookeeper.data.Stat;
- import java.util.Collections;
- import java.util.List;
- import java.util.concurrent.CountDownLatch;
- /**
- * @program: mxnzookeeper
- * @ClassName WatchCallBack
- * @description:
- * @author: 微信搜索:牧小農
- * @create: 2021-10-23 10:48
- * @Version 1.0
- **/
- public class WatchCallBack implements Watcher, AsyncCallback.StringCallback ,AsyncCallback.Children2Callback ,AsyncCallback.StatCallback {
- ZooKeeper zk ;
- String threadName;
- CountDownLatch cc = new CountDownLatch(1);
- String pathName;
- public String getPathName() {
- return pathName;
- }
- public void setPathName(String pathName) {
- this.pathName = pathName;
- }
- public String getThreadName() {
- return threadName;
- }
- public void setThreadName(String threadName) {
- this.threadName = threadName;
- }
- public ZooKeeper getZk() {
- return zk;
- }
- public void setZk(ZooKeeper zk) {
- this.zk = zk;
- }
- /** @Author 牧小農
- * @Description //TODO 嘗試加鎖方法
- * @Date 16:14 2021/10/24
- * @Param
- * @return
- **/
- public void tryLock(){
- try {
- System.out.println(threadName + " 開始創建。。。。");
- //創建一個順序臨時節點
- zk.create("/lock",threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,this,"abc");
- //阻塞當前,監聽前一個節點是否釋放鎖
- cc.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- /** @Author 牧小農
- * @Description //TODO 解鎖方法
- * @Date 16:14 2021/10/24
- * @Param
- * @return
- **/
- public void unLock(){
- try {
- //釋放鎖,刪除臨時節點
- zk.delete(pathName,-1);
- //結束工作
- System.out.println(threadName + " 結束工作了....");
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (KeeperException e) {
- e.printStackTrace();
- }
- }
- @Override
- public void process(WatchedEvent event) {
- //如果第一個節點釋放了鎖,那么第二個就會收到回調
- //告訴它前一個節點釋放了,你可以開始嘗試獲取鎖
- switch (event.getType()) {
- case None:
- break;
- case NodeCreated:
- break;
- case NodeDeleted:
- //當前節點重新獲取鎖
- zk.getChildren("/",false,this ,"sdf");
- break;
- case NodeDataChanged:
- break;
- case NodeChildrenChanged:
- break;
- }
- }
- @Override
- public void processResult(int rc, String path, Object ctx, String name) {
- if(name != null ){
- System.out.println(threadName +" 線程創建了一個節點為 : " + name );
- pathName = name ;
- //監聽前一個節點
- zk.getChildren("/",false,this ,"sdf");
- }
- }
- //getChildren call back
- @Override
- public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
- //節點按照編號,升序排列
- Collections.sort(children);
- //對節點進行截取例如 /lock0000000022 截取后就是 lock0000000022
- int i = children.indexOf(pathName.substring(1));
- //是不是第一個,也就是說是不是最小的
- if(i == 0){
- //是第一個
- System.out.println(threadName +" 現在我是最小的....");
- try {
- zk.setData("/",threadName.getBytes(),-1);
- cc.countDown();
- } catch (KeeperException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }else{
- //不是第一個
- //監聽前一個節點 看它是不是完成了工作進行釋放鎖了
- zk.exists("/"+children.get(i-1),this,this,"sdf");
- }
- }
- @Override
- public void processResult(int rc, String path, Object ctx, Stat stat) {
- //判斷是否失敗exists
- }
- }
TestLock
- import com.mxn.zookeeper.config.ZKUtils;
- import org.apache.zookeeper.ZooKeeper;
- import org.junit.After;
- import org.junit.Before;
- import org.junit.Test;
- /**
- * @program: mxnzookeeper
- * @ClassName TestLock
- * @description:
- * @author: 微信搜索:牧小農
- * @create: 2021-10-23 10:45
- * @Version 1.0
- **/
- public class TestLock {
- ZooKeeper zk ;
- @Before
- public void conn (){
- zk = ZKUtils.getZK();
- }
- @After
- public void close (){
- try {
- zk.close();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- @Test
- public void lock(){
- //創建十個線程
- for (int i = 0; i < 10; i++) {
- new Thread(){
- @Override
- public void run() {
- WatchCallBack watchCallBack = new WatchCallBack();
- watchCallBack.setZk(zk);
- String threadName = Thread.currentThread().getName();
- watchCallBack.setThreadName(threadName);
- //線程進行搶鎖操作
- watchCallBack.tryLock();
- try {
- //進行業務邏輯處理
- System.out.println(threadName+" 開始處理業務邏輯了...");
- Thread.sleep(200);
- }catch (Exception e){
- e.printStackTrace();
- }
- //釋放鎖
- watchCallBack.unLock();
- }
- }.start();
- }
- while(true){
- }
- }
- }
運行結果:
- Thread-1 線程創建了一個節點為 : /lock0000000112
- Thread-5 線程創建了一個節點為 : /lock0000000113
- Thread-2 線程創建了一個節點為 : /lock0000000114
- Thread-6 線程創建了一個節點為 : /lock0000000115
- Thread-9 線程創建了一個節點為 : /lock0000000116
- Thread-4 線程創建了一個節點為 : /lock0000000117
- Thread-7 線程創建了一個節點為 : /lock0000000118
- Thread-3 線程創建了一個節點為 : /lock0000000119
- Thread-8 線程創建了一個節點為 : /lock0000000120
- Thread-0 線程創建了一個節點為 : /lock0000000121
- Thread-1 現在我是最小的....
- Thread-1 開始處理業務邏輯了...
- Thread-1 結束工作了....
- Thread-5 現在我是最小的....
- Thread-5 開始處理業務邏輯了...
- Thread-5 結束工作了....
- Thread-2 現在我是最小的....
- Thread-2 開始處理業務邏輯了...
- Thread-2 結束工作了....
- Thread-6 現在我是最小的....
- Thread-6 開始處理業務邏輯了...
- Thread-6 結束工作了....
- Thread-9 現在我是最小的....
- Thread-9 開始處理業務邏輯了...
- Thread-9 結束工作了....
- Thread-4 現在我是最小的....
- Thread-4 開始處理業務邏輯了...
- Thread-4 結束工作了....
- Thread-7 現在我是最小的....
- Thread-7 開始處理業務邏輯了...
- Thread-7 結束工作了....
- Thread-3 現在我是最小的....
- Thread-3 開始處理業務邏輯了...
- Thread-3 結束工作了....
- Thread-8 現在我是最小的....
- Thread-8 開始處理業務邏輯了...
- Thread-8 結束工作了....
- Thread-0 現在我是最小的....
- Thread-0 開始處理業務邏輯了...
- Thread-0 結束工作了....
總結
ZK分布式鎖,能夠有效的解決分布式、不可重入的問題,在上面的案例中我, 沒有實現可重入鎖,但是實現起來也不麻煩,只需要帶上線程信息等唯一標識,判斷一下就可以了
ZK實現分布式鎖具有天然的優勢,臨時順序節點,可以有效的避免死鎖問題,讓客戶端斷開,那么就會刪除當前臨時節點,讓下一個節點進行工作。
如果文中有錯誤或者不了解的地方,歡迎留言,小農看見了會第一時間回復大家,大家加油
我是牧小農,一個卑微的打工人,如果覺得文中的內容對你有幫助,記得一鍵三連啊,你們的三連是小農最大的動力。