基于 Netty 源碼學(xué)習(xí)那些并發(fā)技巧
減小鎖的粒度
總說周知,使用synchronized 修飾的代碼塊在單位時(shí)間內(nèi)只允許一個(gè)線程操作臨界資源,這使得其他無法操作臨界資源的線程都會(huì)阻塞在synchronized所修飾的臨界資源的所持有的ObjectMonitor的_WaitSet 中:
所以通過減小鎖的粒度,可以保證高并發(fā)場(chǎng)景下等待的線程可以盡可能快的得到臨界資源,從而提升程序整體執(zhí)行的并發(fā)度:
這一點(diǎn)Netty中內(nèi)存池的源碼PoolArena的計(jì)算活躍分配數(shù)量的方法numActiveAllocations的處理就做的非常出色,可以看到它在計(jì)算val時(shí)并沒有鎖住整個(gè)方法,而是在需要進(jìn)行臨界計(jì)算的部分加一個(gè)synchronized 關(guān)鍵字:
@Override
public long numActiveAllocations() {
//運(yùn)算
long val = allocationsSmall.value() + allocationsHuge.value()
- deallocationsHuge.value();
//必要的部分上鎖
synchronized (this) {
val += allocationsNormal - (deallocationsSmall + deallocationsNormal);
}
return max(val, 0);
}
減小空間占用
netty通過totalPendingSize計(jì)算待發(fā)送的數(shù)據(jù)大小,單位為long,為了保證并發(fā)計(jì)算的準(zhǔn)確性,netty將其設(shè)置為volatile保證可見性,再通過AtomicLongFieldUpdater將其封裝為原子類:
private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");
@SuppressWarnings("UnusedDeclaration")
private volatile long totalPendingSize;
那么問題來了?為什么不直接使用AtomicLong類型呢? 總說周知,包裝類內(nèi)部包含markword等對(duì)象頭字段,整體大小遠(yuǎn)大于基類型,所以netty為了保證CPU緩存能夠一次性加載totalPendingSize就將其設(shè)置為基類型,并自封裝為原子類進(jìn)行操作,在保證線程安全的同時(shí),又能保證CPU緩存加載和執(zhí)行的效率:
提高鎖的效率
對(duì)于需要進(jìn)行計(jì)數(shù)但不經(jīng)常查看結(jié)果的變量,Netty使用LongAdder 而不是AtomicLong:
@SuppressJava6Requirement(reason = "Usage guarded by java version check")
final class LongAdderCounter extends LongAdder implements LongCounter {
@Override
public long value() {
return longValue();
}
}
這里筆者也簡(jiǎn)單介紹一下原因,前者進(jìn)行并發(fā)的時(shí)候會(huì)讓線程計(jì)算隨機(jī)運(yùn)算得到LongAdder內(nèi)部計(jì)數(shù)數(shù)組的某個(gè)位置,LongAdder會(huì)根據(jù)當(dāng)前計(jì)數(shù)線程競(jìng)爭(zhēng)情況對(duì)數(shù)組進(jìn)行動(dòng)態(tài)擴(kuò)容,最終在計(jì)算結(jié)果的時(shí)候,需要將數(shù)組中的所有元素結(jié)合起來。 所以這種數(shù)據(jù)結(jié)構(gòu)對(duì)于并發(fā)累加操作性能表現(xiàn)比較好,但是對(duì)于統(tǒng)計(jì)就表現(xiàn)的差一點(diǎn),這一點(diǎn)對(duì)于LongAdderCounter 這種不定時(shí)統(tǒng)計(jì)來說再合適不過:
選用合適的并發(fā)技巧
NioEventLoop采用無鎖串行化的設(shè)計(jì)思路,通過整體并發(fā),局部串行的方式替代多線程消費(fèi)單個(gè)阻塞隊(duì)列的方案,這種方案結(jié)合了netty中多連接少量線程處理的場(chǎng)景,采用mpsc這種多消費(fèi)者單生產(chǎn)者隊(duì)列讓并發(fā)的線程提交移步任務(wù)交給少量的nio線程處理,在整體并行的同時(shí),通過優(yōu)化eventLoop線程邏輯又能讓eventLoop線程能夠高效的串行處理:
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
//提交任務(wù)到mpsc隊(duì)列中
addTask(task);
//如果當(dāng)前提交任務(wù)的不是eventLoop線程,則從eventLoopGroup中啟動(dòng)一個(gè)線程
//......
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
然后NIO線程每次循環(huán)時(shí)調(diào)用runAllTasks有序執(zhí)行:
protected boolean runAllTasks(long timeoutNanos) {
//......
for (;;) {
//執(zhí)行任務(wù)
safeExecute(task);
runTasks ++;
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
//從mpsc中有序獲取
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
小結(jié)
本文基于源碼的角度分析了netty中的并發(fā)技巧,希望對(duì)你有幫助。