基于Redis的分布式鎖和Redlock算法
本文轉(zhuǎn)載自微信公眾號(hào)「UP技術(shù)控」,作者conan5566。轉(zhuǎn)載本文請(qǐng)聯(lián)系UP技術(shù)控公眾號(hào)。
在單進(jìn)程的系統(tǒng)中,當(dāng)存在多個(gè)線程可以同時(shí)改變某個(gè)變量(可變共享變量)時(shí),就需要對(duì)變量或代碼塊做同步,使其在修改這種變量時(shí)能夠線性執(zhí)行消除并發(fā)修改變量。
而同步的本質(zhì)是通過(guò)鎖來(lái)實(shí)現(xiàn)的。為了實(shí)現(xiàn)多個(gè)線程在一個(gè)時(shí)刻同一個(gè)代碼塊只能有一個(gè)線程可執(zhí)行,那么需要在某個(gè)地方做個(gè)標(biāo)記,這個(gè)標(biāo)記必須每個(gè)線程都能看到,當(dāng)標(biāo)記不存在時(shí)可以設(shè)置該標(biāo)記,其余后續(xù)線程發(fā)現(xiàn)已經(jīng)有標(biāo)記了則等待擁有標(biāo)記的線程結(jié)束同步代碼塊取消標(biāo)記后再去嘗試設(shè)置標(biāo)記。這個(gè)標(biāo)記可以理解為鎖。
不同地方實(shí)現(xiàn)鎖的方式也不一樣,只要能滿足所有線程都能看得到標(biāo)記即可。如 Java 中 synchronize 是在對(duì)象頭設(shè)置標(biāo)記,Lock 接口的實(shí)現(xiàn)類基本上都只是某一個(gè) volitile 修飾的 int 型變量其保證每個(gè)線程都能擁有對(duì)該 int 的可見(jiàn)性和原子修改,linux 內(nèi)核中也是利用互斥量或信號(hào)量等內(nèi)存數(shù)據(jù)做標(biāo)記。
除了利用內(nèi)存數(shù)據(jù)做鎖其實(shí)任何互斥的都能做鎖(只考慮互斥情況),如流水表中流水號(hào)與時(shí)間結(jié)合做冪等校驗(yàn)可以看作是一個(gè)不會(huì)釋放的鎖,或者使用某個(gè)文件是否存在作為鎖等。只需要滿足在對(duì)標(biāo)記進(jìn)行修改能保證原子性和內(nèi)存可見(jiàn)性即可。
1 什么是分布式?
分布式的 CAP 理論告訴我們:
任何一個(gè)分布式系統(tǒng)都無(wú)法同時(shí)滿足一致性(Consistency)、可用性(Availability)和分區(qū)容錯(cuò)性(Partition tolerance),最多只能同時(shí)滿足兩項(xiàng)。
目前很多大型網(wǎng)站及應(yīng)用都是分布式部署的,分布式場(chǎng)景中的數(shù)據(jù)一致性問(wèn)題一直是一個(gè)比較重要的話題。基于 CAP理論,很多系統(tǒng)在設(shè)計(jì)之初就要對(duì)這三者做出取舍。在互聯(lián)網(wǎng)領(lǐng)域的絕大多數(shù)的場(chǎng)景中,都需要犧牲強(qiáng)一致性來(lái)?yè)Q取系統(tǒng)的高可用性,系統(tǒng)往往只需要保證最終一致性。
分布式場(chǎng)景
此處主要指集群模式下,多個(gè)相同服務(wù)同時(shí)開(kāi)啟.
在許多的場(chǎng)景中,我們?yōu)榱吮WC數(shù)據(jù)的最終一致性,需要很多的技術(shù)方案來(lái)支持,比如分布式事務(wù)、分布式鎖等。很多時(shí)候我們需要保證一個(gè)方法在同一時(shí)間內(nèi)只能被同一個(gè)線程執(zhí)行。在單機(jī)環(huán)境中,通過(guò) Java 提供的并發(fā) API 我們可以解決,但是在分布式環(huán)境下,就沒(méi)有那么簡(jiǎn)單啦。
- 分布式與單機(jī)情況下最大的不同在于其不是多線程而是多進(jìn)程。
- 多線程由于可以共享堆內(nèi)存,因此可以簡(jiǎn)單的采取內(nèi)存作為標(biāo)記存儲(chǔ)位置。而進(jìn)程之間甚至可能都不在同一臺(tái)物理機(jī)上,因此需要將標(biāo)記存儲(chǔ)在一個(gè)所有進(jìn)程都能看到的地方。
什么是分布式鎖?
- 當(dāng)在分布式模型下,數(shù)據(jù)只有一份(或有限制),此時(shí)需要利用鎖的技術(shù)控制某一時(shí)刻修改數(shù)據(jù)的進(jìn)程數(shù)。
- 與單機(jī)模式下的鎖不僅需要保證進(jìn)程可見(jiàn),還需要考慮進(jìn)程與鎖之間的網(wǎng)絡(luò)問(wèn)題。(我覺(jué)得分布式情況下之所以問(wèn)題變得復(fù)雜,主要就是需要考慮到網(wǎng)絡(luò)的延時(shí)和不可靠。。。一個(gè)大坑)
- 分布式鎖還是可以將標(biāo)記存在內(nèi)存,只是該內(nèi)存不是某個(gè)進(jìn)程分配的內(nèi)存而是公共內(nèi)存如 Redis、Memcache。至于利用數(shù)據(jù)庫(kù)、文件等做鎖與單機(jī)的實(shí)現(xiàn)是一樣的,只要保證標(biāo)記能互斥就行。
2 我們需要怎樣的分布式鎖?
可以保證在分布式部署的應(yīng)用集群中,同一個(gè)方法在同一時(shí)間只能被一臺(tái)機(jī)器上的一個(gè)線程執(zhí)行。
- 這把鎖要是一把可重入鎖(避免死鎖)
- 這把鎖最好是一把阻塞鎖(根據(jù)業(yè)務(wù)需求考慮要不要這條)
- 這把鎖最好是一把公平鎖(根據(jù)業(yè)務(wù)需求考慮要不要這條)
有高可用的獲取鎖和釋放鎖功能
獲取鎖和釋放鎖的性能要好
代碼實(shí)現(xiàn)
- public interface IDistributedLock
- {
- ILockResult Lock(string resourceKey);
- ILockResult Lock(string resourceKey, TimeSpan expiryTime);
- ILockResult Lock(string resourceKey, TimeSpan expiryTime, TimeSpan waitTime, TimeSpan retryTime);
- ILockResult Lock(string resourceKey, TimeSpan expiryTime, TimeSpan waitTime, TimeSpan retryTime, CancellationToken cancellationToken);
- Task<ILockResult> LockAsync(string resourceKey);
- Task<ILockResult> LockAsync(string resourceKey, TimeSpan expiryTime);
- Task<ILockResult> LockAsync(string resourceKey, TimeSpan expiryTime, TimeSpan waitTime, TimeSpan retryTime);
- Task<ILockResult> LockAsync(string resourceKey, TimeSpan expiryTime, TimeSpan waitTime, TimeSpan retryTime, CancellationToken cancellationToken);
- }
- public interface ILockResult : IDisposable
- {
- string LockId { get; }
- bool IsAcquired { get; }
- int ExtendCount { get; }
- }
- class EndPoint:RedLock.RedisLockEndPoint
- {
- private readonly string _connectionString;
- public EndPoint(string connectionString)
- {
- _connectionString = connectionString;
- //139.196.40.252,password=xstudio,defaultDatabase=9
- var connection = connectionString.Split(',');
- var dict = new Dictionary<string, string>();
- foreach (var item in connection)
- {
- var keypar = item.Split('=');
- if (keypar.Length>1)
- {
- dict[keypar[0]] = keypar[1];
- }
- }
- this.EndPoint = new System.Net.DnsEndPoint(connection[0], 6379);
- if (dict.TryGetValue("password", out string password))
- {
- this.Password = password;
- }
- if (dict.TryGetValue("defaultDatabase", out string defaultDatabase) && int.TryParse(defaultDatabase,out int database))
- {
- RedisDatabase = database;
- }
- }
- }
- [Export(typeof(IDistributedLock))]
- class InnerLock : IDistributedLock
- {
- private static Lazy<RedLock.RedisLockFactory> _factory;
- static InnerLock()
- {
- _factory = new Lazy<RedisLockFactory>(() => new RedisLockFactory(new EndPoint(ConfigurationManager.AppSettings["Redis"])), System.Threading.LazyThreadSafetyMode.ExecutionAndPublication);
- }
- public ILockResult Lock(string resourceKey)
- {
- return new LockResult(_factory.Value.Create(resourceKey, TimeSpan.FromDays(1)));
- }
- public ILockResult Lock(string resourceKey, TimeSpan expiryTime)
- {
- return new LockResult(_factory.Value.Create(resourceKey, expiryTime));
- }
- public ILockResult Lock(string resourceKey, TimeSpan expiryTime, TimeSpan waitTime, TimeSpan retryTime)
- {
- return new LockResult(_factory.Value.Create(resourceKey, expiryTime, waitTime, retryTime));
- }
- public ILockResult Lock(string resourceKey, TimeSpan expiryTime, TimeSpan waitTime, TimeSpan retryTime, CancellationToken cancellationToken)
- {
- return new LockResult(_factory.Value.Create(resourceKey, expiryTime, waitTime, retryTime, cancellationToken));
- }
- public async Task<ILockResult> LockAsync(string resourceKey)
- {
- var result = await _factory.Value.CreateAsync(resourceKey, TimeSpan.FromDays(1));
- return new LockResult(result);
- }
- public async Task<ILockResult> LockAsync(string resourceKey, TimeSpan expiryTime)
- {
- var result = await _factory.Value.CreateAsync(resourceKey, expiryTime);
- return new LockResult(result);
- }
- public async Task<ILockResult> LockAsync(string resourceKey, TimeSpan expiryTime, TimeSpan waitTime, TimeSpan retryTime)
- {
- var result = await _factory.Value.CreateAsync(resourceKey, expiryTime, waitTime, retryTime);
- return new LockResult(result);
- }
- public async Task<ILockResult> LockAsync(string resourceKey, TimeSpan expiryTime, TimeSpan waitTime, TimeSpan retryTime, CancellationToken cancellationToken)
- {
- var result = await _factory.Value.CreateAsync(resourceKey, expiryTime, waitTime, retryTime, cancellationToken);
- return new LockResult(result);
- }
- }
- class LockResult : ILockResult
- {
- private IRedisLock _lock;
- public LockResult(IRedisLock redisLock)
- {
- _lock = redisLock;
- }
- public string LockId => _lock.LockId;
- public bool IsAcquired => _lock.IsAcquired;
- public int ExtendCount => _lock.ExtendCount;
- public void Dispose()
- {
- _lock.Dispose();
- }
- }