掃二維碼與項目經(jīng)理溝通
我們在微信上24小時期待你的聲音
解答本文疑問/技術(shù)咨詢/運(yùn)營咨詢/技術(shù)建議/互聯(lián)網(wǎng)交流
本文轉(zhuǎn)載自微信公眾號「程序新視界」,作者二師兄。轉(zhuǎn)載本文請聯(lián)系程序新視界公眾號。

說起Nacos的服務(wù)訂閱機(jī)制,對此不了解的朋友,可能感覺非常神秘,這篇文章就大家深入淺出的了解一下Nacos 2.0客戶端的訂閱實現(xiàn)。由于涉及到的內(nèi)容比較多,就分幾篇來講,本篇為第一篇。
Nacos的訂閱機(jī)制,如果用一句話來描述就是:Nacos客戶端通過一個定時任務(wù),每6秒從注冊中心獲取實例列表,當(dāng)發(fā)現(xiàn)實例發(fā)生變化時,發(fā)布變更事件,訂閱者進(jìn)行業(yè)務(wù)處理。該更新實例的更新實例,該更新本地緩存的更新本地緩存。
nacos
上圖畫出了訂閱方法的主線流程,涉及的內(nèi)容較多,處理細(xì)節(jié)復(fù)雜。這里只用把握住核心部分即可。下面就通過代碼和流程圖來逐步分析上述過程。
我們這里聊的訂閱機(jī)制,其實本質(zhì)上就是服務(wù)發(fā)現(xiàn)的準(zhǔn)實時感知。上面已經(jīng)看到了當(dāng)執(zhí)行訂閱方法時,會觸發(fā)定時任務(wù),定時去拉服務(wù)器端的數(shù)據(jù)。所以,本質(zhì)上,訂閱機(jī)制就是實現(xiàn)服務(wù)發(fā)現(xiàn)的一種方式,對照的方式就是直接查詢接口了。
NacosNamingService中暴露的許多重載的subscribe,重載的目的就是讓大家少寫一些參數(shù),這些參數(shù)呢,Nacos給默認(rèn)處理了。最終這些重載方法都會調(diào)用到下面這個方法:
- // NacosNamingService
- public void subscribe(String serviceName, String groupName, List
clusters, EventListener listener) - throws NacosException {
- if (null == listener) {
- return;
- }
- String clusterString = StringUtils.join(clusters, ",");
- changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
- clientProxy.subscribe(serviceName, groupName, clusterString);
- }
方法中的事件監(jiān)聽我們暫時不聊,直接看subscribe方法,這里clientProxy類型為NamingClientProxyDelegate。實例化NacosNamingService時該類被實例化,前面章節(jié)中已經(jīng)講到,不再贅述。
而clientProxy.subscribe方法在NamingClientProxyDelegate中實現(xiàn):
- // NamingClientProxyDelegate
- @Override
- public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
- String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
- String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
- // 獲取緩存中的ServiceInfo
- ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
- if (null == result) {
- // 如果為null,則進(jìn)行訂閱邏輯處理,基于gRPC協(xié)議
- result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
- }
- // 定時調(diào)度UpdateTask
- serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
- // ServiceInfo本地緩存處理
- serviceInfoHolder.processServiceInfo(result);
- return result;
- }
這段方法是不是眼熟啊?對的,在前面分析《Nacos Client服務(wù)發(fā)現(xiàn)》時我們已經(jīng)講過了??磥硎馔就瑲w,查詢服務(wù)列表和訂閱最終都調(diào)用了同一個方法。
上篇講了其他流程,我們這里重點(diǎn)看任務(wù)調(diào)度:
- // ServiceInfoUpdateService
- public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {
- String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
- if (futureMap.get(serviceKey) != null) {
- return;
- }
- synchronized (futureMap) {
- if (futureMap.get(serviceKey) != null) {
- return;
- }
- // 構(gòu)建UpdateTask
- ScheduledFuture> future = addTask(new UpdateTask(serviceName, groupName, clusters));
- futureMap.put(serviceKey, future);
- }
- }
該方法包含了構(gòu)建serviceKey、通過serviceKey判重,最后添加UpdateTask。
而其中的addTask的實現(xiàn)就是發(fā)起了一個定時任務(wù):
- // ServiceInfoUpdateService
- private synchronized ScheduledFuture> addTask(UpdateTask task) {
- return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
- }
定時任務(wù)延時1秒執(zhí)行。
跟蹤到這里就告一階段了。核心功能只有兩個:調(diào)用訂閱方法和發(fā)起定時任務(wù)。
UpdateTask封裝了訂閱機(jī)制的核心業(yè)務(wù)邏輯,先來通過一張流程圖看一下都做了啥。
nacos
有了上述流程圖,基本就很清晰的了解UpdateTask所做的事情了。直接貼出run方法的所有代碼:
- public void run() {
- long delayTime = DEFAULT_DELAY;
- try {
- // 判斷該注冊的Service是否被訂閱,如果沒有訂閱則不再執(zhí)行
- if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKey)) {
- NAMING_LOGGER
- .info("update task is stopped, service:" + groupedServiceName + ", clusters:" + clusters);
- return;
- }
- // 獲取緩存的service信息
- ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
- if (serviceObj == null) {
- // 根據(jù)serviceName從注冊中心服務(wù)端獲取Service信息
- serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
- serviceInfoHolder.processServiceInfo(serviceObj);
- lastRefTime = serviceObj.getLastRefTime();
- return;
- }
- // 過期服務(wù)(服務(wù)的最新更新時間小于等于緩存刷新時間),從注冊中心重新查詢
- if (serviceObj.getLastRefTime() <= lastRefTime) {
- serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
- // 處理Service消息
- serviceInfoHolder.processServiceInfo(serviceObj);
- }
- // 刷新更新時間
- lastRefTime = serviceObj.getLastRefTime();
- if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
- incFailCount();
- return;
- }
- // 下次更新緩存時間設(shè)置,默認(rèn)為6秒
- // TODO multiple time can be configured.
- delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;
- // 重置失敗數(shù)量為0
- resetFailCount();
- } catch (Throwable e) {
- incFailCount();
- NAMING_LOGGER.warn("[NA] failed to update serviceName: " + groupedServiceName, e);
- } finally {
- // 下次調(diào)度刷新時間,下次執(zhí)行的時間與failCount有關(guān)
- // failCount=0,則下次調(diào)度時間為6秒,最長為1分鐘
- // 即當(dāng)無異常情況下緩存實例的刷新時間是6秒
- executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
- }
- }
首先在判斷服務(wù)是否是被訂閱過,實現(xiàn)方法是ChangeNotifier#isSubscribed:
- public boolean isSubscribed(String groupName, String serviceName, String clusters) {
- String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
- ConcurrentHashSet
eventListeners = listenerMap.get(key); - return CollectionUtils.isNotEmpty(eventListeners);
- }
查看該方法的源碼會發(fā)現(xiàn),這里的listenerMap正是最開始的subscribe方法中registerListener注冊的EventListener。
run方法后面的業(yè)務(wù)處理基本上都雷同了,先判斷緩存是否有ServiceInfo信息,如果沒有則查詢注冊中心、處理ServiceInfo、更新上次處理時間。
而下面判斷ServiceInfo是否失效,正是通過“上次更新時間”與當(dāng)前ServiceInfo中的“上次更新時間”做比較來判斷。如果失效,也會查詢注冊中心、處理ServiceInfo、更新上次處理時間等一系列操作。
業(yè)務(wù)邏輯最后會計算下一次定時任務(wù)的執(zhí)行時間,通過delayTime來延遲執(zhí)行。delayTime默認(rèn)為 1000L * 6,也就是6秒。而在finally里面真的發(fā)起下一次定時任務(wù)。當(dāng)出現(xiàn)異常時,下次執(zhí)行的時間與失敗次數(shù)有關(guān),但最長不超過1分鐘。
這一篇我們講了Nacos客戶端服務(wù)訂閱機(jī)制的源碼,主要有以下步驟:
第一步:訂閱方法的調(diào)用,并進(jìn)行EventListener的注冊,后面UpdateTask要用來進(jìn)行判斷;
第二步:通過委托代理類來處理訂閱邏輯,此處與獲取實例列表方法使用了同一個方法;
第三步:通過定時任務(wù)執(zhí)行UpdateTask方法,默認(rèn)執(zhí)行間隔為6秒,當(dāng)發(fā)生異常時會延長,但不超過1分鐘;
第四步:UpdateTask方法中會比較本地是否存在緩存,緩存是否過期。當(dāng)不存在或過期時,查詢注冊中心,獲取最新實例,更新最后獲取時間,處理ServiceInfo。
第五步:重新計算定時任務(wù)時間,循環(huán)執(zhí)行上述流程。

我們在微信上24小時期待你的聲音
解答本文疑問/技術(shù)咨詢/運(yùn)營咨詢/技術(shù)建議/互聯(lián)網(wǎng)交流