Nacos 的長輪詢定時機制,太好用了!
今天這篇文章來介紹一下Nacos配置中心的原理之一:長輪詢機制的應用
為方便理解與表達,這里把 Nacos 控制臺和 Nacos 注冊中心稱為 Nacos 服務器(就是 web 界面那個),我們編寫的業務服務稱為 Nacso 客戶端;
Nacos 動態監聽的長輪詢機制原理圖,本篇將圍繞這張圖剖析長輪詢定時機制的原理:
圖片
ConfigService 是 Nacos 客戶端提供的用于訪問實現配置中心基本操作的類,我們將從 ConfigService 的實例化開始長輪詢定時機制的源碼之旅;
1. 客戶端的長輪詢定時機制
我們從NacosPropertySourceLocator.locate()
開始【斷點步入】:
圖片
1.1 利用反射機制實例化 NacosConfigService 對象
客戶端的長輪詢定時任務是在 NacosFactory.createConfigService()
方法中,構建 ConfigService 對象實例時啟動的,我們接著 1.1 處的源碼;
進入 NacosFactory.createConfigService()
:
public static ConfigService createConfigService(Properties properties) throws NacosException {
//【斷點步入】創建 ConfigService
return ConfigFactory.createConfigService(properties);
}
進入 ConfigFactory.createConfigService()
,發現其使用反射機制實例化 NacosConfigService 對象;
圖片
1.2 NacosConfigService 的構造方法里啟動長輪詢定時任務
進入 NacosConfigService.NacosConfigService()
構造方法,里面設置了一些更遠程任務相關的屬性;
圖片
1.2.1 初始化 HttpAgent
MetricsHttpAgent 類的設計如下:
圖片
ServerHttpAgent 類的設計如下:
圖片
1.2.2 初始化 ClientWorker
進入 ClientWorker.ClientWorker()
構造方法,主要是創建了兩個定時調度的線程池,并啟動一個定時任務;
圖片
進入 ClientWorker.checkConfigInfo()
,每隔 10s 檢查一次配置是否發生變化;
- cacheMap:是一個 AtomicReference<Map<String, CacheData>> 對象,用來存儲監聽變更的緩存集合,key 是根據 datalD/group/tenant(租戶)拼接的值。Value 是對應的存儲在 Nacos 服務器上的配置文件的內容;
- 長輪詢任務拆分:默認情況下,每個長輪詢 LongPollingRunnable 任務處理3000個監聽配置集。如果超過3000個,則需要啟動多個 LongPollingRunnable 去執行;
圖片
1.3 檢查配置變更,讀取變更配置 LongPollingRunnable.run()
因為我們沒有這么多配置項,debug 不進去,所以直接找到 LongPollingRunnable.run()
方法,該方法的主要邏輯是:
- 根據 taskld 對 cacheMap 進行數據分割;
- 再通過
checkLocalConfig()
方法比較本地配置文件(在${user}\nacos\config\
里)的數據是否存在變更,如果有變更則直接觸發通知;
public void run() {
List<CacheData> cacheDatas = new ArrayList();
ArrayList inInitializingCacheList = new ArrayList();
try {
//遍歷 CacheData,檢查本地配置
Iterator var3 = ((Map)ClientWorker.this.cacheMap.get()).values().iterator();
while(var3.hasNext()) {
CacheData cacheData = (CacheData)var3.next();
if (cacheData.getTaskId() == this.taskId) {
cacheDatas.add(cacheData);
try {
//檢查本地配置
ClientWorker.this.checkLocalConfig(cacheData);
if (cacheData.isUseLocalConfigInfo()) {
cacheData.checkListenerMd5();
}
} catch (Exception var13) {
ClientWorker.LOGGER.error("get local config info error", var13);
}
}
}
//【斷點步入 1.3.1】通過長輪詢請求檢查服務端對應的配置是否發生變更
List<String> changedGroupKeys = ClientWorker.this.checkUpdateDataIds(cacheDatas, inInitializingCacheList);
//遍歷存在變更的 groupKey,重新加載最新數據
Iterator var16 = changedGroupKeys.iterator();
while(var16.hasNext()) {
String groupKey = (String)var16.next();
String[] key = GroupKey.parseKey(groupKey);
String dataId = key[0];
String group = key[1];
String tenant = null;
if (key.length == 3) {
tenant = key[2];
}
try {
//【斷點步入 1.3.2】讀取變更配置,這里的 dataId、group 和 tenant 是【1.3.1】里獲取的
String content = ClientWorker.this.getServerConfig(dataId, group, tenant, 3000L);
CacheData cache = (CacheData)((Map)ClientWorker.this.cacheMap.get()).get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setContent(content);
ClientWorker.LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, cnotallow={}", new Object[]{ClientWorker.this.agent.getName(), dataId, group, tenant, cache.getMd5(), ContentUtils.truncateContent(content)});
} catch (NacosException var12) {
String message = String.format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s", ClientWorker.this.agent.getName(), dataId, group, tenant);
ClientWorker.LOGGER.error(message, var12);
}
}
//觸發事件通知
var16 = cacheDatas.iterator();
while(true) {
CacheData cacheDatax;
do {
if (!var16.hasNext()) {
inInitializingCacheList.clear();
//繼續定時執行當前線程
ClientWorker.this.executorService.execute(this);
return;
}
cacheDatax = (CacheData)var16.next();
} while(cacheDatax.isInitializing() && !inInitializingCacheList.contains(GroupKey.getKeyTenant(cacheDatax.dataId, cacheDatax.group, cacheDatax.tenant)));
cacheDatax.checkListenerMd5();
cacheDatax.setInitializing(false);
}
} catch (Throwable var14) {
ClientWorker.LOGGER.error("longPolling error : ", var14);
ClientWorker.this.executorService.schedule(this, (long)ClientWorker.this.taskPenaltyTime, TimeUnit.MILLISECONDS);
}
}
注意:這里的斷點需要在 Nacos 服務器上修改配置(間隔大于 30s),進入后才好理解;
1.3.1 檢查配置變更 ClientWorker.checkUpdateDataIds()
我們點進 ClientWorker.checkUpdateDataIds()
方法,發現其最終調用的是 ClientWorker.checkUpdateConfigStr()
方法,其實現邏輯與源碼如下:
- 通過
MetricsHttpAgent.httpPost()
方法(上面 1.2.1 有提到)調用/v1/cs/configs/listener
接口實現長輪詢請求; - 長輪詢請求在實現層面只是設置了一個比較長的超時時間,默認是 30s;
- 如果服務端的數據發生了變更,客戶端會收到一個 HttpResult ,服務端返回的是存在數據變更的 Data ID、Group、Tenant;
- 獲得這些信息之后,在
LongPollingRunnable.run()
方法中調用 getServerConfig() 去 Nacos 服務器上讀取具體的配置內容;
List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException {
List<String> params = Arrays.asList("Listening-Configs", probeUpdateString);
List<String> headers = new ArrayList(2);
headers.add("Long-Pulling-Timeout");
headers.add("" + this.timeout);
if (isInitializingCacheList) {
headers.add("Long-Pulling-Timeout-No-Hangup");
headers.add("true");
}
if (StringUtils.isBlank(probeUpdateString)) {
return Collections.emptyList();
} else {
try {
//調用 /v1/cs/configs/listener 接口實現長輪詢請求,返回的 HttpResult 里包含存在數據變更的 Data ID、Group、Tenant
HttpResult result = this.agent.httpPost("/v1/cs/configs/listener", headers, params, this.agent.getEncode(), this.timeout);
if (200 == result.code) {
this.setHealthServer(true);
//
returnthis.parseUpdateDataIdResponse(result.content);
}
this.setHealthServer(false);
LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", this.agent.getName(), result.code);
} catch (IOException var6) {
this.setHealthServer(false);
LOGGER.error("[" + this.agent.getName() + "] [check-update] get changed dataId exception", var6);
throw var6;
}
return Collections.emptyList();
}
}
1.3.2 讀取變更配置 ClientWorker.getServerConfig()
進入 ClientWorker.getServerConfig()
方法;讀取服務器上的變更配置;最終調用的是 MetricsHttpAgent.httpGet()
方法(上面 1.2.1 有提到),調用 /v1/cs/configs
接口獲取配置;然后通過調用 LocalConfigInfoProcessor.saveSnapshot()
將變更的配置保存到本地;
圖片
圖片
2. 服務端的長輪詢定時機制
2.1 服務器接收請求 ConfigController.listener()
Nacos客戶端 通過 HTTP 協議與服務器通信,那么在服務器源碼里必然有對應接口的實現;
在 nacos-config 模塊下的 controller 包,提供了個 ConfigController 類來處理請求,其中有個 /listener
接口,是客戶端發起數據監聽的接口,其主要邏輯和源碼如下:
- 獲取客戶端需要監聽的可能發生變化的配置,并計算 MD5 值;
ConfigServletInner.doPollingConfig()
開始執行長輪詢請求;
2.2 執行長輪詢請求 ConfigSer
圖片
vletInner.doPollingConfig()
進入 ConfigServletInner.doPollingConfig()
方法,該方法封裝了長輪詢的實現邏輯,同時兼容短輪詢邏輯;
圖片
進入 LongPollingService.addLongPollingClient()
方法,里面是長輪詢的核心處理邏輯,主要作用是把客戶端的長輪詢請求封裝成 ClientPolling 交給 scheduler 執行;
圖片
2.3 創建線程執行定時任務 ClientLongPolling.run()
我們找到 ClientLongPolling.run()
方法,這里可以體現長輪詢定時機制的核心原理,通俗來說,就是:
- 服務端收到請求之后,不立即返回,沒有變更則在延后 (30-0.5)s 把請求結果返回給客戶端;
- 這就使得客戶端和服務端之間在 30s 之內數據沒有發生變化的情況下一直處于連接狀態;
圖片
2.4 監聽配置變更事件
2.4.1 監聽 LocalDataChangeEvent 事件的實現
當我們在 Nacos 服務器或通過 API 方式變更配置后,會發布一個 LocalDataChangeEvent 事件,該事件會被 LongPollingService 監聽;
這里 LongPollingService 為什么具有監聽功能在 1.3.1 版本后有些變化:
- 1.3.1 前:
LongPollingService.onEvent()
; - 1.3.1 后:
Subscriber.onEvent()
;
在 Nacos 1.3.1 版本之前,通過 LongPollingService 繼承 AbstractEventListener 實現監聽,覆蓋 onEvent() 方法;
圖片
而在 1.3.2 版本之后,通過構造訂閱者實現
圖片
效果是一樣的,實現了對 LocalDataChangeEvent 事件的監聽,并通過通過線程池執行 DataChangeTask 任務;
2.4.2 監聽事件后的處理邏輯 DataChangeTask.run()
我們找到 DataChangeTask.run()
方法,這個線程任務實現了
圖片
3. 源碼結構圖小結
3.1 客戶端的長輪詢定時機制
NacosPropertySourceLocator.locate() :初始化 ConfigService 對象,定位配置;
- NacosConfigService.NacosConfigService() :NacosConfigService 的構造方法;
- Executors.newScheduledThreadPool() :創建 executor 線程池;
- Executors.newScheduledThreadPool() :創建 executorService 線程池;
- ClientWorker.checkConfigInfo() :使用 executor 線程池檢查配置是否發生變化;
- ClientWorker.checkLocalConfig() :檢查本地配置;
- ClientWorker.checkUpdateDataIds() :檢查服務端對應的配置是否發生變更;
- ClientWorker.getServerConfig() :讀取變更配置
- MetricsHttpAgent.httpPost() :調用 /v1/cs/configs/listener 接口實現長輪詢請求;
- ClientWorker.checkUpdateConfigStr() :檢查服務端對應的配置是否發生變更;
- MetricsHttpAgent.httpGet() :調用 /v1/cs/configs 接口獲取配置;
- LongPollingRunnable.run() :運行長輪詢定時線程;
- MetricsHttpAgent.MetricsHttpAgent() :初始化 HttpAgent;
- ClientWorker.ClientWorker() :初始化 ClientWorker;
- NacosFactory.createConfigService() :創建配置服務器;
- ConfigFactory.createConfigService() :利用反射機制創建配置服務器;
3.2 服務端的長輪詢定時機制
ConfigController.listener() :服務器接收請求;
- LongPollingService.addLongPollingClient() :長輪詢的核心處理邏輯,提前 500ms 返回響應;
- ClientLongPolling.run() :長輪詢定時機制的實現邏輯;
- Map.put() :將 ClientLongPolling 實例本身添加到 allSubs 隊列中;
- Queue.remove() :把 ClientLongPolling 實例本身從 allSubs 隊列中移除;
- MD5Util.compareMd5() :比較數據的 MD5 值;
- LongPollingService.sendResponse() :將變更的結果通過 response 返回給客戶端;
- ConfigExecutor.scheduleLongPolling() :啟動定時任務,延時時間為 29.5s;
- HttpServletRequest.getHeader() :獲取客戶端設置的請求超時時間;
- MD5Util.compareMd5() :和服務端的數據進行 MD5 對比;
- ConfigExecutor.executeLongPolling() :創建 ClientLongPolling 線程執行定時任務;
- MD5Util.getClientMd5Map() :計算 MD5 值;
- ConfigServletInner.doPollingConfig() :執行長輪詢請求;
3.3 Nacos 服務器配置變更的事件監聽
Nacos 服務器上的配置發生變更后,發布一個 LocalDataChangeEvent
事件;
Subscriber.onEvent() :監聽 LocalDataChangeEvent
事件(1.3.2 版本后);
DataChangeTask.run() :根據 groupKey 返回配置;
ConfigExecutor.executeLongPolling() :通過線程池執行 DataChangeTask 任務。