并發(fā)編程利器 Java CAS 原子類全解
在現(xiàn)代軟件開發(fā)中,多線程編程已經(jīng)成為構(gòu)建高性能、高響應(yīng)性應(yīng)用程序的關(guān)鍵技術(shù)之一。然而,多線程環(huán)境下的數(shù)據(jù)一致性問(wèn)題一直是開發(fā)者面臨的一大挑戰(zhàn)。為了解決這一難題,Java 平臺(tái)提供了多種并發(fā)控制工具和機(jī)制,其中 java.util.concurrent.atomic 包中的 CAS(Compare-and-Swap)原子類尤為突出。
本文將全面解析 Java 中的 CAS 原子類,探討其背后的原理、應(yīng)用場(chǎng)景以及如何有效利用這些工具來(lái)提升程序的并發(fā)性能和安全性。通過(guò)深入理解 CAS 機(jī)制,讀者將能夠更好地應(yīng)對(duì)復(fù)雜的并發(fā)編程場(chǎng)景,編寫出更加健壯和高效的代碼。
一、什么是CAS
CAS全稱Compare-And-Swap,是一種無(wú)鎖編程算法,即比較當(dāng)前的值與舊值是否相等若相等則進(jìn)行修改操作(樂(lè)觀鎖機(jī)制),該類常用于多線程共享變量的修改操作。而其底層實(shí)現(xiàn)也是基于硬件平臺(tái)的匯編指令,JVM只是封裝其調(diào)用僅此而已。而本文會(huì)基于以下大綱展開對(duì)CAS的探討。
二、CAS基礎(chǔ)使用示例
如下所示,可以看出使用封裝CAS操作的AtomicInteger操作多線程共享變量無(wú)需我們手動(dòng)加鎖,因?yàn)楸苊膺^(guò)多人為操作這就大大減少了多線程操作下的失誤。
使用原子類操作共享數(shù)據(jù):
public class CasTest {
private AtomicInteger count = new AtomicInteger();
public void increment() {
count.incrementAndGet();
}
// 使用 AtomicInteger 后,不需要加鎖,也可以實(shí)現(xiàn)線程安全
public int getCount() {
return count.get();
}
public static void main(String[] args) {
}
}
使用sync鎖操作數(shù)據(jù):
public class Test {
private int i=0;
public synchronized int add(){
return i++;
}
}
三、從源碼角度了解java如何封裝匯編的UNSAFE
代碼也很簡(jiǎn)單,就是拿到具有可見性的volatile變量i,然后判斷i和當(dāng)前對(duì)象paramObject對(duì)應(yīng)的i值是否一致,若一致則說(shuō)明沒(méi)被人該過(guò),進(jìn)而進(jìn)行修改操作,反之自旋循環(huán)獲取在進(jìn)行CAS。
public final int getAndAddInt(Object paramObject, long paramLong, int paramInt)
{
int i;
do
i = getIntVolatile(paramObject, paramLong);
while (!compareAndSwapInt(paramObject, paramLong, i, i + paramInt));
return i;
}
public final long getAndAddLong(Object paramObject, long paramLong1, long paramLong2)
{
long l;
do
l = getLongVolatile(paramObject, paramLong1);
while (!compareAndSwapLong(paramObject, paramLong1, l, l + paramLong2));
return l;
}
public final int getAndSetInt(Object paramObject, long paramLong, int paramInt)
{
int i;
do
i = getIntVolatile(paramObject, paramLong);
while (!compareAndSwapInt(paramObject, paramLong, i, paramInt));
return i;
}
public final long getAndSetLong(Object paramObject, long paramLong1, long paramLong2)
{
long l;
do
l = getLongVolatile(paramObject, paramLong1);
while (!compareAndSwapLong(paramObject, paramLong1, l, paramLong2));
return l;
}
public final Object getAndSetObject(Object paramObject1, long paramLong, Object paramObject2)
{
Object localObject;
do
localObject = getObjectVolatile(paramObject1, paramLong);
while (!compareAndSwapObject(paramObject1, paramLong, localObject, paramObject2));
return localObject;
}
四、手寫Unsafe實(shí)現(xiàn)20個(gè)線程500次CAS自增
代碼邏輯和注釋如下,讀者可自行debug查看邏輯:
public class CasCountInc {
private static Logger logger = LoggerFactory.getLogger(CasCountInc.class);
// 獲取Unsafe對(duì)象
private static Unsafe unsafe = getUnsafe();
// 線程池?cái)?shù)目
private static final int THREAD_COUNT = 20;
// 每個(gè)線程運(yùn)行自增次數(shù)
private static final int EVERY_THREAD_ADD_COUNT = 500;
// 自增的count的值,volatile保證可見性
private volatile int count = 0;
// count字段的偏移量
private static long countOffSet;
private static Unsafe getUnsafe() {
Unsafe unsafe = null;
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
unsafe = (Unsafe) field.get(null);
} catch (Exception e) {
logger.info("獲取unsafe失敗,失敗原因:[{}]", e.getMessage(), e);
}
return unsafe;
}
static {
try {
countOffSet = unsafe.objectFieldOffset(CasCountInc.class.getDeclaredField("count"));
} catch (NoSuchFieldException e) {
logger.error("獲取count的偏移量報(bào)錯(cuò),錯(cuò)誤原因:[{}]", e.getMessage(), e);
}
}
public void inc() {
int oldCount = 0;
//基于cas完成自增
do {
oldCount = count;
} while (!unsafe.compareAndSwapInt(this, countOffSet, oldCount, oldCount + 1));
}
public static void main(String[] args) throws InterruptedException {
CasCountInc casCountInc = new CasCountInc();
CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
IntStream.range(0, THREAD_COUNT).forEach(i -> {
new Thread(() -> {
IntStream.range(0, EVERY_THREAD_ADD_COUNT).forEach((j) -> {
casCountInc.inc();
});
countDownLatch.countDown();
}).start();
});
countDownLatch.await();
logger.info("count最終結(jié)果為 [{}]", casCountInc.count);
}
}
五、原子類簡(jiǎn)介
1.原子類更新基本類型
原子類基本類型的格式為Atomic+包裝類名,這里筆者列舉幾個(gè)比較常用的:
- AtomicBoolean: 原子更新布爾類型。
- AtomicInteger: 原子更新整型。
- AtomicLong: 原子更新長(zhǎng)整型。
2.原子類更新數(shù)組類型
- AtomicIntegerArray: 原子更新整型數(shù)組里的元素。
- AtomicLongArray: 原子更新長(zhǎng)整型數(shù)組里的元素。
- AtomicReferenceArray: 原子更新引用類型數(shù)組里的元素。
對(duì)應(yīng)我們給出AtomicIntegerArray原子操作數(shù)組的示例:
public class AtomicIntegerArrayDemo {
public static void main(String[] args) throws InterruptedException {
AtomicIntegerArray array = new AtomicIntegerArray(new int[] { 0, 0 });
System.out.println(array);
// 索引1位置+2
System.out.println(array.getAndAdd(1, 2));
System.out.println(array);
}
}
3.原子類更新引用類型
- AtomicReference: 原子更新引用類型。
- AtomicStampedReference: 原子更新引用類型, 內(nèi)部使用Pair來(lái)存儲(chǔ)元素值及其版本號(hào)。
- AtomicMarkableReferce: 原子更新帶有標(biāo)記位的引用類型。
對(duì)應(yīng)的我們給出原子操作引用類型的代碼示例:
import java.util.concurrent.atomic.AtomicReference;
public class AtomicReferenceTest {
public static void main(String[] args){
// 創(chuàng)建兩個(gè)Person對(duì)象,它們的id分別是101和102。
Person p1 = new Person(101);
Person p2 = new Person(102);
// 新建AtomicReference對(duì)象,初始化它的值為p1對(duì)象
AtomicReference ar = new AtomicReference(p1);
// 通過(guò)CAS設(shè)置ar。如果ar的值為p1的話,則將其設(shè)置為p2。
ar.compareAndSet(p1, p2);
Person p3 = (Person)ar.get();
System.out.println("p3 is "+p3);
System.out.println("p3.equals(p1)="+p3.equals(p1));
System.out.println("p3.equals(p2)="+p3.equals(p2));
}
}
class Person {
volatile long id;
public Person(long id) {
this.id = id;
}
public String toString() {
return "id:"+id;
}
}
六、原子類更新成員變量
通過(guò)原子類型操作成員變量大體有以下幾個(gè)更新器:
- AtomicIntegerFieldUpdater: 原子更新整型的字段的更新器。
- AtomicLongFieldUpdater: 原子更新長(zhǎng)整型字段的更新器。
- AtomicStampedFieldUpdater: 原子更新帶有版本號(hào)的引用類型
- AtomicReferenceFieldUpdater: 上面已經(jīng)說(shuō)過(guò)此處不在贅述。
如下所示,我們創(chuàng)建一個(gè)基礎(chǔ)類DataDemo,通過(guò)原子類CAS操作字段值進(jìn)行自增操作。
public class TestAtomicIntegerFieldUpdater {
private static Logger logger = LoggerFactory.getLogger(TestAtomicIntegerFieldUpdater.class);
public static void main(String[] args) {
TestAtomicIntegerFieldUpdater tIA = new TestAtomicIntegerFieldUpdater();
tIA.doIt();
}
/**
* 返回需要更新的整型字段更新器
*
* @param fieldName
* @return
*/
public AtomicIntegerFieldUpdater<DataDemo> updater(String fieldName) {
return AtomicIntegerFieldUpdater.newUpdater(DataDemo.class, fieldName);
}
public void doIt() {
DataDemo data = new DataDemo();
// 修改公共變量,返回更新前的舊值 0
AtomicIntegerFieldUpdater<DataDemo> updater = updater("publicVar");
int oldVal = updater.getAndIncrement(data);
logger.info("publicVar 更新前的值[{}] 更新后的值 [{}]", oldVal, data.publicVar);
// 更新保護(hù)級(jí)別的變量
AtomicIntegerFieldUpdater<DataDemo> protectedVarUpdater = updater("protectedVar");
int oldProtectedVar = protectedVarUpdater.getAndAdd(data, 2);
logger.info("protectedVar 更新前的值[{}] 更新后的值 [{}]", oldProtectedVar, data.protectedVar);
// logger.info("privateVar = "+updater("privateVar").getAndAdd(data,2)); 私有變量會(huì)報(bào)錯(cuò)
/*
* 下面報(bào)異常:must be integer
* */
// logger.info("integerVar = "+updater("integerVar").getAndIncrement(data));
//logger.info("longVar = "+updater("longVar").getAndIncrement(data));
}
class DataDemo {
// 公共且可見的publicVar
public volatile int publicVar = 0;
// 保護(hù)級(jí)別的protectedVar
protected volatile int protectedVar = 4;
// 私有變量
private volatile int privateVar = 5;
// final 不可變量
public final int finalVar = 11;
public volatile Integer integerVar = 19;
public volatile Long longVar = 18L;
}
}
public class TestAtomicIntegerFieldUpdater {
private static Logger logger = LoggerFactory.getLogger(TestAtomicIntegerFieldUpdater.class);
public static void main(String[] args) {
TestAtomicIntegerFieldUpdater tIA = new TestAtomicIntegerFieldUpdater();
tIA.doIt();
}
/**
* 返回需要更新的整型字段更新器
*
* @param fieldName
* @return
*/
public AtomicIntegerFieldUpdater<DataDemo> updater(String fieldName) {
return AtomicIntegerFieldUpdater.newUpdater(DataDemo.class, fieldName);
}
public void doIt() {
DataDemo data = new DataDemo();
// 修改公共變量,返回更新前的舊值 0
AtomicIntegerFieldUpdater<DataDemo> updater = updater("publicVar");
int oldVal = updater.getAndIncrement(data);
logger.info("publicVar 更新前的值[{}] 更新后的值 [{}]", oldVal, data.publicVar);
// 更新保護(hù)級(jí)別的變量
AtomicIntegerFieldUpdater<DataDemo> protectedVarUpdater = updater("protectedVar");
int oldProtectedVar = protectedVarUpdater.getAndAdd(data, 2);
logger.info("protectedVar 更新前的值[{}] 更新后的值 [{}]", oldProtectedVar, data.protectedVar);
// logger.info("privateVar = "+updater("privateVar").getAndAdd(data,2)); 私有變量會(huì)報(bào)錯(cuò)
/*
* 下面報(bào)異常:must be integer
* */
// logger.info("integerVar = "+updater("integerVar").getAndIncrement(data));
//logger.info("longVar = "+updater("longVar").getAndIncrement(data));
}
class DataDemo {
// 公共且可見的publicVar
public volatile int publicVar = 0;
// 保護(hù)級(jí)別的protectedVar
protected volatile int protectedVar = 4;
// 私有變量
private volatile int privateVar = 5;
// final 不可變量
public final int finalVar = 11;
public volatile Integer integerVar = 19;
public volatile Long longVar = 18L;
}
}
通過(guò)上述代碼我們可以總結(jié)出CAS字段必須符合以下要求:
- 變量必須使用volatile保證可見性
- 必須是當(dāng)前對(duì)象可以訪問(wèn)到的類型才可進(jìn)行操作‘
- 只能是實(shí)例變量而不是類變量,即不可以有static修飾符
- 包裝類也不行
七、CAS的ABA問(wèn)題
CAS更新前會(huì)檢查值有沒(méi)有變化,如果沒(méi)有變化則認(rèn)為沒(méi)人修改過(guò),在進(jìn)行更新操作。這種情況下,若我們A值修改為B,B再還原為A。這種修改再還原的操作,CAS是無(wú)法感知是否變化的,這就是所謂的ABA問(wèn)題。
1.AtomicStampedReference源碼詳解
源碼如下所示,可以看到AtomicStampedReference解決ABA問(wèn)題的方式是基于當(dāng)前修改操作的時(shí)間戳和元引用值是否一致,若一直則進(jìn)行數(shù)據(jù)更新
public class AtomicStampedReference<V> {
private static class Pair<T> {
final T reference; //維護(hù)對(duì)象引用
final int stamp; //用于標(biāo)志版本
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
private volatile Pair<V> pair;
....
/**
* expectedReference :更新之前的原始引用值
* newReference : 新值
* expectedStamp : 預(yù)期時(shí)間戳
* newStamp : 更新后的時(shí)間戳
*/
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
// 獲取當(dāng)前的(元素值,版本號(hào))對(duì)
Pair<V> current = pair;
return
// 引用沒(méi)變
expectedReference == current.reference &&
// 版本號(hào)沒(méi)變
expectedStamp == current.stamp &&
//可以看到這個(gè)括號(hào)里面用了一個(gè)短路運(yùn)算如果當(dāng)前版本與新值一樣就說(shuō)更新過(guò),就不往下走CAS代碼了
((newReference == current.reference &&
newStamp == current.stamp) ||
// 構(gòu)造新的Pair對(duì)象并CAS更新
casPair(current, Pair.of(newReference, newStamp)));
}
private boolean casPair(Pair<V> cmp, Pair<V> val) {
// 調(diào)用Unsafe的compareAndSwapObject()方法CAS更新pair的引用為新引用
return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}
2.AtomicStampedReference解決ABA問(wèn)題示例
代碼示例,我們下面就用other代碼模擬干擾現(xiàn)場(chǎng),如果other現(xiàn)場(chǎng)先進(jìn)行CAS更新再還原操作,那么main線程的版本號(hào)就會(huì)過(guò)時(shí),CAS就會(huì)操作失敗
/**
* ABA問(wèn)題代碼示例
*/
public class AtomicStampedReferenceTest {
private static AtomicStampedReference<Integer> atomicStampedRef =
new AtomicStampedReference<>(1, 0);
public static void main(String[] args) {
Thread main = new Thread(() -> {
System.out.println("操作線程" + Thread.currentThread() + ",初始值 a = " + atomicStampedRef.getReference());
int stamp = atomicStampedRef.getStamp(); //獲取當(dāng)前標(biāo)識(shí)別
try {
Thread.sleep(1000); //等待1秒 ,以便讓干擾線程執(zhí)行
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean isCASSuccess = atomicStampedRef.compareAndSet(1, 2, stamp, stamp + 1); //此時(shí)expectedReference未發(fā)生改變,但是stamp已經(jīng)被修改了,所以CAS失敗
System.out.println("操作線程" + Thread.currentThread() + ",CAS操作結(jié)果: " + isCASSuccess);
}, "主操作線程");
Thread other = new Thread(() -> {
Thread.yield(); // 確保thread-main 優(yōu)先執(zhí)行
atomicStampedRef.compareAndSet(1, 2, atomicStampedRef.getStamp(), atomicStampedRef.getStamp() + 1);
System.out.println("操作線程" + Thread.currentThread() + ",【increment】 ,值 = " + atomicStampedRef.getReference());
atomicStampedRef.compareAndSet(2, 1, atomicStampedRef.getStamp(), atomicStampedRef.getStamp() + 1);
System.out.println("操作線程" + Thread.currentThread() + ",【decrement】 ,值 = " + atomicStampedRef.getReference());
}, "干擾線程");
main.start();
other.start();
}
}
3.AtomicMarkableReference解決對(duì)象ABA問(wèn)題
AtomicMarkableReference,它不是維護(hù)一個(gè)版本號(hào),而是維護(hù)一個(gè)boolean類型的標(biāo)記,標(biāo)記對(duì)象是否有修改,從而解決ABA問(wèn)題。
public boolean weakCompareAndSet(V expectedReference,
V newReference,
boolean expectedMark,
boolean newMark) {
return compareAndSet(expectedReference, newReference,
expectedMark, newMark);
}
八、常見面試題
1.CAS為什么比synchronized快(重點(diǎn))
CAS工作原理是基于樂(lè)觀鎖且操作是原子性的,與synchronized的悲觀鎖(底層需要調(diào)用操作系統(tǒng)的mutex鎖)相比,效率也會(huì)相對(duì)高一些。
2.CAS是不是操作系統(tǒng)執(zhí)行的?(重點(diǎn))
不是,CAS是主要是通過(guò)處理器的指令來(lái)保證原子性的,在上面的講解中我們都知道CAS操作底層都是調(diào)用Unsafe的native修飾的方法,以AtomicInteger為例對(duì)應(yīng)的底層的實(shí)現(xiàn)是Unsafe的compareAndSwapInt:
public final native boolean compareAndSwapInt(Object o, long offset,
int expected,
int x);
對(duì)應(yīng)的我們給出這段代碼的c語(yǔ)言實(shí)現(xiàn),即位于:https://github.com/openjdk/jdk/blob/jdk8-b01/hotspot/src/share/vm/prims/unsafe.cpp的unsafe.cpp:
可以看到出去前兩個(gè)形參后續(xù)的參數(shù)與compareAndSwapInt列表一一對(duì)應(yīng),這段代碼執(zhí)行CAS操作時(shí),本質(zhì)上就是調(diào)用cmpxchg指令(Compare and Exchange),cmpxchg指令會(huì)判斷當(dāng)前服務(wù)器是否是多核,如果是則講LOCK前綴保證cmpxchg操作的原子性,反之就不加Lock前綴直接執(zhí)行比對(duì)后修改變量值這種樂(lè)觀鎖操作。
對(duì)應(yīng)源碼如下,它首先獲取字段的偏移地址,然后傳入預(yù)期值e與原值比較,如果一致,則將新結(jié)果x寫入原子操作變量?jī)?nèi)存中:
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);
//獲取字段偏移量地址
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
//比較如果期望值e和當(dāng)前字段存儲(chǔ)的值一樣,則講值更新為x
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END
3.CAS存在那些問(wèn)題?
但即便如此CAS仍然存在兩個(gè)問(wèn)題:
(1) 可能存在長(zhǎng)時(shí)間CAS:如下代碼所示,這就是AtomicInteger底層的UNSAFE類如何進(jìn)行CAS的具體代碼 ,可以看出這個(gè)CAS操作需要拿到volatile變量后在進(jìn)行循環(huán)CAS才有可能成功這就很可能存在自旋循環(huán),從而給CPU帶來(lái)很大的執(zhí)行開銷。
public final int getAndAddInt(Object paramObject, long paramLong, int paramInt)
{
int i;
do
//獲取最新結(jié)果
i = getIntVolatile(paramObject, paramLong);
//通過(guò)cas自旋操作完成自增
while (!compareAndSwapInt(paramObject, paramLong, i, i + paramInt));
return i;
}
(2) CAS只能對(duì)一個(gè)變量進(jìn)行原子操作:為了解決這個(gè)問(wèn)題,JDK 1.5之后通過(guò)AtomicReference使得變量可以封裝成一個(gè)對(duì)象進(jìn)行操作
ABA問(wèn)題:總所周知CAS就是比對(duì)當(dāng)前值與舊值是否相等,在進(jìn)行修改操作,假設(shè)我現(xiàn)在有一個(gè)變量值為A,我改為B,再還原為A,這樣操作變量值是沒(méi)變的?那么CAS也會(huì)成功不就不合理嗎?這就好比一個(gè)銀行儲(chǔ)戶想查詢概念轉(zhuǎn)賬記錄,如果轉(zhuǎn)賬一次記為1,如果按照ABA問(wèn)題的邏輯,那么這個(gè)銀行賬戶轉(zhuǎn)賬記錄次數(shù)有可能會(huì)缺少。為了解決這個(gè)問(wèn)題JDK 1.5提供了AtomicStampedReference,通過(guò)比對(duì)版本號(hào)在進(jìn)行CAS操作,那么上述操作就會(huì)變?yōu)?A->2B->3A,由于版本追加,那么我們就能捕捉到當(dāng)前變量的變化了。
4.AtomicInteger自增到10000后如何歸零
AtomicInteger atomicInteger=new AtomicInteger(10000);
atomicInteger.compareAndSet(10000, 0);
5.CAS 平時(shí)怎么用的,會(huì)有什么問(wèn)題,為什么快,如果我用 for 循環(huán)代替 CAS 執(zhí)行效率是一樣的嗎?(重點(diǎn))
問(wèn)題1: 一些需要并發(fā)計(jì)數(shù)并實(shí)時(shí)監(jiān)控的場(chǎng)景可以用到。
問(wèn)題2: CAS存在問(wèn)題:CAS是基于樂(lè)觀鎖機(jī)制,所以數(shù)據(jù)同步失敗就會(huì)原地自旋,在高并發(fā)場(chǎng)景下開銷很大,所以線程數(shù)很大的情況下不建議使用原子類。
問(wèn)題3:用 for 循環(huán)代替 CAS 存在問(wèn)題: 如果并發(fā)量大的話,自旋的線程多了就會(huì)導(dǎo)致性能瓶頸。
for 循環(huán)代替 CAS執(zhí)行效率是否一樣:大概率是CAS快,原因如下:
- CAS是native方法更接近底層
- for循環(huán)為了保證線程安全可能會(huì)用到sync鎖或者Lock無(wú)論那種都需要上鎖和釋放的邏輯,相比CAS樂(lè)觀鎖來(lái)說(shuō)開銷很大。