喝了100杯醬香拿鐵,我開竅了
作者:哪吒
使用“越細粒度的鎖越好”,真的是這樣嗎?會不會產生一些其它問題?通過對醬香拿鐵進行排序,解決了死鎖問題,避免循環等待,效率也得到了提升。
大家好,我是哪吒。
上一篇提到了鎖粒度的問題,使用“越細粒度的鎖越好”,真的是這樣嗎?會不會產生一些其它問題?
先說結論,可能會產生死鎖問題。
下面還是以購買醬香拿鐵為例:
1、定義咖啡實體類Coffee
@Data
public class Coffee {
// 醬香拿鐵
private String name;
// 庫存
public Integer inventory;
public ReentrantLock lock = new ReentrantLock();
}
2、初始化數據
private static List<Coffee> coffeeList = generateCoffee();
public static List<Coffee> generateCoffee(){
List<Coffee> coffeeList = new ArrayList<>();
coffeeList.add(new Coffee("醬香拿鐵1", 100));
coffeeList.add(new Coffee("醬香拿鐵2", 100));
coffeeList.add(new Coffee("醬香拿鐵3", 100));
coffeeList.add(new Coffee("醬香拿鐵4", 100));
coffeeList.add(new Coffee("醬香拿鐵5", 100));
return coffeeList;
}
3、隨機獲取n杯咖啡
// 隨機獲取n杯咖啡
private static List<Coffee> getCoffees(int n) {
if(n >= coffeeList.size()){
return coffeeList;
}
List<Coffee> randomList = Stream.iterate(RandomUtils.nextInt(n), i -> RandomUtils.nextInt(coffeeList.size()))
.distinct()// 去重
.map(coffeeList::get)// 跟據上面取得的下標獲取咖啡
.limit(n)// 截取前面 需要隨機獲取的咖啡
.collect(Collectors.toList());
return randomList;
}
4、購買咖啡
private static boolean buyCoffees(List<Coffee> coffees) {
//存放所有獲得的鎖
List<ReentrantLock> locks = new ArrayList<>();
for (Coffee coffee : coffees) {
try {
// 獲得鎖3秒超時
if (coffee.lock.tryLock(3, TimeUnit.SECONDS)) {
// 拿到鎖之后,扣減咖啡庫存
locks.add(coffee.lock);
coffeeList = coffeeList.stream().map(x -> {
// 購買了哪個,就減哪個
if (coffee.getName().equals(x.getName())) {
x.inventory--;
}
return x;
}).collect(Collectors.toList());
} else {
locks.forEach(ReentrantLock::unlock);
return false;
}
} catch (InterruptedException e) {
}
}
locks.forEach(ReentrantLock::unlock);
return true;
}
3、通過parallel并行流,購買100次醬香拿鐵,一次買2杯,統計成功次數
public static void main(String[] args){
StopWatch stopWatch = new StopWatch();
stopWatch.start();
// 通過parallel并行流,購買100次醬香拿鐵,一次買2杯,統計成功次數
long success = IntStream.rangeClosed(1, 100).parallel()
.mapToObj(i -> {
List<Coffee> getCoffees = getCoffees(2);
//Collections.sort(getCoffees, Comparator.comparing(Coffee::getName));
return buyCoffees(getCoffees);
})
.filter(result -> result)
.count();
stopWatch.stop();
System.out.println("成功次數:"+success);
System.out.println("方法耗時:"+stopWatch.getTotalTimeSeconds()+"秒");
for (Coffee coffee : coffeeList) {
System.out.println(coffee.getName()+"-剩余:"+coffee.getInventory()+"杯");
}
}
耗時有點久啊,20多秒。
數據對不對?
- 醬香拿鐵1賣了53杯。
- 醬香拿鐵2賣了57杯。
- 醬香拿鐵3賣了20杯。
- 醬香拿鐵4賣了22杯。
- 醬香拿鐵5賣了19杯。
- 一共賣了171杯。
數量也對不上,應該賣掉200杯才對,哪里出問題了?
4、使用visualvm測一下:
果不其然,出問題了,產生了死鎖。
線程 m 在等待的一個鎖被線程 n 持有,線程 n 在等待的另一把鎖被線程 m 持有。
- 比如美杜莎買了醬香拿鐵1和醬香拿鐵2,小醫仙買了醬香拿鐵2和醬香拿鐵1;
- 美杜莎先獲得了醬香拿鐵1的鎖,小醫仙獲得了醬香拿鐵2的鎖;
- 然后美杜莎和小醫仙接下來要分別獲取 醬香拿鐵2 和 醬香拿鐵1 的鎖;
- 這個時候鎖已經被對方獲取了,只能相互等待一直到 3 秒超時。
5、如何解決呢?
讓大家都先拿一樣的醬香拿鐵不就好了。讓所有線程都先獲取醬香拿鐵1的鎖,然后再獲取醬香拿鐵2的鎖,這樣就不會出問題了。
也就是在隨機獲取n杯咖啡后,對其進行排序即可。
// 通過parallel并行流,購買100次醬香拿鐵,一次買2杯,統計成功次數
long success = IntStream.rangeClosed(1, 100).parallel()
.mapToObj(i -> {
List<Coffee> getCoffees = getCoffees(2);
// 根據咖啡名稱進行排序
Collections.sort(getCoffees, Comparator.comparing(Coffee::getName));
return buyCoffees(getCoffees);
})
.filter(result -> result)
.count();
6、再測試一下
- 成功次數100。
- 咖啡賣掉了200杯,數量也對得上。
- 代碼執行速度也得到了質的飛躍,因為不用沒有循環等待鎖的時間了。
看來真的不是越細粒度的鎖越好,真的會產生死鎖問題。通過對醬香拿鐵進行排序,解決了死鎖問題,避免循環等待,效率也得到了提升。
責任編輯:姜華
來源:
哪吒編程