微服務(wù):剖析一下源碼,Nacos的健康檢查竟如此簡單
本文轉(zhuǎn)載自微信公眾號「程序新視界」,作者二師兄。轉(zhuǎn)載本文請聯(lián)系程序新視界公眾號。
前言
前面我們多次提到Nacos的健康檢查,比如《微服務(wù)之:服務(wù)掛的太干脆,Nacos還沒反應(yīng)過來,怎么辦?》一文中還對健康檢查進(jìn)行了自定義調(diào)優(yōu)。那么,Nacos的健康檢查和心跳機(jī)制到底是如何實(shí)現(xiàn)的呢?在項(xiàng)目實(shí)踐中是否又可以參考Nacos的健康檢查機(jī)制,運(yùn)用于其他地方呢?
這篇文章,就帶大家來揭開Nacos健康檢查機(jī)制的面紗。
Nacos的健康檢查
Nacos中臨時實(shí)例基于心跳上報方式維持活性,基本的健康檢查流程基本如下:Nacos客戶端會維護(hù)一個定時任務(wù),每隔5秒發(fā)送一次心跳請求,以確保自己處于活躍狀態(tài)。Nacos服務(wù)端在15秒內(nèi)如果沒收到客戶端的心跳請求,會將該實(shí)例設(shè)置為不健康,在30秒內(nèi)沒收到心跳,會將這個臨時實(shí)例摘除。
原理很簡單,關(guān)于代碼層的實(shí)現(xiàn),下面來就逐步來進(jìn)行解析。
客戶端的心跳
實(shí)例基于心跳上報的形式來維持活性,當(dāng)然就離不開心跳功能的實(shí)現(xiàn)了。這里以客戶端心跳實(shí)現(xiàn)為基準(zhǔn)來進(jìn)行分析。
Spring Cloud提供了一個標(biāo)準(zhǔn)接口ServiceRegistry,Nacos對應(yīng)的實(shí)現(xiàn)類為NacosServiceRegistry。Spring Cloud項(xiàng)目啟動時會實(shí)例化NacosServiceRegistry,并調(diào)用它的register方法來進(jìn)行實(shí)例的注冊。
- @Override
- public void register(Registration registration) {
- // ...
- NamingService namingService = namingService();
- String serviceId = registration.getServiceId();
- String group = nacosDiscoveryProperties.getGroup();
- Instance instance = getNacosInstanceFromRegistration(registration);
- try {
- namingService.registerInstance(serviceId, group, instance);
- log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
- instance.getIp(), instance.getPort());
- }catch (Exception e) {
- // ...
- }
- }
在該方法中有兩處需要注意,第一處是構(gòu)建Instance的getNacosInstanceFromRegistration方法,該方法內(nèi)會設(shè)置Instance的元數(shù)據(jù)(metadata),通過源元數(shù)據(jù)可以配置服務(wù)器端健康檢查的參數(shù)。比如,在Spring Cloud中配置的如下參數(shù),都可以通過元數(shù)據(jù)項(xiàng)在服務(wù)注冊時傳遞給Nacos的服務(wù)端。
- spring:
- application:
- name: user-service-provider
- cloud:
- nacos:
- discovery:
- server-addr: 127.0.0.1:8848
- heart-beat-interval: 5000
- heart-beat-timeout: 15000
- ip-delete-timeout: 30000
其中的heart-beat-interval、heart-beat-timeout、ip-delete-timeout這些健康檢查的參數(shù),都是基于元數(shù)據(jù)上報上去的。
register方法的第二處就是調(diào)用NamingService#registerInstance來進(jìn)行實(shí)例的注冊。NamingService是由Nacos的客戶端提供,也就是說Nacos客戶端的心跳本身是由Nacos生態(tài)提供的。
在registerInstance方法中最終會調(diào)用到下面的方法:
- @Override
- public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
- NamingUtils.checkInstanceIsLegal(instance);
- String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
- if (instance.isEphemeral()) {
- BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
- beatReactor.addBeatInfo(groupedServiceName, beatInfo);
- }
- serverProxy.registerService(groupedServiceName, groupName, instance);
- }
其中BeatInfo#addBeatInfo便是進(jìn)行心跳處理的入口。當(dāng)然,前提條件是當(dāng)前的實(shí)例需要是臨時(瞬時)實(shí)例。
對應(yīng)的方法實(shí)現(xiàn)如下:
- public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
- NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
- String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
- BeatInfo existBeat = null;
- //fix #1733
- if ((existBeat = dom2Beat.remove(key)) != null) {
- existBeat.setStopped(true);
- }
- dom2Beat.put(key, beatInfo);
- executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
- MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
- }
在倒數(shù)第二行可以看到,客戶端是通過定時任務(wù)來處理心跳的,具體的心跳請求由BeatTask完成。定時任務(wù)的執(zhí)行頻次,封裝在BeatInfo,回退往上看,會發(fā)現(xiàn)BeatInfo的Period來源于Instance#getInstanceHeartBeatInterval()。該方法具體實(shí)現(xiàn)如下:
- public long getInstanceHeartBeatInterval() {
- return this.getMetaDataByKeyWithDefault("preserved.heart.beat.interval", Constants.DEFAULT_HEART_BEAT_INTERVAL);
- }
可以看出定時任務(wù)的執(zhí)行間隔就是配置的metadata中的數(shù)據(jù)preserved.heart.beat.interval,與上面提到配置heart-beat-interval本質(zhì)是一回事,默認(rèn)是5秒。
BeatTask類具體實(shí)現(xiàn)如下:
- class BeatTask implements Runnable {
- BeatInfo beatInfo;
- public BeatTask(BeatInfo beatInfo) {
- this.beatInfo = beatInfo;
- }
- @Override
- public void run() {
- if (beatInfo.isStopped()) {
- return;
- }
- long nextTime = beatInfo.getPeriod();
- try {
- JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
- long interval = result.get("clientBeatInterval").asLong();
- boolean lightBeatEnabled = false;
- if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
- lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
- }
- BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
- if (interval > 0) {
- nextTime = interval;
- }
- int code = NamingResponseCode.OK;
- if (result.has(CommonParams.CODE)) {
- code = result.get(CommonParams.CODE).asInt();
- }
- if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
- Instance instance = new Instance();
- instance.setPort(beatInfo.getPort());
- instance.setIp(beatInfo.getIp());
- instance.setWeight(beatInfo.getWeight());
- instance.setMetadata(beatInfo.getMetadata());
- instance.setClusterName(beatInfo.getCluster());
- instance.setServiceName(beatInfo.getServiceName());
- instance.setInstanceId(instance.getInstanceId());
- instance.setEphemeral(true);
- try {
- serverProxy.registerService(beatInfo.getServiceName(),
- NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
- } catch (Exception ignore) {
- }
- }
- } catch (NacosException ex) {
- NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
- JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
- }
- executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
- }
- }
在run方法中通過NamingProxy#sendBeat完成了心跳請求的發(fā)送,而在run方法的最后,再次開啟了一個定時任務(wù),這樣周期性的進(jìn)行心跳請求。
NamingProxy#sendBeat方法實(shí)現(xiàn)如下:
- public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
- if (NAMING_LOGGER.isDebugEnabled()) {
- NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
- }
- Map<String, String> params = new HashMap<String, String>(8);
- Map<String, String> bodyMap = new HashMap<String, String>(2);
- if (!lightBeatEnabled) {
- bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
- }
- params.put(CommonParams.NAMESPACE_ID, namespaceId);
- params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
- params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
- params.put("ip", beatInfo.getIp());
- params.put("port", String.valueOf(beatInfo.getPort()));
- String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
- return JacksonUtils.toObj(result);
- }
實(shí)際上,就是調(diào)用了Nacos服務(wù)端提供的"/nacos/v1/ns/instance/beat"服務(wù)。
在客戶端的常量類Constants中定義了心跳相關(guān)的默認(rèn)參數(shù):
- static {
- DEFAULT_HEART_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15L);
- DEFAULT_IP_DELETE_TIMEOUT = TimeUnit.SECONDS.toMillis(30L);
- DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5L);
- }
這樣就呼應(yīng)了最開始說的Nacos健康檢查機(jī)制的幾個時間維度。
服務(wù)端接收心跳
分析客戶端的過程中已經(jīng)可以看出請求的是/nacos/v1/ns/instance/beat這個服務(wù)。Nacos服務(wù)端是在Naming項(xiàng)目中的InstanceController中實(shí)現(xiàn)的。
- @CanDistro
- @PutMapping("/beat")
- @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
- public ObjectNode beat(HttpServletRequest request) throws Exception {
- // ...
- Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
- if (instance == null) {
- // ...
- instance = new Instance();
- instance.setPort(clientBeat.getPort());
- instance.setIp(clientBeat.getIp());
- instance.setWeight(clientBeat.getWeight());
- instance.setMetadata(clientBeat.getMetadata());
- instance.setClusterName(clusterName);
- instance.setServiceName(serviceName);
- instance.setInstanceId(instance.getInstanceId());
- instance.setEphemeral(clientBeat.isEphemeral());
- serviceManager.registerInstance(namespaceId, serviceName, instance);
- }
- Service service = serviceManager.getService(namespaceId, serviceName);
- // ...
- service.processClientBeat(clientBeat);
- // ...
- return result;
- }
服務(wù)端在接收到請求時,主要做了兩件事:第一,如果發(fā)送心跳的實(shí)例不存在,則將其進(jìn)行注冊;第二,調(diào)用其Service的processClientBeat方法進(jìn)行心跳處理。
processClientBeat方法實(shí)現(xiàn)如下:
- public void processClientBeat(final RsInfo rsInfo) {
- ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
- clientBeatProcessor.setService(this);
- clientBeatProcessor.setRsInfo(rsInfo);
- HealthCheckReactor.scheduleNow(clientBeatProcessor);
- }
再來看看ClientBeatProcessor中對具體任務(wù)的實(shí)現(xiàn):
- @Override
- public void run() {
- Service service = this.service;
- // logging
- String ip = rsInfo.getIp();
- String clusterName = rsInfo.getCluster();
- int port = rsInfo.getPort();
- Cluster cluster = service.getClusterMap().get(clusterName);
- List<Instance> instances = cluster.allIPs(true);
- for (Instance instance : instances) {
- if (instance.getIp().equals(ip) && instance.getPort() == port) {
- // logging
- instance.setLastBeat(System.currentTimeMillis());
- if (!instance.isMarked()) {
- if (!instance.isHealthy()) {
- instance.setHealthy(true);
- // logging
- getPushService().serviceChanged(service);
- }
- }
- }
- }
- }
在run方法中先檢查了發(fā)送心跳的實(shí)例和IP是否一致,如果一致則更新最后一次心跳時間。同時,如果該實(shí)例之前未被標(biāo)記且處于不健康狀態(tài),則將其改為健康狀態(tài),并將變動通過PushService提供事件機(jī)制進(jìn)行發(fā)布。事件是由Spring的ApplicationContext進(jìn)行發(fā)布,事件為ServiceChangeEvent。
通過上述心跳操作,Nacos服務(wù)端的實(shí)例的健康狀態(tài)和最后心跳時間已經(jīng)被刷新。那么,如果沒有收到心跳時,服務(wù)器端又是如何判斷呢?
服務(wù)端心跳檢查
客戶端發(fā)起心跳,服務(wù)器端來檢查客戶端的心跳是否正常,或者說對應(yīng)的實(shí)例中的心跳更新時間是否正常。
服務(wù)器端心跳的觸發(fā)是在服務(wù)實(shí)例注冊時觸發(fā)的,同樣在InstanceController中,register注冊實(shí)現(xiàn)如下:
- @CanDistro
- @PostMapping
- @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
- public String register(HttpServletRequest request) throws Exception {
- // ...
- final Instance instance = parseInstance(request);
- serviceManager.registerInstance(namespaceId, serviceName, instance);
- return "ok";
- }
ServiceManager#registerInstance實(shí)現(xiàn)代碼如下:
- public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
- createEmptyService(namespaceId, serviceName, instance.isEphemeral());
- // ...
- }
心跳相關(guān)實(shí)現(xiàn)在第一次創(chuàng)建空的Service中實(shí)現(xiàn),最終會調(diào)到如下方法:
- public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
- throws NacosException {
- Service service = getService(namespaceId, serviceName);
- if (service == null) {
- Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
- service = new Service();
- service.setName(serviceName);
- service.setNamespaceId(namespaceId);
- service.setGroupName(NamingUtils.getGroupName(serviceName));
- // now validate the service. if failed, exception will be thrown
- service.setLastModifiedMillis(System.currentTimeMillis());
- service.recalculateChecksum();
- if (cluster != null) {
- cluster.setService(service);
- service.getClusterMap().put(cluster.getName(), cluster);
- }
- service.validate();
- putServiceAndInit(service);
- if (!local) {
- addOrReplaceService(service);
- }
- }
- }
在putServiceAndInit方法中對Service進(jìn)行初始化:
- private void putServiceAndInit(Service service) throws NacosException {
- putService(service);
- service = getService(service.getNamespaceId(), service.getName());
- service.init();
- consistencyService
- .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
- consistencyService
- .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
- Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
- }
service.init()方法實(shí)現(xiàn):
- public void init() {
- HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
- for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
- entry.getValue().setService(this);
- entry.getValue().init();
- }
- }
HealthCheckReactor#scheduleCheck方法實(shí)現(xiàn):
- public static void scheduleCheck(ClientBeatCheckTask task) {
- futureMap.computeIfAbsent(task.taskKey(),
- k -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
- }
延遲5秒執(zhí)行,每5秒檢查一次。
在init方法的第一行便可以看到執(zhí)行健康檢查的Task,具體Task是由ClientBeatCheckTask來實(shí)現(xiàn),對應(yīng)的run方法核心代碼如下:
- @Override
- public void run() {
- // ...
- List<Instance> instances = service.allIPs(true);
- // first set health status of instances:
- for (Instance instance : instances) {
- if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
- if (!instance.isMarked()) {
- if (instance.isHealthy()) {
- instance.setHealthy(false);
- // logging...
- getPushService().serviceChanged(service);
- ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
- }
- }
- }
- }
- if (!getGlobalConfig().isExpireInstance()) {
- return;
- }
- // then remove obsolete instances:
- for (Instance instance : instances) {
- if (instance.isMarked()) {
- continue;
- }
- if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
- // delete instance
- deleteIp(instance);
- }
- }
- }
在第一個for循環(huán)中,先判斷當(dāng)前時間與上次心跳時間的間隔是否大于超時時間。如果實(shí)例已經(jīng)超時,且為被標(biāo)記,且健康狀態(tài)為健康,則將健康狀態(tài)設(shè)置為不健康,同時發(fā)布狀態(tài)變化的事件。
在第二個for循環(huán)中,如果實(shí)例已經(jīng)被標(biāo)記則跳出循環(huán)。如果未標(biāo)記,同時當(dāng)前時間與上次心跳時間的間隔大于刪除IP時間,則將對應(yīng)的實(shí)例刪除。
小結(jié)
通過本文的源碼分析,我們從Spring Cloud開始,追蹤到Nacos Client中的心跳時間,再追蹤到Nacos服務(wù)端接收心跳的實(shí)現(xiàn)和檢查實(shí)例是否健康的實(shí)現(xiàn)。想必通過整個源碼的梳理,你已經(jīng)對整個Nacos心跳的實(shí)現(xiàn)有所了解。關(guān)注我,持續(xù)更新Nacos的最新干貨。