成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

四個維度搞懂 Nacos 注冊中心

開發(fā) 項目管理
現(xiàn)如今市面上注冊中心的輪子很多,我實際使用過的就有三款:Eureka、Gsched、Nacos,由于當(dāng)前參與 Nacos 集群的維護和開發(fā)工作,期間也參與了 Nacos 社區(qū)的一些開發(fā)和 Bug Fix 工作,過程中對 Nacos 原理有了一定的積累,今天給大家分享一下 Nacos 動態(tài)服務(wù)發(fā)現(xiàn)的原理。

大家好呀,我是樓仔。

現(xiàn)如今市面上注冊中心的輪子很多,我實際使用過的就有三款:Eureka、Gsched、Nacos,由于當(dāng)前參與 Nacos 集群的維護和開發(fā)工作,期間也參與了 Nacos 社區(qū)的一些開發(fā)和 Bug Fix 工作,過程中對 Nacos 原理有了一定的積累,今天給大家分享一下 Nacos 動態(tài)服務(wù)發(fā)現(xiàn)的原理。

不 BB,上文章目錄:

圖片

1、什么是動態(tài)服務(wù)發(fā)現(xiàn)?

服務(wù)發(fā)現(xiàn)是指使用一個注冊中心來記錄分布式系統(tǒng)中的全部服務(wù)的信息,以便其他服務(wù)能夠快速的找到這些已注冊的服務(wù)。

在單體應(yīng)用中,DNS+Nginx 可以滿足服務(wù)發(fā)現(xiàn)的要求,此時服務(wù)的IP列表配置在 nginx 上。在微服務(wù)架構(gòu)中,由于服務(wù)粒度變的更細,服務(wù)的上下線更加頻繁,我們需要一款注冊中心來動態(tài)感知服務(wù)的上下線,并且推送IP列表變化給服務(wù)消費者,架構(gòu)如下圖。

圖片

2、Nacos 實現(xiàn)動態(tài)服務(wù)發(fā)現(xiàn)的原理

Nacos實現(xiàn)動態(tài)服務(wù)發(fā)現(xiàn)的核心原理如下圖,我們接下來的內(nèi)容將圍繞這個圖來進行。

圖片

2.1 通訊協(xié)議

整個服務(wù)注冊與發(fā)現(xiàn)過程,都離不開通訊協(xié)議,在1.x的 Nacos 版本中服務(wù)端只支持 http 協(xié)議,后來為了提升性能在2.x版本引入了谷歌的 grpc,grpc 是一款長連接協(xié)議,極大的減少了 http 請求頻繁的連接創(chuàng)建和銷毀過程,能大幅度提升性能,節(jié)約資源。

據(jù)官方測試,Nacos服務(wù)端 grpc 版本,相比 http 版本的性能提升了9倍以上。

2.2 Nacos 服務(wù)注冊

簡單來講,服務(wù)注冊的目的就是客戶端將自己的ip端口等信息上報給 Nacos 服務(wù)端,過程如下:

  • 創(chuàng)建長連接:Nacos SDK 通過Nacos服務(wù)端域名解析出服務(wù)端ip列表,選擇其中一個ip創(chuàng)建 grpc 連接,并定時檢查連接狀態(tài),當(dāng)連接斷開,則自動選擇服務(wù)端ip列表中的下一個ip進行重連。
  • 健康檢查請求:在正式發(fā)起注冊之前,Nacos SDK 向服務(wù)端發(fā)送一個空請求,服務(wù)端回應(yīng)一個空請求,若Nacos SDK 未收到服務(wù)端回應(yīng),則認為服務(wù)端不健康,并進行一定次數(shù)重試,如果都未收到回應(yīng),則注冊失敗。
  • 發(fā)起注冊:當(dāng)你查看Nacos java SDK的注冊方法時,你會發(fā)現(xiàn)沒有返回值,這是因為Nacos SDK做了補償機制,在真實給服務(wù)端上報數(shù)據(jù)之前,會先往緩存中插入一條記錄表示開始注冊,注冊成功之后再從緩存中標(biāo)記這條記錄為注冊成功,當(dāng)注冊失敗時,緩存中這條記錄是未注冊成功的狀態(tài),Nacos SDK開啟了一個定時任務(wù),定時查詢異常的緩存數(shù)據(jù),重新發(fā)起注冊。

Nacos SDK注冊失敗時的自動補償機制時序圖。

圖片

相關(guān)源碼如下:

@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName,
instance);
//添加redo日志
redoService.cacheInstanceForRedo(serviceName, groupName, instance);

doRegisterService(serviceName, groupName, instance);
}
public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
//向服務(wù)端發(fā)起注冊
InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
NamingRemoteConstants.REGISTER_INSTANCE, instance);
requestToServer(request, Response.class);
//標(biāo)記注冊成功
redoService.instanceRegistered(serviceName, groupName);
}

執(zhí)行補償定時任務(wù)RedoScheduledTask。

@Override
public void run() {
if (!redoService.isConnected()) {
LogUtils.NAMING_LOGGER.warn("Grpc Connection is disconnect, skip current redo task");
return;
}
try {
redoForInstances();
redoForSubscribes();
} catch (Exception e) {
LogUtils.NAMING_LOGGER.warn("Redo task run with unexpected exception: ", e);
}
}
private void redoForInstances() {
for (InstanceRedoData each : redoService.findInstanceRedoData()) {
try {
redoForInstance(each);
} catch (NacosException e) {
LogUtils.NAMING_LOGGER.error("Redo instance operation {} for {}@@{} failed. ", each.getRedoType(),
each.getGroupName(), each.getServiceName(), e);
}
}
}
  • 服務(wù)端數(shù)據(jù)同步(Distro協(xié)議):Nacos SDK只會與服務(wù)端某個節(jié)點建立長連接,當(dāng)服務(wù)端接受到客戶端注冊的實例數(shù)據(jù)后,還需要將實例數(shù)據(jù)同步給其他節(jié)點。Nacos自己實現(xiàn)了一個一致性協(xié)議名為Distro,服務(wù)注冊的時候會觸發(fā)Distro一次同步,每個Nacos節(jié)點之間會定時互相發(fā)送Distro數(shù)據(jù),以此保證數(shù)據(jù)最終一致。
  • 服務(wù)實例上線推送:Nacos服務(wù)端收到服務(wù)實例數(shù)據(jù)后會將服務(wù)的最新實例列表通過grpc推送給該服務(wù)的所有訂閱者。
  • 服務(wù)注冊過程源碼時序圖:整理了一下服務(wù)注冊過程整體時序圖,對源碼實現(xiàn)感興趣的可以按照根據(jù)這個時序圖view一下源碼。

圖片

2.3 Nacos 心跳機制

目前主流的注冊中心,比如Consul、Eureka、Zk包括我們公司自研的Gsched,都是通過心跳機制來感知服務(wù)的下線。Nacos也是通過心跳機制來實現(xiàn)的。

Nacos目前SDK維護了兩個分支的版本(1.x、2.x),這兩個版本心跳機制的實現(xiàn)不一樣。其中1.x版本的SDK通過http協(xié)議來定時向服務(wù)端發(fā)送心跳維持自己的健康狀態(tài),2.x版本的SDK則通過grpc自身的心跳機制來保活,當(dāng)Nacos服務(wù)端接受不到服務(wù)實例的心跳,會認為實例下線。如下圖:

圖片

grpc監(jiān)測到連接斷開事件,發(fā)送ClientDisconnectEvent。

public class ConnectionBasedClientManager extends ClientConnectionEventListener implements ClientManager {
//連接斷開,發(fā)送連接斷開事件
public boolean clientDisconnected(String clientId) {
Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
ConnectionBasedClient client = clients.remove(clientId);
if (null == client) {
return true;
}
client.release();
NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
return true;
}
}

移除客戶端注冊的服務(wù)實例

public class ClientServiceIndexesManager extends SmartSubscriber {

@Override
public void onEvent(Event event) {
//接收失去連接事件
if (event instanceof ClientEvent.ClientDisconnectEvent) {
handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);
} else if (event instanceof ClientOperationEvent) {
handleClientOperation((ClientOperationEvent) event);
}
}
private void handleClientDisconnect(ClientEvent.ClientDisconnectEvent event) {
Client client = event.getClient();
for (Service each : client.getAllSubscribeService()) {
removeSubscriberIndexes(each, client.getClientId());
}
//移除客戶端注冊的服務(wù)實例
for (Service each : client.getAllPublishedService()) {
removePublisherIndexes(each, client.getClientId());
}
}

//移除客戶端注冊的服務(wù)實例
private void removePublisherIndexes(Service service, String clientId) {
if (!publisherIndexes.containsKey(service)) {
return;
}
publisherIndexes.get(service).remove(clientId);
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
}
}

2.4 Nacos 服務(wù)訂閱

當(dāng)一個服務(wù)發(fā)生上下線,Nacos如何知道要推送給哪些客戶端?

Nacos SDK 提供了訂閱和取消訂閱方法,當(dāng)客戶端向服務(wù)端發(fā)起訂閱請求,服務(wù)端會記錄發(fā)起調(diào)用的客戶端為該服務(wù)的訂閱者,同時將服務(wù)的最新實例列表返回。當(dāng)客戶端發(fā)起了取消訂閱,服務(wù)端就會從該服務(wù)的訂閱者列表中把當(dāng)前客戶端移除。

當(dāng)客戶端發(fā)起訂閱時,服務(wù)端除了會同步返回最新的服務(wù)實例列表,還會異步的通過grpc推送給該訂閱者最新的服務(wù)實例列表,這樣做的目的是為了異步更新客戶端本地緩存的服務(wù)數(shù)據(jù)。

當(dāng)客戶端訂閱的服務(wù)上下線,該服務(wù)所有的訂閱者會立刻收到最新的服務(wù)列表并且將服務(wù)最新的實例數(shù)據(jù)更新到內(nèi)存。

圖片

我們也看一下相關(guān)源碼,服務(wù)端接收到訂閱數(shù)據(jù),首先保存到內(nèi)存中。

@Override
public void subscribeService(Service service, Subscriber subscriber, String clientId) {
Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);
Client client = clientManager.getClient(clientId);
//校驗長連接是否正常
if (!clientIsLegal(client, clientId)) {
return;
}
//保存訂閱數(shù)據(jù)
client.addServiceSubscriber(singleton, subscriber);
client.setLastUpdatedTime();
//發(fā)送訂閱事件
NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));
}

private void handleClientOperation(ClientOperationEvent event) {
Service service = event.getService();
String clientId = event.getClientId();
if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
addPublisherIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
removePublisherIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
//處理訂閱操作
addSubscriberIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
removeSubscriberIndexes(service, clientId);
}
}

然后發(fā)布訂閱事件。

private void addSubscriberIndexes(Service service, String clientId) {
//保存訂閱數(shù)據(jù)
subscriberIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
// Fix #5404, Only first time add need notify event.
if (subscriberIndexes.get(service).add(clientId)) {
//發(fā)布訂閱事件
NotifyCenter.publishEvent(new ServiceEvent.ServiceSubscribedEvent(service, clientId));
}
}

服務(wù)端自己消費訂閱事件,并且推送給訂閱的客戶端最新的服務(wù)實例數(shù)據(jù)。

@Override
public void onEvent(Event event) {
if (!upgradeJudgement.isUseGrpcFeatures()) {
return;
}
if (event instanceof ServiceEvent.ServiceChangedEvent) {
// If service changed, push to all subscribers.
ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
Service service = serviceChangedEvent.getService();
delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
} else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
// If service is subscribed by one client, only push this client.
ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;
Service service = subscribedEvent.getService();
delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(),
subscribedEvent.getClientId()));
}
}

2.5 Nacos 推送

推送方式

前面說了服務(wù)的注冊和訂閱都會發(fā)生推送(服務(wù)端->客戶端),那推送到底是如何實現(xiàn)的呢?

在早期的Nacos版本,當(dāng)服務(wù)實例變化,服務(wù)端會通過udp協(xié)議將最新的數(shù)據(jù)發(fā)送給客戶端,后來發(fā)現(xiàn)udp推送有一定的丟包率,于是新版本的Nacos支持了grpc推送。Nacos服務(wù)端會自動判斷客戶端的版本來選擇哪種方式來進行推送,如果你使用1.4.2以前的SDK進行注冊,那Nacos服務(wù)端會使用udp協(xié)議來進行推送,反之則使用grpc。

推送失敗重試

當(dāng)發(fā)送推送時,客戶端可能正在重啟,或者連接不穩(wěn)定導(dǎo)致推送失敗,這個時候Nacos會進行重試。Nacos將每個推送都封裝成一個任務(wù)對象,放入到隊列中,再開啟一個線程不停的從隊列取出任務(wù)執(zhí)行,執(zhí)行之前會先刪除該任務(wù),如果執(zhí)行失敗則將任務(wù)重新添加到隊列,該線程會記錄任務(wù)執(zhí)行的時間,如果超過1秒,則會記錄到日志。

推送部分源碼

添加推送任務(wù)到執(zhí)行隊列中。

private static class PushDelayTaskProcessor implements NacosTaskProcessor {

private final PushDelayTaskExecuteEngine executeEngine;

public PushDelayTaskProcessor(PushDelayTaskExecuteEngine executeEngine) {
this.executeEngine = executeEngine;
}

@Override
public boolean process(NacosTask task) {
PushDelayTask pushDelayTask = (PushDelayTask) task;
Service service = pushDelayTask.getService();
NamingExecuteTaskDispatcher.getInstance()
.dispatchAndExecuteTask(service, new PushExecuteTask(service, executeEngine, pushDelayTask));
return true;
}
}

推送任務(wù)PushExecuteTask 的執(zhí)行。

public class PushExecuteTask extends AbstractExecuteTask {

//..省略

@Override
public void run() {
try {
//封裝要推送的服務(wù)實例數(shù)據(jù)
PushDataWrapper wrapper = generatePushData();
ClientManager clientManager = delayTaskEngine.getClientManager();
//如果是服務(wù)上下線導(dǎo)致的推送,獲取所有訂閱者
//如果是訂閱導(dǎo)致的推送,獲取訂閱者
for (String each : getTargetClientIds()) {
Client client = clientManager.getClient(each);
if (null == client) {
// means this client has disconnect
continue;
}
Subscriber subscriber = clientManager.getClient(each).getSubscriber(service);
//推送給訂閱者
delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper,
new NamingPushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.isPushToAll()));
}
} catch (Exception e) {
Loggers.PUSH.error("Push task for service" + service.getGroupedServiceName() + " execute failed ", e);
//當(dāng)推送發(fā)生異常,重新將推送任務(wù)放入執(zhí)行隊列
delayTaskEngine.addTask(service, new PushDelayTask(service, 1000L));
}
}

//如果是服務(wù)上下線導(dǎo)致的推送,獲取所有訂閱者
//如果是訂閱導(dǎo)致的推送,獲取訂閱者
private Collection<String> getTargetClientIds() {
return delayTask.isPushToAll() ? delayTaskEngine.getIndexesManager().getAllClientsSubscribeService(service)
: delayTask.getTargetClients();
}

執(zhí)行推送任務(wù)線程InnerWorker 的執(zhí)行。

/**
* Inner execute worker.
*/
private class InnerWorker extends Thread {

InnerWorker(String name) {
setDaemon(false);
setName(name);
}

@Override
public void run() {
while (!closed.get()) {
try {
//從隊列中取出任務(wù)PushExecuteTask
Runnable task = queue.take();
long begin = System.currentTimeMillis();
//執(zhí)行PushExecuteTask
task.run();
long duration = System.currentTimeMillis() - begin;
if (duration > 1000L) {
log.warn("task {} takes {}ms", task, duration);
}
} catch (Throwable e) {
log.error("[TASK-FAILED] " + e.toString(), e);
}
}
}
}

2.6 Nacos SDK 查詢服務(wù)實例

服務(wù)消費者首先需要調(diào)用Nacos SDK的接口來獲取最新的服務(wù)實例,然后才能從獲取到的實例列表中以加權(quán)輪詢的方式選擇出一個實例(包含ip,port等信息),最后再發(fā)起調(diào)用。

前面已經(jīng)提到Nacos服務(wù)發(fā)生上下線、訂閱的時候都會推送最新的服務(wù)實例列表到當(dāng)客戶端,客戶端再更新本地內(nèi)存中的緩沖數(shù)據(jù),所以調(diào)用Nacos SDK提供的查詢實例列表的接口時,不會直接請求服務(wù)端獲取數(shù)據(jù),而是會優(yōu)先使用內(nèi)存中的服務(wù)數(shù)據(jù),只有內(nèi)存中查不到的情況下才會發(fā)起訂閱請求服務(wù)端數(shù)據(jù)。

Nacos SDK內(nèi)存中的數(shù)據(jù)除了接受來自服務(wù)端的推送更新之外,自己本地也會有一個定時任務(wù)定時去獲取服務(wù)端數(shù)據(jù)來進行兜底。Nacos SDK在查詢的時候也了容災(zāi)機制,即從磁盤獲取服務(wù)數(shù)據(jù),而這個磁盤的數(shù)據(jù)其實也是來自于內(nèi)存,有一個定時任務(wù)定時從內(nèi)存緩存中獲取然后加載到磁盤。Nacos SDK的容災(zāi)機制默認關(guān)閉,可通過設(shè)置環(huán)境變量failover-mode=true來開啟。

架構(gòu)圖

用戶查詢流程

查詢服務(wù)實例部分源碼

private final ConcurrentMap<String, ServiceInfo> serviceInfoMap;
@Override
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
String clusterString = StringUtils.join(clusters, ",");
//這里默認傳過來是true
if (subscribe) {
//從本地內(nèi)存獲取服務(wù)數(shù)據(jù),如果獲取不到則從磁盤獲取
serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
if (null == serviceInfo || !clientProxy.isSubscribed(serviceName, groupName, clusterString)) {
//如果從本地獲取不到數(shù)據(jù),則調(diào)用訂閱方法
serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
}
} else {
//適用于不走訂閱,直接從服務(wù)端獲取數(shù)據(jù)的情況
serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
}
List<Instance> list;
if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
return new ArrayList<Instance>();
}
return list;
}
}
//從本地內(nèi)存獲取服務(wù)數(shù)據(jù),如果開啟了故障轉(zhuǎn)移則直接從磁盤獲取,因為當(dāng)服務(wù)端掛了,本地啟動時內(nèi)存中也沒有數(shù)據(jù)
public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) {
NAMING_LOGGER.debug("failover-mode: {}", failoverReactor.isFailoverSwitch());
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
String key = ServiceInfo.getKey(groupedServiceName, clusters);
//故障轉(zhuǎn)移則直接從磁盤獲取
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
//返回內(nèi)存中數(shù)據(jù)
return serviceInfoMap.get(key);
}

3. 結(jié)語

本篇文章向大家介紹 Nacos 服務(wù)發(fā)現(xiàn)的基本概念和核心能力以及實現(xiàn)的原理,旨在讓大家對 Nacos 的服務(wù)注冊與發(fā)現(xiàn)功能有更多的了解,做到心中有數(shù)。

責(zé)任編輯:武曉燕 來源: 樓仔
相關(guān)推薦

2025-05-08 09:31:06

2018-12-05 20:58:53

2021-10-26 00:07:35

TCP連接python

2023-11-13 10:00:09

數(shù)據(jù)中心服務(wù)器

2019-07-15 09:09:29

RedisJava操作系統(tǒng)

2021-08-04 11:54:25

Nacos注冊中心設(shè)計

2020-01-15 11:30:59

編碼優(yōu)化性能

2023-03-01 08:15:10

NginxNacos

2022-02-23 15:09:18

數(shù)字化轉(zhuǎn)型國有企業(yè)數(shù)據(jù)

2011-11-02 10:59:03

陳旭東聯(lián)想云計算

2023-10-30 09:35:01

注冊中心微服務(wù)

2024-12-27 00:37:46

2023-04-04 09:44:52

數(shù)據(jù)中心能源安全

2022-08-30 22:12:19

Nacos組件服務(wù)注冊

2018-12-24 10:04:35

SAP Concur智慧費用管理

2013-03-18 13:31:28

2024-06-25 12:45:05

2010-09-01 11:17:24

數(shù)據(jù)中心搬遷

2018-08-15 08:33:33

編程Go語言開發(fā)

2018-12-07 10:16:18

數(shù)據(jù)中心Kubernetes服務(wù)器
點贊
收藏

51CTO技術(shù)棧公眾號

主站蜘蛛池模板: 美日韩中文字幕 | 国偷自产av一区二区三区 | 亚洲瑟瑟 | 婷婷丁香在线视频 | 国产激情福利 | www.五月婷婷.com | 天天射网站 | 日本一区二区三区免费观看 | 亚洲一区二区久久 | 亚洲精品视频在线看 | 久久小视频 | 中文一区 | 日韩美女在线看免费观看 | 精品伊人久久 | 综合色播| 日韩av一区二区在线观看 | 久久国产综合 | 日本一区二区三区免费观看 | 精品久久国产 | 国产不卡在线观看 | 久久精品国产免费 | 日韩免费av一区二区 | 一区二区三区免费 | 免费精品国产 | 99精品电影| 国产精品人人做人人爽 | 91精品久久久久久久久中文字幕 | 色婷婷在线视频 | 亚洲精品久久久久久久久久吃药 | www.v888av.com| 精品视频一区二区 | 亚洲精品欧美一区二区三区 | 天天天操| 黑人精品 | 99亚洲国产精品 | 日韩av啪啪网站大全免费观看 | 日本一二三区电影 | 久久精品视频在线免费观看 | 日韩欧美亚洲综合 | 91在线观看 | 久久免费高清视频 |