精通Java并發鎖機制:24種鎖技巧+業務鎖匹配方案
在 Java 并發編程中,鎖是確保線程安全、協調多線程訪問共享資源的關鍵機制。從基本的 synchronized 同步關鍵字到高級的 ReentrantLock、讀寫鎖 ReadWriteLock、無鎖設計如 AtomicInteger,再到復雜的同步輔助工具如 CountDownLatch、 CyclicBarrier 和 Semaphore,每種鎖都針對特定的并發場景設計,以解決多線程環境下的同步問題。 StampedLock 提供了樂觀讀鎖和悲觀寫鎖的選項,而 ConcurrentHashMap 和 ConcurrentLinkedQueue 等并發集合則通過內部機制優化了并發訪問。了解不同鎖的特點和適用場景,對于構建高效、穩定的并發應用程序至關重要。
1、鎖選擇維度
選擇適合的鎖通常依賴于特定的應用場景和并發需求。以下是一個表格,概述了不同鎖類型的關鍵特性和選擇它們的考量維度:
鎖類型 | 適用場景 | 鎖模式 | 性能特點 | 公平性 | 鎖的粗細 | 條件支持 | 阻塞策略 | 用途舉例 |
| 簡單的同步需求,無需復雜控制 | 獨占式 | 適中,偏向鎖、輕量級鎖優化 | 無公平策略 | 粗粒度鎖 | 不支持 | 阻塞等待 | 單例模式、簡單的計數器 |
| 需要靈活的鎖控制,如可中斷、超時、嘗試鎖定等 | 獨占式 | 高,支持多種鎖定方式 | 可配置公平性 | 細粒度鎖 | 支持 | 可中斷、超時、嘗試 | 同步代碼塊或方法、復雜同步控制 |
| 讀多寫少的場景 | 共享-獨占式 | 高,提高讀操作并發性 | 不支持公平性 | 細粒度鎖 | 不支持 | 阻塞等待 | 緩存系統、文件系統 |
| 讀多寫多,需要樂觀讀和悲觀寫的場景 | 樂觀讀-悲觀寫 | 高,提供讀寫鎖的擴展 | 可配置公平性 | 細粒度鎖 | 支持 | 可中斷、超時、嘗試 | 高性能計數器、數據緩存 |
| 需要等待一組操作完成的場景 | 無 | 低,一次性 | 不支持公平性 | 粗粒度鎖 | 不支持 | 阻塞等待 | 任務協調、初始化操作 |
| 需要控制資源訪問數量的場景 | 信號量 | 高,控制并發數量 | 不支持公平性 | 細粒度鎖 | 支持 | 阻塞等待 | 限流、資源池管理 |
| 需要周期性執行一組操作的場景 | 無 | 低,重用性 | 支持公平性 | 粗粒度鎖 | 支持 | 阻塞等待 | 并行計算、批處理 |
2、鎖詳細分析
2.7. CyclicBarrier
CyclicBarrier 是 Java 中用于線程間同步的一種工具,它允許一組線程互相等待,直到所有線程都到達一個公共屏障點。
圖片
圖解說明:
- Java 線程:表示運行中的線程,它們可能需要在某個點同步。
- CyclicBarrier 實例:是 CyclicBarrier 類的實例,用于協調一組線程在屏障點同步。
- 屏障:表示線程需要到達的同步點,所有線程必須到達這個點才能繼續執行。
- 共享資源或任務:表示線程需要訪問的共享資源或執行的任務,它們在屏障點同步后可以安全地執行。
- 等待區:表示等待其他線程到達屏障點的線程集合。
- 計數器: CyclicBarrier 內部維護一個計數器,用于跟蹤尚未到達屏障點的線程數量。
- 屏障動作(Runnable) :可選的,當所有線程到達屏障點時,可以執行一個特定的動作或任務。
綜合說明:
- 作用: CyclicBarrier 是一種同步幫助工具,允許一組線程相互等待,直到所有線程都到達某個公共屏障點。
- 背景:在需要多個線程協作完成任務時, CyclicBarrier 提供了一種機制,使得所有線程可以在屏障點同步,然后繼續執行。
- 優點:
可重復使用:與 CountDownLatch 不同, CyclicBarrier 可以重復使用,適用于周期性的任務同步。
支持屏障動作:可以設置一個在所有線程到達屏障點后執行的回調。
- 缺點:
可能導致死鎖:如果一個或多個線程未到達屏障點,其他線程將一直等待。
復雜性:需要合理設計以避免線程永久等待。
場景:適用于需要周期性同步多個線程的場景。
業務舉例:在多階段數據處理流程中,每個階段需要所有數據都準備好后才能開始處理。使用 CyclicBarrier可以確保所有數據加載線程在每個階段開始前都已準備好。
使用方式:
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class Race {
private final CyclicBarrier barrier;
public Race(int numberOfRunners) {
barrier = new CyclicBarrier(numberOfRunners, () -> {
System.out.println("比賽開始!");
// 這里可以放置所有參與者到達屏障后要執行的操作
});
}
public void run() {
System.out.println("等待其他參賽者...");
try {
barrier.await(); // 等待其他線程
System.out.println("開始跑步!");
// 跑步時間
Thread.sleep((long) (Math.random() * 10000));
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
final int numberOfRunners = 5;
Race race = new Race(numberOfRunners);
// 創建參賽者線程
for (int i = 0; i < numberOfRunners; i++) {
final int runnerNumber = i + 1;
new Thread(() -> {
System.out.println("參賽者 " + runnerNumber + " 已準備就緒");
race.run();
}).start();
}
}
}
業務代碼案例:
業務說明: 在大數據處理系統中,經常需要對大量數據進行多階段處理,例如,數據清洗、轉換、聚合和加載。這些處理階段通常需要按順序執行,且每個階段開始前必須確保所有數據都已準備好。
為什么需要 CyclicBarrier 技術: 在多階段數據處理的場景中,不同的處理任務可能由不同的線程執行,而這些線程的執行時間可能不同。 CyclicBarrier 允許每個階段的處理在開始前等待所有相關線程完成上一階段的任務,確保數據的一致性和完整性。
沒有 CyclicBarrier 技術會帶來什么后果:
沒有使用 CyclicBarrier 或其他同步協調機制可能會導致以下問題:
- 數據不一致:如果后續階段的處理在前一階段的數據未完全準備好時開始,可能會導致處理結果不準確。
- 資源浪費:在等待數據準備的過程中,系統資源可能被無效占用,導致資源利用效率低下。
- 錯誤和異常:由于階段間的依賴關系沒有得到妥善處理,可能會引發程序錯誤或運行時異常。
代碼實現:
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class DataProcessingPipeline {
private final ExecutorService executor = Executors.newFixedThreadPool(4);
private final CyclicBarrier barrier;
private final int numberOfPhases;
private final int numberOfTasks;
public DataProcessingPipeline(int numberOfTasks, int numberOfPhases) {
this.numberOfTasks = numberOfTasks;
this.numberOfPhases = numberOfPhases;
this.barrier = new CyclicBarrier(numberOfTasks, () -> {
System.out.println("一個階段完成,準備進入下一階段");
});
}
public void processData() throws Exception {
for (int phase = 1; phase <= numberOfPhases; phase++) {
System.out.println("階段 " + phase + " 開始");
for (int task = 0; task < numberOfTasks; task++) {
final int currentTask = task;
executor.submit(() -> {
try {
// 數據處理任務
System.out.println("任務 " + currentTask + " 在階段 " + phase + " 執行");
Thread.sleep((long) (Math.random() * 5000));
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
try {
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
barrier.await(); // 等待所有任務完成
}
executor.shutdown();
}
public static void main(String[] args) {
DataProcessingPipeline pipeline = new DataProcessingPipeline(4, 3);
try {
pipeline.processData();
} catch (Exception e) {
e.printStackTrace();
}
}
}
2.8. Atomic Variables
原子變量是 Java 中 java.util.concurrent.atomic 包提供的一些類,它們利用底層硬件的原子性指令來保證操作的原子性,無需使用鎖。
圖片
圖解說明:
- Java 線程:表示運行中的線程,它們可能需要對共享資源進行原子操作。
- Atomic Variables:表示原子變量的集合,包括 AtomicInteger、 AtomicLong、 AtomicReference 等。
- AtomicInteger、AtomicLong、AtomicReference:分別表示整型、長整型和引用類型的原子變量。
- 硬件支持的原子指令:底層硬件提供的原子性指令,如 compare-and-swap (CAS)、load-linked、store-conditional 等。
- 共享資源:表示被多個線程共享的數據,如計數器、累加器等。
- 內存:表示 Java 程序使用的內存空間,包括堆和棧等。
- 變量狀態:表示原子變量在內存中的當前狀態。
綜合說明:
- 作用:原子變量類(如 AtomicInteger, AtomicLong, AtomicReference 等)提供了一種機制,使得對變量的某些操作(如自增、自減、讀取和寫入)是原子性的,無需使用傳統的鎖。
- 背景:在多線程環境中,對共享變量的并發訪問需要同步措施以防止數據競爭。原子變量利用底層硬件的原子指令來保證操作的原子性,從而簡化了線程同步。
- 優點:
無鎖設計:避免使用傳統鎖,減少了線程切換的開銷。
性能優化:對于高競爭的簡單變量訪問,原子變量通常比鎖有更好的性能。
- 缺點:
功能限制:僅適用于簡單的操作,復雜的操作無法通過原子變量實現。
可組合性問題:復雜的原子操作需要仔細設計,否則可能引入競態條件。
場景:適用于對簡單變量進行原子操作的場景,如計數器、累加器等。
業務舉例:在電商平臺的庫存管理中, AtomicInteger 可以用來原子地更新商品的庫存數量,確保在高并發環境下庫存數據的一致性。
使用方式:
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
public class Counter {
// 使用 AtomicInteger 來確保計數器的線程安全
private final AtomicInteger count = new AtomicInteger(0);
// 提供一個方法來增加計數器的值
public void increment() {
// 原子地增加計數器的值
count.incrementAndGet();
}
// 提供一個方法來獲取當前計數器的值
public int getCount() {
// 原子地獲取計數器的值
return count.get();
}
}
public class DataStore {
// 使用 AtomicLong 來統計數據總量
private final AtomicLong dataCount = new AtomicLong(0);
public void addData(long size) {
// 原子地將數據大小累加到總量
dataCount.addAndGet(size);
}
public long getDataCount() {
// 原子地獲取當前數據總量
return dataCount.get();
}
}
// 測試類
public class AtomicVariablesDemo {
public static void main(String[] args) {
Counter counter = new Counter();
DataStore dataStore = new DataStore();
// 多線程環境中對計數器和數據總量的更新
for (int i = 0; i < 10; i++) {
new Thread(() -> {
counter.increment();
dataStore.addData(100); // 假設每次操作增加100單位數據
}).start();
}
// 等待所有線程完成
while (Thread.activeCount() > 1) {
Thread.yield();
}
// 輸出計數器的值和數據總量
System.out.println("Counter value: " + counter.getCount());
System.out.println("Data store size: " + dataStore.getDataCount());
}
}
業務代碼案例:
場景描述:社交網絡的實時消息計數
業務說明: 社交網絡平臺需要顯示每個用戶的實時消息通知數。每當用戶收到新消息時,消息計數需要增加;用戶閱讀消息時,計數可能會減少或被重置。此計數需要對所有用戶可見,且在高并發環境下保持準確。
為什么需要 AtomicVariables 技術: 在社交網絡中,多個用戶可能同時發送消息給同一個接收者,或者一個用戶可能同時在多個設備上接收消息。這導致對消息計數的讀取和更新操作非常頻繁。使用 AtomicInteger 可以確保消息計數更新的原子性,并且在多線程環境下保持數據的一致性。
沒有 AtomicVariables 技術會帶來什么后果:
沒有使用 AtomicVariables 或其他并發控制機制可能會導致以下問題:
- 數據不一致:消息計數可能會出錯,導致用戶看到不正確的消息數量。
- 用戶體驗下降:如果消息通知不準確,用戶可能會錯過重要通知,或者對應用的可靠性產生懷疑。
- 系統復雜度增加:在沒有有效同步機制的情況下,維護數據一致性將變得復雜且容易出錯。
代碼實現:
import java.util.concurrent.atomic.AtomicInteger;
public class MessageNotificationCounter {
private final AtomicInteger messageCount = new AtomicInteger(0);
// 接收新消息時調用此方法
public void receiveMessage() {
// 原子地增加消息計數
messageCount.incrementAndGet();
System.out.println("New message received. Total messages: " + messageCount.get());
}
// 用戶閱讀消息時調用此方法
public void messagesRead() {
// 原子地減少消息計數
messageCount.decrementAndGet();
System.out.println("Messages read. Remaining messages: " + messageCount.get());
}
// 獲取當前消息計數
public int getMessageCount() {
return messageCount.get();
}
}
// 測試類
public class AtomicVariablesDemo {
public static void main(String[] args) {
MessageNotificationCounter counter = new MessageNotificationCounter();
// 多個用戶同時發送消息
Thread sender1 = new Thread(() -> {
counter.receiveMessage();
});
Thread sender2 = new Thread(() -> {
counter.receiveMessage();
});
// 用戶閱讀消息
Thread reader = new Thread(() -> {
counter.messagesRead();
});
sender1.start();
sender2.start();
reader.start();
sender1.join();
sender2.join();
reader.join();
System.out.println("Final message count: " + counter.getMessageCount());
}
}
2.9. ConcurrentHashMap
ConcurrentHashMap 是 Java 中一個線程安全的哈希表,它通過分段鎖(Segmentation)和 CAS 操作來支持高并發的讀寫操作。
圖解說明:
- Java 線程:表示運行中的線程,它們可能需要對 ConcurrentHashMap 進行讀寫操作。
- ConcurrentHashMap 實例:是 ConcurrentHashMap 類的實例,用于存儲鍵值對并提供線程安全的訪問。
- Segment 數組: ConcurrentHashMap 將哈希表分為多個段(Segment),每個段維護一部分哈希桶,通過分段鎖減少鎖的競爭。
- Hash 桶:存儲哈希桶數組,每個桶可以包含一個或多個鍵值對。
- 鏈表或紅黑樹:在哈希桶中,鍵值對最初以鏈表形式存儲,當鏈表長度超過閾值時,鏈表可能會被轉換為紅黑樹以提高搜索效率。
- 共享資源:表示存儲在 ConcurrentHashMap 中的鍵值對數據。
- 讀操作:線程可以并發地讀取 ConcurrentHashMap 中的數據,在讀多寫少的場景下,讀操作不會阻塞其他讀操作。
- 寫操作:線程對 ConcurrentHashMap 的寫入操作,寫操作需要獲取相應段的鎖。
- 鎖:每個段擁有自己的鎖,寫操作需要獲取鎖,而讀操作通常不需要。
升級設計說明:
Java 1.7 ConcurrentHashMap 鎖機制
在 Java 1.7 中, ConcurrentHashMap 使用分段鎖機制,其中每個段相當于一個小的哈希表,擁有自己的鎖。
Java 1.8 ConcurrentHashMap 鎖機制
在 Java 1.8 中, ConcurrentHashMap 摒棄了分段鎖機制,采用了 CAS 和 synchronized 來確保線程安全。
綜合說明:
- 作用: ConcurrentHashMap 是 Java 中提供的一個線程安全的哈希表,它通過分段鎖的概念來允許并發的讀寫操作,從而提高并發訪問的性能。
- 背景:傳統的 HashMap 在多線程環境下需要外部同步,而 ConcurrentHashMap 通過鎖分離技術減少了鎖的競爭,提供了更好的并發性能。
- 優點:
- 高并發:通過細分鎖到段,允許多個線程同時操作不同段的數據。
- 動態擴容:內部采用動態數組和鏈表結構,提高了空間和時間效率。
- 缺點:
復雜度高:實現復雜,需要維護多個鎖和復雜的數據結構。
性能調優:在極端高并發場景下,可能需要調整默認的并發級別。
場景:適用于需要高并發訪問的緩存或數據存儲。
業務舉例:在大數據處理系統中, ConcurrentHashMap 可以用來存儲實時計算結果,支持大量并發的讀寫操作而不會導致性能瓶頸。
使用方式:
import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;
public class ConcurrentHashMapDemo {
// 創建一個 ConcurrentHashMap 實例
private final ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 將一個鍵值對插入到 Map 中
public void put(String key, Integer value) {
// put 方法是線程安全的
map.put(key, value);
}
// 從 Map 中獲取與指定鍵關聯的值
public Integer get(String key) {
// get 方法是線程安全的
return map.get(key);
}
// 計算 Map 中的元素數量
public int size() {
// size 方法是線程安全的
return map.size();
}
// 演示刪除操作
public void remove(String key) {
// remove 方法是線程安全的
map.remove(key);
}
// 演示如何批量添加數據
public void addAll(Map<String, Integer> newData) {
// putAll 方法是線程安全的
map.putAll(newData);
}
public static void main(String[] args) {
ConcurrentHashMapDemo demo = new ConcurrentHashMapDemo();
// 批量添加數據
demo.addAll(Map.of("key1", 1, "key2", 2, "key3", 3));
// 單獨添加一條數據
demo.put("key4", 4);
// 獲取并打印一條數據
System.out.println("Value for 'key1': " + demo.get("key1"));
// 獲取 Map 的大小
System.out.println("Map size: " + demo.size());
// 刪除一條數據
demo.remove("key2");
// 再次獲取 Map 的大小
System.out.println("Map size after removal: " + demo.size());
}
}
業務代碼案例:
業務說明: 在分布式緩存系統中,經常需要存儲和檢索用戶會話信息、應用配置、熱點數據等。這些數據需要被多個應用實例共享,并且要求在高并發環境下依然保持高性能。緩存數據通常有過期時間,需要定期清理。
為什么需要 ConcurrentHashMap 技術: ConcurrentHashMap 提供了一種高效的方式來處理并發的讀取和更新操作,并且它的分段鎖機制允許多個線程同時對不同段進行操作,從而提高并發處理能力。此外, ConcurrentHashMap 在 Java 8 中引入的紅黑樹結構使得即使在高并發更新導致哈希沖突時,也能保持高效的性能。
沒有 ConcurrentHashMap 技術會帶來什么后果:
沒有使用 ConcurrentHashMap 可能會導致以下問題:
- 性能瓶頸:在高并發環境下,如果使用 HashMap 加 synchronized,可能導致嚴重的性能瓶頸,因為所有線程必須等待一個鎖。
- 數據不一致:在沒有適當同步的情況下,多個線程同時更新數據可能導致緩存數據不一致。
- 擴展性差:隨著系統負載的增加,基于 HashMap 的緩存解決方案可能難以擴展,因為鎖競爭和線程安全問題。
代碼實現:
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
public class DistributedCache<K, V> {
private final ConcurrentHashMap<K, V> cacheMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<K, Long> expirationMap = new ConcurrentHashMap<>();
public void put(K key, V value, long ttl) {
cacheMap.put(key, value);
expirationMap.put(key, System.currentTimeMillis() + ttl);
scheduleEviction(key, ttl);
}
public V get(K key) {
Long expirationTime = expirationMap.get(key);
if (expirationTime == null || expirationTime < System.currentTimeMillis()) {
cacheMap.remove(key);
expirationMap.remove(key);
return null;
}
return cacheMap.get(key);
}
private void scheduleEviction(final K key, final long ttl) {
new Thread(() -> {
try {
TimeUnit.MILLISECONDS.sleep(ttl);
cacheMap.computeIfPresent(key, (k, v) -> null);
expirationMap.remove(key);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
public static void main(String[] args) {
DistributedCache<String, String> cache = new DistributedCache<>();
cache.put("userSession", "sessionData", 5000); // 緩存設置5秒過期
// 多個線程并發訪問緩存
for (int i = 0; i < 100; i++) {
int finalI = i;
new Thread(() -> {
String result = cache.get("userSession");
System.out.println("Thread " + finalI + " retrieved: " + result);
}).start();
}
}
}
2.10.ConcurrentSkipListMap
ConcurrentSkipListMap 是 Java 中實現的一個高性能并發的有序映射,它使用跳表(Skip List)作為其底層數據結構。
圖片
圖解說明:
- Java 線程:表示運行中的線程,它們可能需要對 ConcurrentSkipListMap 進行讀寫操作。
- ConcurrentSkipListMap 實例:是 ConcurrentSkipListMap 類的實例,用于存儲鍵值對并提供線程安全的訪問。
- Skip List 層級結構:跳表由多層索引構成,每一層都是一個有序的鏈表。
- 索引層:跳表中的索引層,用于加速搜索操作。
- 數據層:跳表中的底層數據結構,存儲實際的鍵值對。
- Node 節點:跳表中的節點,包含鍵值對和指向其他節點的鏈接。
- 共享資源:表示存儲在 ConcurrentSkipListMap 中的鍵值對數據。
- 讀操作:線程可以并發地讀取 ConcurrentSkipListMap 中的數據。
- 寫操作:線程可以并發地修改 ConcurrentSkipListMap 中的數據。
- CAS 操作:在更新節點鏈接或修改數據時,使用 CAS 操作來保證線程安全。
- 自旋鎖/同步塊:在某些情況下,如果 CAS 操作失敗,可能會使用自旋鎖或同步塊來確保操作的原子性。
操作流程:
- 讀操作:
線程通過索引層快速定位到數據層的節點。
線程使用 volatile 讀取節點的值,確保內存可見性。
- 寫操作:
線程在更新或添加節點時,首先嘗試使用 CAS 操作。
如果 CAS 操作失敗,線程可能會使用自旋鎖或同步塊來確保原子性。
綜合說明:
作用: ConcurrentSkipListMap 是一種線程安全的有序映射,它通過使用跳表(Skip List)數據結構來支持高效的并發訪問和排序操作。 背景:在需要高效并發訪問和保持元素有序的場景中,傳統的 TreeMap 由于其加鎖策略在高并發環境下性能受限, ConcurrentSkipListMap 提供了一種替代方案。 優點:
- 高性能并發訪問:通過跳表結構和細粒度鎖定,實現了高效的并發讀取和更新。
- 有序性:保持元素的有序性,支持范圍查詢等操作。
- 動態調整:可以根據訪問模式動態調整結構,優化性能。 缺點:
- 內存占用:相比無序的 ConcurrentHashMap,由于維護了有序性,內存占用可能更高。
- 復雜性:實現相對復雜,涉及多級索引和節點的管理。 場景:適用于需要有序數據且高并發訪問的場景,如實時數據索引、范圍查詢等。 業務舉例:在一個金融市場分析系統中,需要維護一個實時更新的價格索引, ConcurrentSkipListMap 可以用來存儲和快速檢索各種金融工具的當前價格。
使用方式:
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
public class ConcurrentSkipListMapDemo {
// 創建一個 ConcurrentSkipListMap 實例
private final ConcurrentSkipListMap<Integer, String> map = new ConcurrentSkipListMap<>();
// 將一個鍵值對插入到 Map 中
public void put(Integer key, String value) {
// put 方法是線程安全的
map.put(key, value);
}
// 從 Map 中獲取與指定鍵關聯的值
public String get(Integer key) {
// get 方法是線程安全的
return map.get(key);
}
// 獲取 Map 的鍵集合
public java.util.NavigableSet<Integer> keySet() {
// keySet 方法返回 Map 的鍵集合視圖
return map.keySet();
}
// 獲取 Map 的值集合
public java.util.Collection<String> values() {
// values 方法返回 Map 的值集合視圖
return map.values();
}
// 獲取 Map 的大小
public int size() {
// size 方法是線程安全的
return map.size();
}
// 演示刪除操作
public void remove(Integer key) {
// remove 方法是線程安全的
map.remove(key);
}
public static void main(String[] args) {
ConcurrentSkipListMapDemo demo = new ConcurrentSkipListMapDemo();
// 插入一些數據
demo.put(1, "One");
demo.put(2, "Two");
demo.put(3, "Three");
// 獲取并打印一條數據
System.out.println("Value for key 2: " + demo.get(2));
// 獲取 Map 的大小
System.out.println("Map size: " + demo.size());
// 獲取并打印所有鍵
System.out.println("All keys: " + demo.keySet());
// 刪除一條數據
demo.remove(2);
// 再次獲取 Map 的大小
System.out.println("Map size after removal: " + demo.size());
// 獲取并打印所有值
System.out.println("All values: " + demo.values());
}
}
業務代碼案例:
業務說明: 實時股票交易系統需要維護一個動態變化的股票價格索引,該索引需要根據實時的市場數據進行更新,并且允許多個交易線程并發地讀取和更新股票價格。此外,系統還需要定期根據價格波動進行調整,如計算價格的平均值、執行價格范圍查詢等。
為什么需要 ConcurrentSkipListMap 技術: ConcurrentSkipListMap 是一個線程安全的有序映射,它允許高效的范圍查詢和有序訪問,這對于股票價格索引來說至關重要。由于股票價格會頻繁更新,且需要快速響應市場變化,使用 ConcurrentSkipListMap 可以提供高效的插入、刪除和查找操作,同時保持數據的有序性。
沒有 ConcurrentSkipListMap 技術會帶來什么后果:
沒有使用 ConcurrentSkipListMap 或其他適合有序并發操作的數據結構可能會導致以下問題:
- 性能瓶頸:如果使用 HashMap 或 ConcurrentHashMap,雖然可以實現并發更新,但無法高效執行有序操作和范圍查詢,可能導致查詢性能不佳。
- 數據不一致:在高并發更新的情況下,如果沒有適當的同步機制,可能會導致價格信息的不一致。
- 復雜性增加:如果使用 synchronized 列表或數組來維護價格索引,可能需要手動管理復雜的同步和排序邏輯,增加系統復雜性和出錯的風險。
代碼實現:
import java.util.concurrent.ConcurrentSkipListMap;
public class StockPriceIndex {
private final ConcurrentSkipListMap<String, Double> priceIndex = new ConcurrentSkipListMap<>();
public void updatePrice(String stockSymbol, Double newPrice) {
// 更新股票價格
priceIndex.put(stockSymbol, newPrice);
}
public Double getPrice(String stockSymbol) {
// 獲取股票價格
return priceIndex.get(stockSymbol);
}
public void removeStock(String stockSymbol) {
// 移除股票信息
priceIndex.remove(stockSymbol);
}
public ConcurrentSkipListMap<String, Double> headMap(String toKey) {
// 獲取指定范圍內的股票價格索引
return priceIndex.headMap(toKey);
}
public static void main(String[] args) {
StockPriceIndex index = new StockPriceIndex();
index.updatePrice("AAPL", 150.00);
index.updatePrice("GOOGL", 2750.50);
index.updatePrice("MSFT", 250.00);
System.out.println("Price of AAPL: " + index.getPrice("AAPL"));
System.out.println("Price of GOOGL: " + index.getPrice("GOOGL"));
// 獲取所有小于 "MSFT" 的股票價格索引
ConcurrentSkipListMap<String, Double> subMap = index.headMap("MSFT");
subMap.forEach((k, v) -> System.out.println("Stock: " + k + ", Price: " + v));
}
}
2.11. ConcurrentLinkedQueue
ConcurrentLinkedQueue 是 Java 中一個線程安全的無鎖隊列,它使用 CAS (Compare-And-Swap) 操作來保證線程安全。
圖片
圖解說明:
- Java 線程:表示運行中的線程,它們可能需要對 ConcurrentLinkedQueue 進行入隊或出隊操作。
- ConcurrentLinkedQueue 實例:是 ConcurrentLinkedQueue 類的實例,用于存儲隊列中的元素并提供線程安全的訪問。
- Node 節點結構: ConcurrentLinkedQueue 使用內部的 Node 類來存儲隊列中的每個元素。每個節點包含隊列中的一個元素和指向下一個節點的鏈接。
- 虛擬頭節點:隊列使用一個虛擬頭節點來簡化出隊操作。虛擬頭節點不存儲實際的隊列元素。
- 虛擬尾節點:隊列使用一個虛擬尾節點來簡化入隊操作。虛擬尾節點指向隊列中的最后一個節點。
- 隊列元素:表示存儲在隊列中的實際數據。
- 入隊操作:線程將新元素添加到隊列尾部的過程,通過 CAS 更新虛擬尾節點的鏈接。
- 出隊操作:線程從隊列頭部移除元素的過程,通過 CAS 更新虛擬頭節點的鏈接。
- CAS 操作: ConcurrentLinkedQueue 使用 CAS 操作來更新節點之間的鏈接,從而實現無鎖的線程安全隊列。
- 自旋等待:在 CAS 操作失敗時,線程可能會自旋等待直到操作成功。
操作流程:
- 入隊操作:線程通過 CAS 操作將新節點插入到隊列尾部,并更新尾節點指針。
- 出隊操作:線程通過 CAS 操作移除隊列頭部的節點,并更新頭節點指針。
- CAS 操作:在入隊和出隊過程中,線程使用 CAS 來保證節點鏈接的原子性更新。
綜合說明:
作用: ConcurrentLinkedQueue 是一種基于鏈接節點的無界線程安全隊列,支持高并發的入隊和出隊操作。 背景:在多線程環境中,需要一種高效的隊列來處理任務或消息傳遞, ConcurrentLinkedQueue 提供了一種無鎖的解決方案。 優點:
- 無鎖設計:利用 CAS 操作實現無鎖的線程安全隊列,提高了并發性能。
- 簡單高效:提供了簡單的入隊和出隊操作,適合作為任務隊列或消息傳遞隊列。
- 無界隊列:理論上隊列大小無界,適用于處理大量任務。 缺點:
- 可能的內存消耗:由于是無界隊列,在極端情況下可能會消耗大量內存。
- 性能限制:在某些高競爭場景下,CAS 操作可能導致性能瓶頸。 場景:適用于作為任務隊列或消息傳遞隊列,支持高并發的入隊和出隊操作。 業務舉例:在一個分布式計算系統中, ConcurrentLinkedQueue 可以用于收集各個計算節點的輸出結果,然后由一個或多個消費者線程進行處理。
這兩個并發集合類在 Java 中提供了強大的工具,以支持復雜的并發數據處理需求,它們各自適用于不同的應用場景,可以根據具體需求選擇合適的并發集合。kedQueue
使用方式:
import java.util.concurrent.ConcurrentLinkedQueue;
public class ConcurrentLinkedQueueDemo {
// 創建一個 ConcurrentLinkedQueue 實例
private final ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
// 向隊列中添加元素
public void add(int number) {
// add 方法是線程安全的
queue.add(number);
System.out.println("Added " + number);
}
// 從隊列中獲取并移除元素
public Integer poll() {
// poll 方法是線程安全的,返回并移除隊列頭部的元素
Integer result = queue.poll();
if (result != null) {
System.out.println("Polled " + result);
} else {
System.out.println("Queue is empty");
}
return result;
}
// 查看隊列頭部的元素但不移除
public Integer peek() {
// peek 方法是線程安全的,返回隊列頭部的元素但不移除
Integer result = queue.peek();
if (result != null) {
System.out.println("Peeked " + result);
} else {
System.out.println("Queue is empty");
}
return result;
}
// 獲取隊列的大小
public int size() {
// size 方法估算隊列的大小
int size = queue.size();
System.out.println("Queue size: " + size);
return size;
}
public static void main(String[] args) {
ConcurrentLinkedQueueDemo demo = new ConcurrentLinkedQueueDemo();
// 啟動生產者線程
Thread producerThread = new Thread(() -> {
demo.add(1);
demo.add(2);
demo.add(3);
});
// 啟動消費者線程
Thread consumerThread = new Thread(() -> {
demo.poll();
demo.poll();
demo.poll();
demo.poll(); // 這次調用應該會返回 null,因為隊列已空
});
producerThread.start();
consumerThread.start();
producerThread.join();
consumerThread.join();
// 在所有線程完成后獲取隊列大小
demo.size();
}
}
業務代碼案例:
業務說明: 大規模日志處理系統需要從多個源實時收集、存儲并分析日志數據。這些日志數據通常由分布在不同服務器上的應用程序生成,并且需要被快速地處理以避免數據丟失或延遲問題。
為什么需要 ConcurrentLinkedQueue 技術: 在日志處理場景中,日志數據的產生速度往往非常快,且來源眾多,因此需要一個高效且線程安全的隊列來緩存這些日志數據。 ConcurrentLinkedQueue 提供了高吞吐量和低延遲的并發訪問,無需使用鎖,使得它特別適合用作日志數據的緩沖區。此外,由于 ConcurrentLinkedQueue 是無界的,因此不會阻塞生產者線程,即使在高負載情況下也能保持高性能。
沒有 ConcurrentLinkedQueue 技術會帶來什么后果: 沒有使用 ConcurrentLinkedQueue 或其他高效的并發隊列可能會導致以下問題:
- 數據丟失:如果使用有界隊列且沒有適當的生產者速率控制,可能會因為隊列滿導致日志數據丟失。
- 性能瓶頸:如果使用鎖或其他同步機制來保護共享隊列,可能會導致性能瓶頸,尤其是在高并發場景下。
- 系統不穩定:在高負載情況下,如果隊列處理速度跟不上數據產生速度,可能會導致系統崩潰或重啟。
代碼實現:
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class LogProcessor {
private final ConcurrentLinkedQueue<String> logQueue = new ConcurrentLinkedQueue<>();
private final ExecutorService processorPool = Executors.newFixedThreadPool(10);
public void log(String message) {
// 生產者線程調用此方法來添加日志到隊列
logQueue.add(message);
}
public void startLogProcessing() {
// 消費者線程池,用于處理隊列中的日志
processorPool.submit(() -> {
while (true) {
try {
// 消費者線程調用此方法來處理隊列中的日志
String logEntry = logQueue.poll();
if (logEntry != null) {
processLog(logEntry);
} else {
TimeUnit.MILLISECONDS.sleep(100); // 避免 CPU 過載
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
}
private void processLog(String logEntry) {
// 實際處理日志的邏輯
System.out.println("Processing log: " + logEntry);
}
public static void main(String[] args) {
LogProcessor logProcessor = new LogProcessor();
logProcessor.startLogProcessing();
// 多個生產者線程生成日志
for (int i = 0; i < 100; i++) {
int finalI = i;
new Thread(() -> {
logProcessor.log("Log entry " + finalI);
}).start();
}
}
}
2.12. BlockingQueue
BlockingQueue 是 Java 中用于線程間通信的隊列,支持阻塞操作,當隊列為空時,獲取元素的操作會阻塞;當隊列滿時,插入元素的操作會阻塞。
圖片
圖解說明:
- Java 線程:表示運行中的線程,它們可能需要向隊列中添加或移除元素。
- BlockingQueue 實例:是 BlockingQueue 接口的具體實現,如 ArrayBlockingQueue、 LinkedBlockingQueue 等,用于線程間通信。
- 內部數據結構:表示 BlockingQueue 內部用于存儲元素的數據結構,如數組、鏈表等。
- 隊列容量:表示 BlockingQueue 的最大容量,如果隊列有界,則插入操作在隊列滿時會阻塞。
- 等待區(元素) :表示當隊列為空時,等待獲取元素的線程集合。
- 等待區(空間) :表示當隊列滿時,等待空間釋放的線程集合。
- 元素添加操作:表示向 BlockingQueue 中添加元素的操作,如果隊列滿,則操作會阻塞。
- 元素移除操作:表示從 BlockingQueue 中移除元素的操作,如果隊列為空,則操作會阻塞。
綜合說明:
- 作用: BlockingQueue 是一個線程安全的隊列,支持阻塞操作,當隊列為空時,獲取元素的操作會阻塞;當隊列滿時,插入元素的操作會阻塞。
- 背景:在生產者-消費者模型中,需要一種機制來協調生產者和消費者之間的操作, BlockingQueue 提供了這種協調。
- 優點:
- 線程協調:自然地實現了生產者-消費者之間的線程協調。
- 阻塞操作:提供了阻塞獲取和阻塞插入的方法,簡化了并發編程。
- 缺點:
可能的死鎖:不當使用可能導致死鎖,例如一個線程永久阻塞等待一個不會到來的元素。
性能考慮:在高并發環境下,隊列的容量和鎖策略需要仔細調優。
場景:適用于生產者-消費者場景,如任務分配、資源池管理等。
業務舉例:在消息處理系統中, BlockingQueue 可以用于緩存待處理的消息,生產者線程生成消息并放入隊列,消費者線程從隊列中取出并處理消息,確保了消息的順序性和系統的響應性。
使用方式:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class BlockingQueueDemo {
// 創建一個 LinkedBlockingQueue 實例,容量限制為10
private final BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(10);
// 向 BlockingQueue 中添加元素
public void produce(Integer element) throws InterruptedException {
// put 方法在隊列滿時阻塞,直到隊列中有空間
blockingQueue.put(element);
System.out.println("Produced: " + element);
}
// 從 BlockingQueue 中獲取元素
public Integer consume() throws InterruptedException {
// take 方法在隊列空時阻塞,直到隊列中有元素
Integer element = blockingQueue.take();
System.out.println("Consumed: " + element);
return element;
}
// 獲取 BlockingQueue 的大小
public int size() {
// size 方法返回隊列當前的元素數量
return blockingQueue.size();
}
public static void main(String[] args) throws InterruptedException {
BlockingQueueDemo demo = new BlockingQueueDemo();
// 創建生產者線程
Thread producerThread = new Thread(() -> {
try {
for (int i = 0; i < 15; i++) {
demo.produce(i);
Thread.sleep(100); // 生產延時
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 創建消費者線程
Thread consumerThread = new Thread(() -> {
try {
for (int i = 0; i < 15; i++) {
int element = demo.consume();
Thread.sleep(150); // 消費延時
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producerThread.start();
consumerThread.start();
producerThread.join();
consumerThread.join();
// 打印最終隊列的大小
System.out.println("Final queue size: " + demo.size());
}
}
業務代碼案例:
業務說明: 消息隊列系統在微服務架構中用于異步處理任務,例如發送郵件、短信通知等。這些服務通常由獨立的服務實例處理,以提高系統的響應性和可擴展性。消息隊列需要能夠處理高并發的消息生產和消費,確保消息的可靠傳遞。
為什么需要 BlockingQueue 技術: BlockingQueue 提供了一種有效的機制來處理生產者-消費者場景,特別是在面對高并發和需要線程安全時。它能夠使生產者在隊列滿時阻塞,消費者在隊列空時阻塞,從而平衡生產和消費的速度,確保系統的穩定性和消息的不丟失。
沒有 BlockingQueue 技術會帶來什么后果:
沒有使用 BlockingQueue 或其他并發隊列可能會導致以下問題:
- 消息丟失:在高并發情況下,如果沒有適當的機制來控制消息的產生和消費,可能會導致消息丟失。
- 系統過載:如果沒有流控機制,生產者可能會過快地生成消息,導致系統資源耗盡,甚至崩潰。
- 數據不一致:在多線程環境下,如果不正確地管理消息的訪問,可能會導致數據處理的不一致性。
代碼實現:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class MessageQueueSystem {
private final BlockingQueue<Message> messageQueue = new LinkedBlockingQueue<>();
public void produceMessage(String content) throws InterruptedException {
// 將消息添加到隊列中,如果隊列滿了,生產者線程將被阻塞
messageQueue.put(new Message(content));
System.out.println("Message produced: " + content);
}
public Message consumeMessage() throws InterruptedException {
// 從隊列中取出消息,如果隊列空了,消費者線程將被阻塞
Message message = messageQueue.take();
System.out.println("Message consumed: " + message.getContent());
return message;
}
public static void main(String[] args) throws InterruptedException {
MessageQueueSystem messageQueueSystem = new MessageQueueSystem();
// 創建生產者線程
Thread producerThread = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
messageQueueSystem.produceMessage("Message " + i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 創建消費者線程
Thread consumerThread = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
messageQueueSystem.consumeMessage();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producerThread.start();
consumerThread.start();
producerThread.join();
consumerThread.join();
}
}
class Message {
private final String content;
public Message(String content) {
this.content = content;
}
public String getContent() {
return content;
}
}
2.13. Condition
Condition 是 Java 中 java.util.concurrent.locks 包提供的一個接口,它用于實現等待/通知機制。 Condition 通常與 Lock 接口配合使用,允許一個或多個線程在某些條件滿足之前掛起,并在條件滿足時被喚醒。
圖片
圖解說明:
- Java 線程:表示運行中的線程,它們可能需要在某些條件滿足之前掛起。
- Lock 實例:是 Lock 接口的具體實現,如 ReentrantLock,用于控制對共享資源的訪問。
- Condition 實例:是 Condition 接口的具體實現,與 Lock 實例配合使用,用于線程間的等待/通知機制。
- 等待隊列(線程) :當線程調用 Condition 的 await() 方法時,如果條件不滿足,線程會被放入等待隊列。
- 共享資源:表示被多個線程共享的數據,需要通過 Lock 和 Condition 來保護以確保線程安全。
- 條件檢查:表示線程在嘗試獲取資源之前需要檢查的條件。
- 喚醒信號:當條件滿足時,其他線程會發送喚醒信號給等待隊列中的線程。
- 鎖狀態:表示鎖的當前狀態,如是否被鎖定,以及鎖定的線程等。
操作流程:
- 鎖定:線程通過 Lock 實例獲取鎖。
- 條件檢查:線程檢查條件是否滿足。
- 等待:如果條件不滿足,線程調用 Condition 的 await() 方法,釋放鎖并進入等待隊列。
- 喚醒:當條件滿足時,其他線程調用 Condition 的 signal() 或 signalAll() 方法,發送喚醒信號給等待隊列中的線程。
- 重新競爭鎖:被喚醒的線程重新競爭鎖。
- 再次檢查條件:線程在重新獲得鎖后,再次檢查條件是否滿足,如果滿足則繼續執行。
綜合說明:
- 作用: Condition 是與 Lock 接口配合使用的同步輔助工具,它允許一個或多個線程等待,直到被其他線程喚醒。
- 背景:在復雜的同步場景中,需要更細粒度的控制線程的等待和喚醒, Condition 提供了這種能力。
- 優點:
- 細粒度控制:提供了比 Object.wait()/ Object.notify() 更靈活的線程間協調機制。
- 多條件支持:一個鎖可以關聯多個條件,每個條件可以獨立喚醒等待的線程。
- 缺點:
使用復雜:需要與 Lock 一起使用,增加了編程復雜度。
錯誤使用可能導致死鎖或線程饑餓。
場景:適用于需要線程間復雜協調的場景,如任務調度、資源分配等。
業務舉例:在酒店預訂系統中, Condition 可以用于實現房間狀態的等待和通知機制。當房間變為空閑時,等待的顧客可以被通知并進行預訂。
使用方式:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class BoundedBuffer {
private final Object[] buffer;
private int putPtr, takePtr, count;
private final ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public BoundedBuffer(int size) {
buffer = new Object[size];
}
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == buffer.length) { // 等待直到緩沖區非滿
notFull.await();
}
buffer[putPtr] = x;
putPtr = (putPtr + 1) % buffer.length;
count++;
notEmpty.signal(); // 通知可能等待的消費者
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0) { // 等待直到緩沖區非空
notEmpty.await();
}
Object x = buffer[takePtr];
takePtr = (takePtr + 1) % buffer.length;
count--;
notFull.signal(); // 通知可能等待的生產者
return x;
} finally {
lock.unlock();
}
}
}
public class ProducerConsumerDemo {
private final BoundedBuffer buffer;
public ProducerConsumerDemo(int size) {
buffer = new BoundedBuffer(size);
}
public void produce(String item) {
buffer.put(item);
}
public String consume() {
return (String) buffer.take();
}
public static void main(String[] args) throws InterruptedException {
final int SIZE = 10;
final ProducerConsumerDemo demo = new ProducerConsumerDemo(SIZE);
// 生產者線程
Thread producerThread = new Thread(() -> {
for (int i = 0; i < 20; i++) {
demo.produce("Item " + i);
try {
Thread.sleep(100); // 生產延時
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
// 消費者線程
Thread consumerThread = new Thread(() -> {
for (int i = 0; i < 20; i++) {
String item = demo.consume();
System.out.println("Consumed: " + item);
try {
Thread.sleep(150); // 消費延時
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producerThread.start();
consumerThread.start();
producerThread.join();
consumerThread.join();
}
}
業務代碼案例:
業務說明: 任務調度系統負責管理和執行定時任務。這些任務可能包括數據備份、報告生成、系統維護等。系統需要能夠按預定時間觸發任務,并確保任務在執行時不會相互干擾。
為什么需要 Condition 技術: 在任務調度系統中,任務的觸發通常依賴于時間,而任務的執行可能需要等待特定條件滿足。 Condition 配合 Lock 使用,可以在沒有任務可執行時讓調度器線程等待,直到有任務準備好執行。這種機制允許系統在沒有任務執行需求時保持空閑,從而節省資源。
沒有 Condition 技術會帶來什么后果:
沒有使用 Condition 或其他等待/通知機制可能會導致以下問題:
- 資源浪費:如果調度器不斷輪詢檢查新任務,可能會浪費大量 CPU 資源。
- 響應性差:在新任務到來時,如果沒有有效的機制來喚醒調度器,可能會導致任務執行延遲。
- 代碼復雜度:沒有 Condition,可能需要使用更復雜的多線程同步機制,增加了代碼的復雜性和出錯的風險。
代碼實現:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.Date;
import java.util.LinkedList;
import java.util.Queue;
public class TaskScheduler {
private final ReentrantLock lock = new ReentrantLock();
private final Condition taskAvailable = lock.newCondition();
private final Queue<Runnable> tasks = new LinkedList<>();
public void schedule(Runnable task, long delay) {
lock.lock();
try {
tasks.add(() -> {
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
task.run();
});
taskAvailable.signal(); // 通知調度器有新任務
} finally {
lock.unlock();
}
}
public void startScheduling() {
new Thread(this::runScheduler).start();
}
private void runScheduler() {
lock.lock();
try {
while (true) {
while (tasks.isEmpty()) { // 如果沒有任務,等待
taskAvailable.await();
}
Runnable task = tasks.poll();
task.run();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
TaskScheduler scheduler = new TaskScheduler();
scheduler.schedule(() -> System.out.println("Task 1 executed at " + new Date()), 2000);
scheduler.schedule(() -> System.out.println("Task 2 executed at " + new Date()), 4000);
scheduler.startScheduling();
}
}