成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

從Curator實現(xiàn)分布式鎖的源碼再到羊群效應(yīng)

開發(fā) 前端 分布式
Curator是一款由Java編寫的,操作Zookeeper的客戶端工具,在其內(nèi)部封裝了分布式鎖、選舉等高級功能。

一、前言

Curator是一款由Java編寫的,操作Zookeeper的客戶端工具,在其內(nèi)部封裝了分布式鎖、選舉等高級功能。

今天主要是分析其實現(xiàn)分布式鎖的主要原理,有關(guān)分布式鎖的一些介紹或其他實現(xiàn),有興趣的同學(xué)可以翻閱以下文章:

我用了上萬字,走了一遍Redis實現(xiàn)分布式鎖的坎坷之路,從單機到主從再到多實例,原來會發(fā)生這么多的問題_陽陽的博客-CSDN博客

Redisson可重入與鎖續(xù)期源碼分析_陽陽的博客-CSDN博客

在使用Curator獲取分布式鎖時,Curator會在指定的path下創(chuàng)建一個有序的臨時節(jié)點,如果該節(jié)點是最小的,則代表獲取鎖成功。

接下來,在準備工作中,我們可以觀察是否會創(chuàng)建出一個臨時節(jié)點出來。

二、準備工作

首先我們需要搭建一個zookeeper集群,當(dāng)然你使用單機也行。

在這篇文章面試官:能給我畫個Zookeeper選舉的圖嗎?,介紹了一種使用docker-compose方式快速搭建zk集群的方式。

在pom中引入依賴:

  1. <dependency> 
  2.          <groupId>org.apache.curator</groupId> 
  3.          <artifactId>curator-recipes</artifactId> 
  4.          <version>2.12.0</version> 
  5.      </dependency> 

 Curator客戶端的配置項:

  1. /** 
  2.  * @author qcy 
  3.  * @create 2022/01/01 22:59:34 
  4.  */ 
  5. @Configuration 
  6. public class CuratorFrameworkConfig { 
  7.  
  8.     //zk各節(jié)點地址 
  9.     private static final String CONNECT_STRING = "localhost:2181,localhost:2182,localhost:2183"
  10.     //連接超時時間(單位:毫秒) 
  11.     private static final int CONNECTION_TIME_OUT_MS = 10 * 1000; 
  12.     //會話超時時間(單位:毫秒) 
  13.     private static final int SESSION_TIME_OUT_MS = 30 * 1000; 
  14.     //重試的初始等待時間(單位:毫秒) 
  15.     private static final int BASE_SLEEP_TIME_MS = 2 * 1000; 
  16.     //最大重試次數(shù) 
  17.     private static final int MAX_RETRIES = 3; 
  18.  
  19.     @Bean 
  20.     public CuratorFramework getCuratorFramework() { 
  21.         CuratorFramework curatorFramework = CuratorFrameworkFactory.builder() 
  22.                 .connectString(CONNECT_STRING) 
  23.                 .connectionTimeoutMs(CONNECTION_TIME_OUT_MS) 
  24.                 .sessionTimeoutMs(SESSION_TIME_OUT_MS) 
  25.                 .retryPolicy(new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES)) 
  26.                 .build(); 
  27.         curatorFramework.start(); 
  28.         return curatorFramework; 
  29.     } 
  30.      

 SESSION_TIME_OUT_MS參數(shù)則會保證,在某個客戶端獲取到鎖之后突然宕機,zk能在該時間內(nèi)刪除當(dāng)前客戶端創(chuàng)建的臨時有序節(jié)點。

測試代碼如下:

  1. //臨時節(jié)點路徑,qcy是博主名字縮寫哈 
  2.    private static final String LOCK_PATH = "/lockqcy"
  3.  
  4.    @Resource 
  5.    CuratorFramework curatorFramework; 
  6.  
  7.    public void testCurator() throws Exception { 
  8.        InterProcessMutex interProcessMutex = new InterProcessMutex(curatorFramework, LOCK_PATH); 
  9.        interProcessMutex.acquire(); 
  10.  
  11.        try { 
  12.            //模擬業(yè)務(wù)耗時 
  13.            Thread.sleep(30 * 1000); 
  14.        } catch (Exception e) { 
  15.            e.printStackTrace(); 
  16.        } finally { 
  17.            interProcessMutex.release(); 
  18.        } 
  19.    } 

 當(dāng)使用接口調(diào)用該方法時,在Thread.sleep處打上斷點,進入到zk容器中觀察創(chuàng)建出來的節(jié)點。

使用 docker exec -it zk容器名 /bin/bash 以交互模式進入容器,接著使用 ./bin/zkCli.sh 連接到zk的server端。

然后使用 ls path 查看節(jié)點

這三個節(jié)點都是持久節(jié)點,可以使用 get path 查看節(jié)點的數(shù)據(jù)結(jié)構(gòu)信息

若一個節(jié)點的ephemeralOwner值為0,即該節(jié)點的臨時擁有者的會話id為0,則代表該節(jié)點為持久節(jié)點。

當(dāng)走到斷點Thread.sleep時,確實發(fā)現(xiàn)在lockqcy下創(chuàng)建出來一個臨時節(jié)點

​到這里嗎,準備工作已經(jīng)做完了,接下來分析interProcessMutex.acquire與release的流程

三、源碼分析

Curator支持多種類型的鎖,例如

  • InterProcessMutex,可重入鎖排它鎖
  • InterProcessReadWriteLock,讀寫鎖
  • InterProcessSemaphoreMutex,不可重入排它鎖

今天主要是分析InterProcessMutex的加解鎖過程,先看加鎖過程

加鎖

  1. public void acquire() throws Exception { 
  2.       if (!internalLock(-1, null)) { 
  3.           throw new IOException("Lost connection while trying to acquire lock: " + basePath); 
  4.       } 
  5.   } 

 這里是阻塞式獲取鎖,獲取不到鎖,就一直進行阻塞。所以對于internalLock方法,超時時間設(shè)置為-1,時間單位設(shè)置成null。

  1. private boolean internalLock(long time, TimeUnit unit) throws Exception { 
  2.        Thread currentThread = Thread.currentThread(); 
  3.        //通過能否在map中取到該線程的LockData信息,來判斷該線程是否已經(jīng)持有鎖 
  4.        LockData lockData = threadData.get(currentThread); 
  5.        if (lockData != null) { 
  6.            //進行可重入,直接返回加鎖成功 
  7.            lockData.lockCount.incrementAndGet(); 
  8.            return true
  9.        } 
  10.        //進行加鎖 
  11.        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes()); 
  12.        if (lockPath != null) { 
  13.            //加鎖成功,保存到map中 
  14.            LockData newLockData = new LockData(currentThread, lockPath); 
  15.            threadData.put(currentThread, newLockData); 
  16.            return true
  17.        } 
  18.  
  19.        return false
  20.    } 

其中threadData是一個map,key線程對象,value為該線程綁定的鎖數(shù)據(jù)。

LockData中保存了加鎖線程owningThread,重入計數(shù)lockCount與加鎖路徑lockPath,例如

  1. /lockqcy/_c_c46513c3-ace0-405f-aa1e-a531ce28fb47-lock-0000000005 
  1. private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap(); 
  2.  
  3.     private static class LockData { 
  4.         final Thread owningThread; 
  5.         final String lockPath; 
  6.         final AtomicInteger lockCount = new AtomicInteger(1); 
  7.  
  8.         private LockData(Thread owningThread, String lockPath) { 
  9.             this.owningThread = owningThread; 
  10.             this.lockPath = lockPath; 
  11.         } 
  12.     } 

 進入到internals.attemptLock方法中

  1. String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception { 
  2.       //開始時間 
  3.       final long startMillis = System.currentTimeMillis(); 
  4.       //將超時時間統(tǒng)一轉(zhuǎn)化為毫秒單位 
  5.       final Long millisToWait = (unit != null) ? unit.toMillis(time) : null
  6.       //節(jié)點數(shù)據(jù),這里為null 
  7.       final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes; 
  8.       //重試次數(shù) 
  9.       int retryCount = 0; 
  10.       //鎖路徑 
  11.       String ourPath = null
  12.       //是否獲取到鎖 
  13.       boolean hasTheLock = false
  14.       //是否完成 
  15.       boolean isDone = false
  16.  
  17.       while (!isDone) { 
  18.           isDone = true
  19.  
  20.           try { 
  21.               //創(chuàng)建一個臨時有序節(jié)點,并返回節(jié)點路徑 
  22.               //內(nèi)部調(diào)用client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path); 
  23.               ourPath = driver.createsTheLock(client, path, localLockNodeBytes); 
  24.               //依據(jù)返回的節(jié)點路徑,判斷是否搶到了鎖 
  25.               hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath); 
  26.           } catch (KeeperException.NoNodeException e) { 
  27.               //在會話過期時,可能導(dǎo)致driver找不到臨時有序節(jié)點,從而拋出NoNodeException 
  28.               //這里就進行重試 
  29.               if (client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) { 
  30.                   isDone = false
  31.               } else { 
  32.                   throw e; 
  33.               } 
  34.           } 
  35.       } 
  36.       //獲取到鎖,則返回節(jié)點路徑,供調(diào)用方記錄到map中 
  37.       if (hasTheLock) { 
  38.           return ourPath; 
  39.       } 
  40.  
  41.       return null
  42.   } 

 接下來,將會在internalLockLoop中利用剛才創(chuàng)建出來的臨時有序節(jié)點,判斷是否獲取到了鎖。

  1. private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception { 
  2.        //是否獲取到鎖 
  3.        boolean haveTheLock = false
  4.        boolean doDelete = false
  5.        try { 
  6.            if (revocable.get() != null) { 
  7.                //當(dāng)前不會進入這里 
  8.                client.getData().usingWatcher(revocableWatcher).forPath(ourPath); 
  9.            } 
  10.            //一直嘗試獲取鎖 
  11.            while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) { 
  12.                //返回basePath(這里是lockqcy)下所有的臨時有序節(jié)點,并且按照后綴從小到大排列 
  13.                List<String> children = getSortedChildren(); 
  14.                //取出當(dāng)前線程創(chuàng)建出來的臨時有序節(jié)點的名稱,這里就是/_c_c46513c3-ace0-405f-aa1e-a531ce28fb47-lock-0000000005 
  15.                String sequenceNodeName = ourPath.substring(basePath.length() + 1); 
  16.                //判斷當(dāng)前節(jié)點是否處于排序后的首位,如果處于首位,則代表獲取到了鎖 
  17.                PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases); 
  18.                if (predicateResults.getsTheLock()) { 
  19.                    //獲取到鎖之后,則終止循環(huán) 
  20.                    haveTheLock = true
  21.                } else { 
  22.                    //這里代表沒有獲取到鎖 
  23.                    //獲取比當(dāng)前節(jié)點索引小的前一個節(jié)點 
  24.                    String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch(); 
  25.  
  26.                    synchronized (this) { 
  27.                        try { 
  28.                            //如果前一個節(jié)點不存在,則直接拋出NoNodeException,catch中不進行處理,在下一輪中繼續(xù)獲取鎖 
  29.                            //如果前一個節(jié)點存在,則給它設(shè)置一個監(jiān)聽器,監(jiān)聽它的釋放事件 
  30.                            client.getData().usingWatcher(watcher).forPath(previousSequencePath); 
  31.                            if (millisToWait != null) { 
  32.                                millisToWait -= (System.currentTimeMillis() - startMillis); 
  33.                                startMillis = System.currentTimeMillis(); 
  34.                                //判斷是否超時 
  35.                                if (millisToWait <= 0) { 
  36.                                    //獲取鎖超時,刪除剛才創(chuàng)建的臨時有序節(jié)點 
  37.                                    doDelete = true
  38.                                    break; 
  39.                                } 
  40.                                //沒超時的話,在millisToWait內(nèi)進行等待 
  41.                                wait(millisToWait); 
  42.                            } else { 
  43.                                //無限期阻塞等待,監(jiān)聽到前一個節(jié)點被刪除時,才會觸發(fā)喚醒操作 
  44.                                wait(); 
  45.                            } 
  46.                        } catch (KeeperException.NoNodeException e) { 
  47.                            //如果前一個節(jié)點不存在,則直接拋出NoNodeException,catch中不進行處理,在下一輪中繼續(xù)獲取鎖 
  48.                        } 
  49.                    } 
  50.                } 
  51.            } 
  52.        } catch (Exception e) { 
  53.            ThreadUtils.checkInterrupted(e); 
  54.            doDelete = true
  55.            throw e; 
  56.        } finally { 
  57.            if (doDelete) { 
  58.                //刪除剛才創(chuàng)建出來的臨時有序節(jié)點 
  59.                deleteOurPath(ourPath); 
  60.            } 
  61.        } 
  62.        return haveTheLock; 
  63.    } 

 判斷是否獲取到鎖的核心邏輯位于getsTheLock中

  1. public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception { 
  2.      //獲取當(dāng)前節(jié)點在所有子節(jié)點排序后的索引位置 
  3.      int ourIndex = children.indexOf(sequenceNodeName); 
  4.      //判斷當(dāng)前節(jié)點是否處于子節(jié)點中 
  5.      validateOurIndex(sequenceNodeName, ourIndex); 
  6.      //InterProcessMutex的構(gòu)造方法,會將maxLeases初始化為1 
  7.      //ourIndex必須為0,才能使得getsTheLock為true,也就是說,當(dāng)前節(jié)點必須是basePath下的最小節(jié)點,才能代表獲取到了鎖 
  8.      boolean getsTheLock = ourIndex < maxLeases; 
  9.      //如果獲取不到鎖,則返回上一個節(jié)點的名稱,用作對其設(shè)置監(jiān)聽 
  10.      String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases); 
  11.  
  12.      return new PredicateResults(pathToWatch, getsTheLock); 
  13.  } 
  14.  
  15.  static void validateOurIndex(String sequenceNodeName, int ourIndex) throws KeeperException { 
  16.      if (ourIndex < 0) { 
  17.          //可能會由于連接丟失導(dǎo)致臨時節(jié)點被刪除,因此這里屬于保險措施 
  18.          throw new KeeperException.NoNodeException("Sequential path not found: " + sequenceNodeName); 
  19.      } 
  20.  } 

 那什么時候,在internalLockLoop處于wait的線程能被喚醒呢?

在internalLockLoop方法中,已經(jīng)使用

  1. client.getData().usingWatcher(watcher).forPath(previousSequencePath); 

給前一個節(jié)點設(shè)置了監(jiān)聽器,當(dāng)該節(jié)點被刪除時,將會觸發(fā)watcher中的回調(diào)

  1. private final Watcher watcher = new Watcher() { 
  2.         //回調(diào)方法 
  3.         @Override 
  4.         public void process(WatchedEvent event) { 
  5.             notifyFromWatcher(); 
  6.         } 
  7.     }; 
  8.  
  9.     private synchronized void notifyFromWatcher() { 
  10.         //喚醒所以在LockInternals實例上等待的線程 
  11.         notifyAll(); 
  12.     } 

 到這里,基本上已經(jīng)分析完加鎖的過程了,在這里總結(jié)下:

首先創(chuàng)建一個臨時有序節(jié)點

如果該節(jié)點是basePath下最小節(jié)點,則代表獲取到了鎖,存入map中,下次直接進行重入。

如果該節(jié)點不是最小節(jié)點,則對前一個節(jié)點設(shè)置監(jiān)聽,接著進行wait等待。當(dāng)前一個節(jié)點被刪除時,將會通知notify該線程。

解鎖

解鎖的邏輯,就比較簡單了,直接進入release方法中

  1. public void release() throws Exception { 
  2.       Thread currentThread = Thread.currentThread(); 
  3.       LockData lockData = threadData.get(currentThread); 
  4.       if (lockData == null) { 
  5.           throw new IllegalMonitorStateException("You do not own the lock: " + basePath); 
  6.       } 
  7.  
  8.       int newLockCount = lockData.lockCount.decrementAndGet(); 
  9.       //直接減少一次重入次數(shù) 
  10.       if (newLockCount > 0) { 
  11.           return
  12.       } 
  13.       if (newLockCount < 0) { 
  14.           throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath); 
  15.       } 
  16.  
  17.       //到這里代表重入次數(shù)為0 
  18.       try { 
  19.           //釋放鎖 
  20.           internals.releaseLock(lockData.lockPath); 
  21.       } finally { 
  22.           //從map中移除 
  23.           threadData.remove(currentThread); 
  24.       } 
  25.   } 
  26.  
  27.   void releaseLock(String lockPath) throws Exception { 
  28.       revocable.set(null); 
  29.       //內(nèi)部使用guaranteed,會在后臺不斷嘗試刪除節(jié)點 
  30.       deleteOurPath(lockPath); 
  31.   } 

 重入次數(shù)大于0,就減少重入次數(shù)。當(dāng)減為0時,調(diào)用zk去刪除節(jié)點,這一點和Redisson可重入鎖釋放時一致。

四、羊群效應(yīng)

在這里談?wù)勈褂肸ookeeper實現(xiàn)分布式鎖場景中的羊群效應(yīng)

什么是羊群效應(yīng)

首先,羊群是一種很散亂的組織,漫無目的,缺少管理,一般需要牧羊犬來幫助主人控制羊群。

某個時候,當(dāng)其中一只羊發(fā)現(xiàn)前面有更加美味的草而動起來,就會導(dǎo)致其余的羊一哄而上,根本不管周圍的情況。

所以羊群效應(yīng),指的是一個人在進行理性的行為后,導(dǎo)致其余人直接盲從,產(chǎn)生非理性的從眾行為。

而Zookeeper中的羊群效應(yīng),則是指一個znode被改變后,觸發(fā)了大量本可以被避免的watch通知,造成集群資源的浪費。

獲取不到鎖時的等待演化

sleep一段時間

如果某個線程在獲取鎖失敗后,完全可以sleep一段時間,再嘗試獲取鎖。

但這樣的方式,效率極低。

sleep時間短的話,會頻繁地進行輪詢,浪費資源。

sleep時間長的話,會出現(xiàn)鎖被釋放但仍然獲取不到鎖的尷尬情況。

所以,這里的優(yōu)化點,在于如何變主動輪詢?yōu)楫惒酵ㄖ?/p>

watch被鎖住的節(jié)點

所有的客戶端要獲取鎖時,只去創(chuàng)建一個同名的node。

當(dāng)znode存在時,這些客戶端對其設(shè)置監(jiān)聽。當(dāng)znode被刪除后,通知所有等待鎖的客戶端,接著這些客戶端再次嘗試獲取鎖。

雖然這里使用watch機制來異步通知,可是當(dāng)客戶端的數(shù)量特別多時,會存在性能低點。

當(dāng)znode被刪除后,在這一瞬間,需要給大量的客戶端發(fā)送通知。在此期間,其余提交給zk的正常請求可能會被延遲或者阻塞。

這就產(chǎn)生了羊群效應(yīng),一個點的變化(znode被刪除),造成了全面的影響(通知大量的客戶端)。

所以,這里的優(yōu)化點,在于如何減少對一個znode的監(jiān)聽數(shù)量,最好的情況是只有一個。

watch前一個有序節(jié)點

如果先指定一個basePath,想要獲取鎖的客戶端,直接在該路徑下創(chuàng)建臨時有序節(jié)點。

當(dāng)創(chuàng)建的節(jié)點是最小節(jié)點時,代表獲取到了鎖。如果不是最小的節(jié)點,則只對前一個節(jié)點設(shè)置監(jiān)聽器,只監(jiān)聽前一個節(jié)點的刪除行為。

這樣前一個節(jié)點被刪除時,只會給下一個節(jié)點代表的客戶端發(fā)送通知,不會給所有客戶端發(fā)送通知,從而避免了羊群效應(yīng)。

​在避免羊群效應(yīng)的同時,使得當(dāng)前鎖成為公平鎖。即按照申請鎖的先后順序獲得鎖,避免存在饑餓過度的線程。

五、后語

本文從源碼角度講解了使用Curator獲取分布式鎖的流程,接著從等待鎖的演化過程角度出發(fā),分析了Zookeeper在分布式鎖場景下避免羊群效應(yīng)的解決方案。

這是Zookeeper系列的第二篇,關(guān)于其watch原理分析、zab協(xié)議等文章也在安排的路上了。

 

責(zé)任編輯:姜華 來源: 今日頭條
相關(guān)推薦

2021-07-16 07:57:34

ZooKeeperCurator源碼

2021-07-08 09:21:17

ZooKeeper分布式鎖 Curator

2021-07-10 10:02:30

ZooKeeperCurator并發(fā)

2021-07-09 06:48:31

ZooKeeperCurator源碼

2021-11-26 06:43:19

Java分布式

2024-11-28 15:11:28

2021-07-06 08:37:29

Redisson分布式

2019-06-19 15:40:06

分布式鎖RedisJava

2021-02-28 07:49:28

Zookeeper分布式

2018-04-03 16:24:34

分布式方式

2017-01-16 14:13:37

分布式數(shù)據(jù)庫

2017-04-13 10:51:09

Consul分布式

2022-04-08 08:27:08

分布式鎖系統(tǒng)

2020-10-19 07:30:57

Java Redis 開發(fā)

2019-02-26 09:51:52

分布式鎖RedisZookeeper

2022-01-06 10:58:07

Redis數(shù)據(jù)分布式鎖

2023-08-21 19:10:34

Redis分布式

2021-10-25 10:21:59

ZK分布式鎖ZooKeeper

2021-07-02 08:51:09

Redisson分布式鎖公平鎖

2021-06-30 14:56:12

Redisson分布式公平鎖
點贊
收藏

51CTO技術(shù)棧公眾號

主站蜘蛛池模板: 国产日韩欧美激情 | 国产精品成人在线播放 | 国产精品久久久久久久午夜 | 91亚洲精品久久久电影 | 日韩福利在线观看 | 日韩精品一区二区三区在线观看 | 农村黄性色生活片 | 午夜日韩 | 中文字幕日本一区二区 | 欧美久久一级特黄毛片 | 欧美成人精品激情在线观看 | 在线视频久久 | 亚洲人成人一区二区在线观看 | 国产欧美一区二区三区久久 | 久久久久久国产精品三区 | 国产高清视频一区二区 | 亚洲天堂中文字幕 | 欧美不卡一区二区三区 | 青草福利| 国产欧美久久精品 | 羞羞视频在线观看 | 91精品国产99 | 欧美在线天堂 | 日韩成人一区 | 久久一本| 激情欧美日韩一区二区 | 第一区在线观看免费国语入口 | 欧美理伦片在线播放 | 亚洲 中文 欧美 日韩 在线观看 | 久久亚洲美女 | 狠狠躁躁夜夜躁波多野结依 | 日韩第一页 | 欧美日韩在线一区二区三区 | 一区二区免费视频 | 最新毛片网站 | 亚洲精品片 | 极情综合网 | 国产精品久久av | 久久久久国 | 成人免费视频网站在线看 | 伊人久久综合 |