基礎(chǔ)篇:Java原子組件和同步組件
本文轉(zhuǎn)載自微信公眾號「潛行前行」,作者cscw 。轉(zhuǎn)載本文請聯(lián)系潛行前行公眾號。
前言
在使用多線程并發(fā)編程的時,經(jīng)常會遇到對共享變量修改操作。此時我們可以選擇ConcurrentHashMap,ConcurrentLinkedQueue來進行安全地存儲數(shù)據(jù)。但如果單單是涉及狀態(tài)的修改,線程執(zhí)行順序問題,使用Atomic開頭的原子組件或者ReentrantLock、CyclicBarrier之類的同步組件,會是更好的選擇,下面將一一介紹它們的原理和用法
- 原子組件的實現(xiàn)原理CAS
- AtomicBoolean、AtomicIntegerArray等原子組件的用法、
- 同步組件的實現(xiàn)原理
- ReentrantLock、CyclicBarrier等同步組件的用法
原子組件的實現(xiàn)原理CAS
- cas的底層實現(xiàn)可以看下之前寫的一篇文章:詳解鎖原理,synchronized、volatile+cas底層實現(xiàn)[1]
應(yīng)用場景
- 可用來實現(xiàn)變量、狀態(tài)在多線程下的原子性操作
- 可用于實現(xiàn)同步鎖(ReentrantLock)
原子組件
- 原子組件的原子性操作是靠使用cas來自旋操作volatile變量實現(xiàn)的
- volatile的類型變量保證變量被修改時,其他線程都能看到最新的值
- cas則保證value的修改操作是原子性的,不會被中斷
基本類型原子類
- AtomicBoolean //布爾類型
- AtomicInteger //正整型數(shù)類型
- AtomicLong //長整型類型
使用示例
- public static void main(String[] args) throws Exception {
- AtomicBoolean atomicBoolean = new AtomicBoolean(false);
- //異步線程修改atomicBoolean
- CompletableFuture<Void> future = CompletableFuture.runAsync(() ->{
- try {
- Thread.sleep(1000); //保證異步線程是在主線程之后修改atomicBoolean為false
- atomicBoolean.set(false);
- }catch (Exception e){
- throw new RuntimeException(e);
- }
- });
- atomicBoolean.set(true);
- future.join();
- System.out.println("boolean value is:"+atomicBoolean.get());
- }
- ---------------輸出結(jié)果------------------
- boolean value is:false
引用類原子類
- AtomicReference
- //加時間戳版本的引用類原子類
- AtomicStampedReference
- //相當(dāng)于AtomicStampedReference,AtomicMarkableReference關(guān)心的是
- //變量是否還是原來變量,中間被修改過也無所謂
- AtomicMarkableReference
- AtomicReference的源碼如下,它內(nèi)部定義了一個volatile V value,并借助VarHandle(具體子類是FieldInstanceReadWrite)實現(xiàn)原子操作,MethodHandles會幫忙計算value在類的偏移位置,最后在VarHandle調(diào)用Unsafe.public final native boolean compareAndSetReference(Object o, long offset, Object expected, Object x)方法原子修改對象的屬性
- public class AtomicReference<V> implements java.io.Serializable {
- private static final long serialVersionUID = -1848883965231344442L;
- private static final VarHandle VALUE;
- static {
- try {
- MethodHandles.Lookup l = MethodHandles.lookup();
- VALUE = l.findVarHandle(AtomicReference.class, "value", Object.class);
- } catch (ReflectiveOperationException e) {
- throw new ExceptionInInitializerError(e);
- }
- }
- private volatile V value;
- ....
ABA問題
- 線程X準(zhǔn)備將變量的值從A改為B,然而這期間線程Y將變量的值從A改為C,然后再改為A;最后線程X檢測變量值是A,并置換為B。但實際上,A已經(jīng)不再是原來的A了
- 解決方法,是把變量定為唯一類型。值可以加上版本號,或者時間戳。如加上版本號,線程Y的修改變?yōu)锳1->B2->A3,此時線程X再更新則可以判斷出A1不等于A3
- AtomicStampedReference的實現(xiàn)和AtomicReference差不多,不過它原子修改的變量是volatile Pair
pair;,Pair是其內(nèi)部類。AtomicStampedReference可以用來解決ABA問題
- public class AtomicStampedReference<V> {
- private static class Pair<T> {
- final T reference;
- final int stamp;
- private Pair(T reference, int stamp) {
- this.reference = reference;
- this.stamp = stamp;
- }
- static <T> Pair<T> of(T reference, int stamp) {
- return new Pair<T>(reference, stamp);
- }
- }
- private volatile Pair<V> pair;
- 如果我們不關(guān)心變量在中間過程是否被修改過,而只是關(guān)心當(dāng)前變量是否還是原先的變量,則可以使用AtomicMarkableReference
- AtomicStampedReference的使用示例
- public class Main {
- public static void main(String[] args) throws Exception {
- Test old = new Test("hello"), newTest = new Test("world");
- AtomicStampedReference<Test> reference = new AtomicStampedReference<>(old, 1);
- reference.compareAndSet(old, newTest,1,2);
- System.out.println("對象:"+reference.getReference().name+";版本號:"+reference.getStamp());
- }
- }
- class Test{
- Test(String name){ this.name = name; }
- public String name;
- }
- ---------------輸出結(jié)果------------------
- 對象:world;版本號:2
數(shù)組原子類
- AtomicIntegerArray //整型數(shù)組
- AtomicLongArray //長整型數(shù)組
- AtomicReferenceArray //引用類型數(shù)組
- 數(shù)組原子類內(nèi)部會初始一個final的數(shù)組,它把整個數(shù)組當(dāng)做一個對象,然后根據(jù)下標(biāo)index計算法元素偏移量,再調(diào)用UNSAFE.compareAndSetReference進行原子操作。數(shù)組并沒被volatile修飾,為了保證元素類型在不同線程的可見,獲取元素使用到了UNSAFEpublic native Object getReferenceVolatile(Object o, long offset)方法來獲取實時的元素值
- 使用示例
- //元素默認初始化為0
- AtomicIntegerArray array = new AtomicIntegerArray(2);
- // 下標(biāo)為0的元素,期待值是0,更新值是1
- array.compareAndSet(0,0,1);
- System.out.println(array.get(0));
- ---------------輸出結(jié)果------------------
- 1
屬性原子類
- AtomicIntegerFieldUpdater
- AtomicLongFieldUpdater
- AtomicReferenceFieldUpdater
- 如果操作對象是某一類型的屬性,可以使用AtomicIntegerFieldUpdater原子更新,不過類的屬性需要定義成volatile修飾的變量,保證該屬性在各個線程的可見性,否則會報錯
- 使用示例
- public class Main {
- public static void main(String[] args) {
- AtomicReferenceFieldUpdater<Test,String> fieldUpdater = AtomicReferenceFieldUpdater.newUpdater(Test.class,String.class,"name");
- Test test = new Test("hello world");
- fieldUpdater.compareAndSet(test,"hello world","siting");
- System.out.println(fieldUpdater.get(test));
- System.out.println(test.name);
- }
- }
- class Test{
- Test(String name){ this.name = name; }
- public volatile String name;
- }
- ---------------輸出結(jié)果------------------
- siting
- siting
累加器
- Striped64
- LongAccumulator
- LongAdder
- //accumulatorFunction:運算規(guī)則,identity:初始值
- public LongAccumulator(LongBinaryOperator accumulatorFunction,long identity)
- LongAccumulator和LongAdder都繼承于Striped64,Striped64的主要思想是和ConcurrentHashMap有點類似,分段計算,單個變量計算并發(fā)性能慢時,我們可以把數(shù)學(xué)運算分散在多個變量,而需要計算總值時,再一一累加起來
- LongAdder相當(dāng)于LongAccumulator一個特例實現(xiàn)
- LongAccumulator的示例
- public static void main(String[] args) throws Exception {
- LongAccumulator accumulator = new LongAccumulator(Long::sum, 0);
- for(int i=0;i<100000;i++){
- CompletableFuture.runAsync(() -> accumulator.accumulate(1));
- }
- Thread.sleep(1000); //等待全部CompletableFuture線程執(zhí)行完成,再獲取
- System.out.println(accumulator.get());
- }
- ---------------輸出結(jié)果------------------
- 100000
同步組件的實現(xiàn)原理
java的多數(shù)同步組件會在內(nèi)部維護一個狀態(tài)值,和原子組件一樣,修改狀態(tài)值時一般也是通過cas來實現(xiàn)。而狀態(tài)修改的維護工作被Doug Lea抽象出AbstractQueuedSynchronizer(AQS)來實現(xiàn)
AQS的原理可以看下之前寫的一篇文章:詳解鎖原理,synchronized、volatile+cas底層實現(xiàn)[2]
同步組件
ReentrantLock、ReentrantReadWriteLock
- ReentrantLock、ReentrantReadWriteLock都是基于AQS(AbstractQueuedSynchronizer)實現(xiàn)的。因為它們有公平鎖和非公平鎖的區(qū)分,因此沒直接繼承AQS,而是使用內(nèi)部類去繼承,公平鎖和非公平鎖各自實現(xiàn)AQS,ReentrantLock、ReentrantReadWriteLock再借助內(nèi)部類來實現(xiàn)同步
- ReentrantLock的使用示例
- ReentrantLock lock = new ReentrantLock();
- if(lock.tryLock()){
- //業(yè)務(wù)邏輯
- lock.unlock();
- }
- ReentrantReadWriteLock的使用示例
- public static void main(String[] args) throws Exception {
- ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- if(lock.readLock().tryLock()){ //讀鎖
- //業(yè)務(wù)邏輯
- lock.readLock().unlock();
- }
- if(lock.writeLock().tryLock()){ //寫鎖
- //業(yè)務(wù)邏輯
- lock.writeLock().unlock();
- }
- }
Semaphore實現(xiàn)原理和使用場景
- Semaphore和ReentrantLock一樣,也有公平和非公平競爭鎖的策略,一樣也是通過內(nèi)部類繼承AQS來實現(xiàn)同步
- 通俗解釋:假設(shè)有一口井,最多有三個人的位置打水。每有一個人打水,則需要占用一個位置。當(dāng)三個位置全部占滿時,第四個人需要打水,則要等待前三個人中一個離開打水位,才能繼續(xù)獲取打水的位置
- 使用示例
- public static void main(String[] args) throws Exception {
- Semaphore semaphore = new Semaphore(2);
- for (int i = 0; i < 3; i++)
- CompletableFuture.runAsync(() -> {
- try {
- System.out.println(Thread.currentThread().toString() + " start ");
- if(semaphore.tryAcquire(1)){
- Thread.sleep(1000);
- semaphore.release(1);
- System.out.println(Thread.currentThread().toString() + " 無阻塞結(jié)束 ");
- }else {
- System.out.println(Thread.currentThread().toString() + " 被阻塞結(jié)束 ");
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
- //保證CompletableFuture 線程被執(zhí)行,主線程再結(jié)束
- Thread.sleep(2000);
- }
- ---------------輸出結(jié)果------------------
- Thread[ForkJoinPool.commonPool-worker-19,5,main] start
- Thread[ForkJoinPool.commonPool-worker-5,5,main] start
- Thread[ForkJoinPool.commonPool-worker-23,5,main] start
- Thread[ForkJoinPool.commonPool-worker-23,5,main] 被阻塞結(jié)束
- Thread[ForkJoinPool.commonPool-worker-5,5,main] 無阻塞結(jié)束
- Thread[ForkJoinPool.commonPool-worker-19,5,main] 無阻塞結(jié)束
可以看出三個線程,因為信號量設(shè)定為2,第三個線程是無法獲取信息成功的,會打印阻塞結(jié)束
CountDownLatch實現(xiàn)原理和使用場景
- CountDownLatch也是靠AQS實現(xiàn)的同步操作
- 通俗解釋:玩游戲時,假如主線任務(wù)需要靠完成五個小任務(wù),主線任務(wù)才能繼續(xù)進行時。此時可以用CountDownLatch,主線任務(wù)阻塞等待,每完成一小任務(wù),就done一次計數(shù),直到五個小任務(wù)全部被執(zhí)行才能觸發(fā)主線
- 使用示例
- public static void main(String[] args) throws Exception {
- CountDownLatch count = new CountDownLatch(2);
- for (int i = 0; i < 2; i++)
- CompletableFuture.runAsync(() -> {
- try {
- Thread.sleep(1000);
- System.out.println(" CompletableFuture over ");
- count.countDown();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
- //等待CompletableFuture線程的完成
- count.await();
- System.out.println(" main over ");
- }
- ---------------輸出結(jié)果------------------
- CompletableFuture over
- CompletableFuture over
- main over
CyclicBarrier實現(xiàn)原理和使用場景
- CyclicBarrier則是靠ReentrantLock lock和Condition trip屬性來實現(xiàn)同步
- 通俗解釋:CyclicBarrier需要阻塞全部線程到await狀態(tài),然后全部線程再全部被喚醒執(zhí)行。想象有一個欄桿攔住五只羊,需要當(dāng)五只羊一起站在欄桿時,欄桿才會被拉起,此時所有的羊都可以飛跑出羊圈
- 使用示例
- public static void main(String[] args) throws Exception {
- CyclicBarrier barrier = new CyclicBarrier(2);
- CompletableFuture.runAsync(()->{
- try {
- System.out.println("CompletableFuture run start-"+ Clock.systemUTC().millis());
- barrier.await(); //需要等待main線程也執(zhí)行到await狀態(tài)才能繼續(xù)執(zhí)行
- System.out.println("CompletableFuture run over-"+ Clock.systemUTC().millis());
- }catch (Exception e){
- throw new RuntimeException(e);
- }
- });
- Thread.sleep(1000);
- //和CompletableFuture線程相互等待
- barrier.await();
- System.out.println("main run over!");
- }
- ---------------輸出結(jié)果------------------
- CompletableFuture run start-1609822588881
- main run over!
- CompletableFuture run over-1609822589880
StampedLock
- StampedLock不是借助AQS,而是自己內(nèi)部維護多個狀態(tài)值,并配合cas實現(xiàn)的
- StampedLock具有三種模式:寫模式、讀模式、樂觀讀模式
- StampedLock的讀寫鎖可以相互轉(zhuǎn)換
- //獲取讀鎖,自旋獲取,返回一個戳值
- public long readLock()
- //嘗試加讀鎖,不成功返回0
- public long tryReadLock()
- //解鎖
- public void unlockRead(long stamp)
- //獲取寫鎖,自旋獲取,返回一個戳值
- public long writeLock()
- //嘗試加寫鎖,不成功返回0
- public long tryWriteLock()
- //解鎖
- public void unlockWrite(long stamp)
- //嘗試樂觀讀讀取一個時間戳,并配合validate方法校驗時間戳的有效性
- public long tryOptimisticRead()
- //驗證stamp是否有效
- public boolean validate(long stamp)
- 使用示例
- public static void main(String[] args) throws Exception {
- StampedLock stampedLock = new StampedLock();
- long stamp = stampedLock.tryOptimisticRead();
- //判斷版本號是否生效
- if (!stampedLock.validate(stamp)) {
- //獲取讀鎖,會空轉(zhuǎn)
- stamp = stampedLock.readLock();
- long writeStamp = stampedLock.tryConvertToWriteLock(stamp);
- if (writeStamp != 0) { //成功轉(zhuǎn)為寫鎖
- //fixme 業(yè)務(wù)操作
- stampedLock.unlockWrite(writeStamp);
- } else {
- stampedLock.unlockRead(stamp);
- //嘗試獲取寫讀
- stamp = stampedLock.tryWriteLock();
- if (stamp != 0) {
- //fixme 業(yè)務(wù)操作
- stampedLock.unlockWrite(writeStamp);
- }
- }
- }
- }
參考文章
- 并發(fā)之Striped64(l累加器)[3]
參考資料
[1]詳解鎖原理,synchronized、volatile+cas底層實現(xiàn): https://juejin.cn/post/6854573210768900110
[2]詳解鎖原理,synchronized、volatile+cas底層實現(xiàn): https://juejin.cn/post/6854573210768900110
[3]并發(fā)之Striped64(l累加器): https://www.cnblogs.com/gosaint/p/9129867.html