基于Consul的分布式鎖實現
我們在構建分布式系統的時候,經常需要控制對共享資源的互斥訪問。這個時候我們就涉及到分布式鎖(也稱為全局鎖)的實現,基于目前的各種工具,我們已經有了大量的實現方式,比如:基于Redis的實現、基于Zookeeper的實現。本文將介紹一種基于Consul 的Key/Value存儲來實現分布式鎖以及信號量的方法。
分布式鎖實現
基于Consul的分布式鎖主要利用Key/Value存儲API中的acquire和release操作來實現。acquire和release操作是類似Check-And-Set的操作:
- acquire操作只有當鎖不存在持有者時才會返回true,并且set設置的Value值,同時執行操作的session會持有對該Key的鎖,否則就返回false
- release操作則是使用指定的session來釋放某個Key的鎖,如果指定的session無效,那么會返回false,否則就會set設置Value值,并返回true
具體實現中主要使用了這幾個Key/Value的API:
create session:https://www.consul.io/api/session.html#session_create
delete session:https://www.consul.io/api/session.html#delete-session
KV acquire/release:https://www.consul.io/api/kv.html#create-update-key
基本流程
具體實現
- public class Lock {
- private static final String prefix = "lock/"; // 同步鎖參數前綴
- private ConsulClient consulClient;
- private String sessionName;
- private String sessionId = null;
- private String lockKey;
- /**
- *
- * @param consulClient
- * @param sessionName 同步鎖的session名稱
- * @param lockKey 同步鎖在consul的KV存儲中的Key路徑,會自動增加prefix前綴,方便歸類查詢
- */
- public Lock(ConsulClient consulClient, String sessionName, String lockKey) {
- this.consulClient = consulClient;
- this.sessionName = sessionName;
- this.lockKey = prefix + lockKey;
- }
- /**
- * 獲取同步鎖
- *
- * @param block 是否阻塞,直到獲取到鎖為止
- * @return
- */
- public Boolean lock(boolean block) {
- if (sessionId != null) {
- throw new RuntimeException(sessionId + " - Already locked!");
- }
- sessionId = createSession(sessionName);
- while(true) {
- PutParams putParams = new PutParams();
- putParams.setAcquireSession(sessionId);
- if(consulClient.setKVValue(lockKey, "lock:" + LocalDateTime.now(), putParams).getValue()) {
- return true;
- } else if(block) {
- continue;
- } else {
- return false;
- }
- }
- }
- /**
- * 釋放同步鎖
- *
- * @return
- */
- public Boolean unlock() {
- PutParams putParams = new PutParams();
- putParams.setReleaseSession(sessionId);
- boolean result = consulClient.setKVValue(lockKey, "unlock:" + LocalDateTime.now(), putParams).getValue();
- consulClient.sessionDestroy(sessionId, null);
- return result;
- }
- /**
- * 創建session
- * @param sessionName
- * @return
- */
- private String createSession(String sessionName) {
- NewSession newSession = new NewSession();
- newSession.setName(sessionName);
- return consulClient.sessionCreate(newSession, null).getValue();
- }
- }
單元測試
- public class TestLock {
- private Logger logger = Logger.getLogger(getClass());
- @Test
- public void testLock() throws Exception {
- new Thread(new LockRunner(1)).start();
- new Thread(new LockRunner(2)).start();
- new Thread(new LockRunner(3)).start();
- new Thread(new LockRunner(4)).start();
- new Thread(new LockRunner(5)).start();
- Thread.sleep(200000L);
- }
- class LockRunner implements Runnable {
- private Logger logger = Logger.getLogger(getClass());
- private int flag;
- public LockRunner(int flag) {
- this.flag = flag;
- }
- @Override
- public void run() {
- Lock lock = new Lock(new ConsulClient(), "lock-session", "lock-key");
- try {
- if (lock.lock(true)) {
- logger.info("Thread " + flag + " start!");
- Thread.sleep(new Random().nextInt(3000L));
- logger.info("Thread " + flag + " end!");
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- lock.unlock();
- }
- }
- }
- }
優化建議
本文我們實現了基于Consul的簡單分布式鎖,但是在實際運行時,可能會因為各種各樣的意外情況導致unlock操作沒有得到正確地執行,從而使得分布式鎖無法釋放。所以為了更完善的使用分布式鎖,我們還必須實現對鎖的超時清理等控制,保證即使出現了未正常解鎖的情況下也能自動修復,以提升系統的健壯性。那么如何實現呢?請持續關注我的后續分解!
【本文為51CTO專欄作者“翟永超”的原創稿件,轉載請通過51CTO聯系作者獲取授權】