CSDN地址:
学习目标Dubbo的服务上下线流程以及动态感知
Dubbo的动态配置监听
Dubbo的路由监听
第1章服务上线下线监听1.1监听注册1.1.1说明以接口的上线下线为例。
服务的上线会在/dubbo//providers节点下写入相应的dubbo协议的节点数据,下线就会删除该节点的数据,那么是如何注册对该节点的监听的呢?我们来分析一下源码
1.1.2源码分析学过zookeeper的同学应该都知道,如果要监听一个节点的数据变更,就只要客户端注册一个对该节点的监听就可以了,我们来看看dubbo是如何注册监听的。首先我们应该知道,当providers节点发生了数据变更应该通知给谁,肯定是要通知给消费方的,所以这里的客户端指的就是消费方,那么我们只要看看消费方的方法,在服务引用的时候注册了监听。代码最终来到了RegistryProtocol中的doCreateInvoker方法。
protectedTClusterInvokerTdoCreateInvoker(DynamicDirectoryTdirectory,Clustercluster,Registryregistry,ClassTtype){(registry);(protocol);//allattributesofREFER_KEYMapString,Stringparameters=newHashMapString,String(().getParameters());URLurlToRegistry=newServiceConfigURL((PROTOCOL_KEY)==null?DUBBO:(PROTOCOL_KEY),(REGISTER_IP_KEY),0,getPath(parameters,type),parameters);if(()){(urlToRegistry);//把协议注册到/dubbo//consumers节点下面(());}//创建路由链(urlToRegistry);//订阅事件,对configurations,providers,routes节点建立监听(toSubscribeUrl(urlToRegistry));//返回默认的FailoverClusterInvoker对象return(ClusterInvokerT)(directory);}(toSubscribeUrl(urlToRegistry));就是对providers节点注册了监听。
@Overridepublicvoidsubscribe(URLurl){setSubscribeUrl(url);CONSUMER_CONFIGURATION_(this);//订阅事件对config中的::.configurationsreferenceConfigurationListener=newReferenceConfigurationListener(this,url);//订阅其他事件,configurationsroutes,(url,this);}(url,this);这行代码就是注册监听的代码,其实这里注册的监听目录有三个,configurationsroutes,providers。代码最终来到了ZookeeperRegistry中的doSubscribe方法。
@OverridepublicvoiddoSubscribe(finalURLurl,finalNotifyListenerlistener){try{if(ANY_(())){Stringroot=toRootPath();ConcurrentMapNotifyListener,ChildListenerlisteners=(url,k-newConcurrentHashMap());ChildListenerzkListener=(listener,k-(parentPath,currentChilds)-{for(Stringchild:currentChilds){child=(child);if(!(child)){(child);subscribe((child).addParameters(INTERFACE_KEY,child,_KEY,(false)),k);}}});(root,false);ListStringservices=(root,zkListener);if((services)){for(Stringservice:services){service=(service);(service);subscribe((service).addParameters(INTERFACE_KEY,service,_KEY,(false)),listener);}}}else{CountDownLatchlatch=newCountDownLatch(1);ListURLurls=newArrayList();//这里对应configurators,providers,routes目录for(Stringpath:toCategoriesPath(url)){ConcurrentMapNotifyListener,ChildListenerlisteners=(url,k-newConcurrentHashMap());//RegistryChildListenerImpl事件回调类,zookeeper事件回调到它ChildListenerzkListener=(listener,k-newRegistryChildListenerImpl(url,path,k,latch));if(zkListenerinstanceofRegistryChildListenerImpl){((RegistryChildListenerImpl)zkListener).setLatch(latch);}(path,false);//这里会注册zookeeper事件,并且把zookeeper事件和RegistryChildListenerImpl做映射ListStringchildren=(path,zkListener);if(children!=null){//弄一个empty协议,做初始化工作,比如清空集合容器(toUrlsWithEmpty(url,path,children));}}//启动后做初始化式的触发监听notify(url,listener,urls);//tellst();}}catch(Throwablee){thrownewRpcException("Failedtosubscribe"+url+"tozookeeper"+getUrl()+",cause:"+(),e);}}toCategoriesPath(url)该方法根据dubbo协议中的category属性的值来得到需要监听的目录,目录就有三个configurators,providers,routes。
RegistryChildListenerImpl是zookeeper的实例监听类回调的逻辑类
ListStringchildren=(path,zkListener);
在这行代码这里注册了zookeeper的监听,并且把RegistryChildListenerImpl实例传递过去了。
@OverridepublicListStringaddChildListener(Stringpath,finalChildListenerlistener){ConcurrentMapChildListener,TargetChildListenerlisteners=(path,k-newConcurrentHashMap());//ChildListener和zookeeper事件做了映射TargetChildListenertargetListener=(listener,k-createTargetChildListener(path,k));//添加zookeeper事件监听returnaddTargetChildListener(path,targetListener);}监听实例
@(Stringpath,ChildListenerlistener){(client,listener,path);}//注册zookeeper的监听,@OverridepublicListStringaddTargetChildListener(Stringpath,CuratorWatcherImpllistener){try{().usingWatcher(listener).forPath(path);}catch(NoNodeExceptione){returnnull;}catch(Exceptione){thrownewIllegalStateException((),e);}}上面的分析我们知道了,其实我们是对某个接口的三个节点注册了监听,configurators,providers,routes这个三个节点都注册上了监听了。
1.2监听触发1.2.1说明学过zookeeper的同学应该都知道,监听的触发只有我们有对监听的节点发生数据的变更就会触发监听的客户端,比如服务的上线下线就会对providers节点进行数据新增和删除操作就会触发事件。
1.2.2源码分析前面我们分析了监听的注册,下面我们来分析一下监听的触发源码,比如我们往providers节点写入一个数据,那么监听应该要触发,往providers写数据,我们采用dubboapi的方式去写一个dubbo协议的数据写到providers节点下,如下:
@TestpublicvoidproviderReg(){Stringurl="dubbo%3A%2F%2%3A20990%2%3Fanyhost%3Dtrue%26application%3Ddubbo_provider%26deprecated%3Dfalse%26dubbo%3%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3%26metadata-type%3Dremote%26methods%3DdoKill%2CqueryUser%26pid%3D14092%26release%3%26retries%3D7%26revision%3%26service-name-mapping%3Dtrue%26side%3Dprovider%26threadpool%3Dfixed%26threads%3D100%26timeout%3D5000%26timestamp%3D1635058443480";RegistryFactoryregistryFactory=().getAdaptiveExtension();Registryregistry=(("zookeeper://127.0.0.1:2181"));(((url)));}往/dubbo//providers/节点写入dubbo协议

当写入时,会触发客户端的监听代码,我们来看看监听类,zookeeper会回调到process方法来触发事件。childListener就是前面我们分析的RegistryChildListenerImpl类。
staticclassCuratorWatcherImplimplementsCuratorWatcher{privateCuratorFrameworkclient;privatevolatileChildListenerchildListener;privateStringpath;publicCuratorWatcherImpl(CuratorFrameworkclient,ChildListenerlistener,Stringpath){=client;=listener;=path;}protectedCuratorWatcherImpl(){}publicvoidunwatch(){=null;}@Overridepublicvoidprocess(WatchedEventevent)throwsException{//ifclientconnectordisconnecttoserver,zookeeperwillqueue//watchedevent(,..,path=null).if(()==){return;}if(childListener!=null){(path,().usingWatcher(this).forPath(path));}}}接下来我们看一下事件触发后都干了些什么。
privateclassRegistryChildListenerImplimplementsChildListener{privateRegistryNotifiernotifier;privatelonglastExecuteTime;privatevolatileCountDownLatchlatch;publicRegistryChildListenerImpl(URLconsumerUrl,Stringpath,NotifyListenerlistener,CountDownLatchlatch){=latch;notifier=newRegistryNotifier(()){@Overridepublicvoidnotify(ObjectrawAddresses){longdelayTime=getDelayTime();if(delayTime=0){(rawAddresses);}else{longinterval=delayTime-(()-lastExecuteTime);if(interval0){try{(interval);}catch(InterruptedExceptione){//ignore}}lastExecuteTime=();(rawAddresses);}}@OverrideprotectedvoiddoNotify(ObjectrawAddresses){(consumerUrl,listener,(consumerUrl,path,(ListString)rawAddresses));}};}publicvoidsetLatch(CountDownLatchlatch){=latch;}@OverridepublicvoidchildChanged(Stringpath,ListStringchildren){try{();}catch(InterruptedExceptione){("Zookeeperchildrenlistenerthreadwasinterruptedunexpectedly,maycauseraceconditionwiththemainthread.");}(children);}}在这里触发了逻辑
protectedvoiddoNotify(ObjectrawAddresses){(consumerUrl,listener,(consumerUrl,path,(ListString)rawAddresses));}(categoryList);触发了RegistryDirectory的notify逻辑,我们重点看看这里。
for(,ListURLentry:()){Stringcategory=();ListURLcategoryList=();(category,categoryList);(categoryList);//Wewillupdateourcachefileaftereachnotification.//WhenourRegistryhasasubscribefailureduetonetworkjitter,(localCacheEnabled){saveProperties(url);}}RegistryDirectory的notify逻辑
//事件监听回调@Overridepublicsynchronizedvoidnotify(ListURLurls){if(isDestroyed()){return;}//对回调的协议分组//routes:////override:////dubbo://MapString,ListURLcategoryUrls=().filter(Objects::nonNull).filter(this::isValidCategory).filter(this::isNotCompatibleFor26x).collect((this::judgeCategory));ListURLconfiguratorURLs=(CONFIGURATORS_CATEGORY,());=(configuratorURLs).orElse();ListURLrouterURLs=(ROUTERS_CATEGORY,());//生成路由规则,加入到规则链中toRouters(routerURLs).ifPresent(this::addRouters);//providersListURLproviderURLs=(PROVIDERS_CATEGORY,());/***3.xaddedforextURLaddress*/ExtensionLoaderAddressListeneraddressListenerExtensionLoader=();ListAddressListenersupportedListeners=(getUrl(),(String[])null);if(supportedListeners!=null!()){for(AddressListeneraddressListener:supportedListeners){providerURLs=(providerURLs,getConsumerUrl(),this);}}//刷新本地服务列表refreshOverrideAndInvoker(providerURLs);}如果触发的是一个providers协议,那么configuratorURLs和routerURLs都是空的。那么直接走到了refreshOverrideAndInvoker(providerURLs);
privatesynchronizedvoidrefreshOverrideAndInvoker(ListURLurls){//mockzookeeper://xxx?mock=returnnulloverrideDirectoryUrl();//刷新本地列表refreshInvoker(urls);}直接走到了refreshInvoker(urls);这个方法的大概意思就是刷新本地列表,就是刷新invokers变量的值。
//刷新本地服务列表privatevoidrefreshInvoker(ListURLinvokerUrls){(invokerUrls,"invokerUrlsshouldnotbenull");if(()==1(0)!=nullEMPTY_((0).getProtocol())){=true;//=();();destroyAllInvokers();//Closeallinvokers}else{=false;//AllowtoaccessMapURL,InvokerToldUrlInvokerMap=;//localreferenceif(invokerUrls==()){invokerUrls=newArrayList();}if(()!=null){();}else{=newHashSet();(invokerUrls);//Cachedinvokerurls,convenientforcomparison}if(()){return;}//创建url和invoker对象的映射关系MapURL,InvokerTnewUrlInvokerMap=toInvokers(invokerUrls);//TranslateurllisttoInvokermap/***Ifthecalculationiswrong,itisnotprocessed.**1.Theprotocolconfiguredbytheclientisinconsistentwiththeprotocoloftheserver.*eg:consumerprotocol=dubbo,provideronlyhasotherprotocolservices(rest).*2.Theregistrationcenterisnotrobustandpushesillegalspecificationdata.**/if((newUrlInvokerMap)){(newIllegalStateException(":"+()+",:0.urls:"+()));return;}//所有的invoker对象ListInvokerTnewInvokers=(newArrayList(()));//pre-routeandbuildcache,noticethatroutecacheshouldbuildonoriginalInvokerlist.//toMergeMethodInvokerMap()willwrapsomeinvokershavingdifferentgroups,(newInvokers);=multiGroup?toMergeInvokerList(newInvokers):newInvokers;=newUrlInvokerMap;try{destroyUnusedInvokers(oldUrlInvokerMap,newUrlInvokerMap);//ClosetheunusedInvoker}catch(Exceptione){("destroyUnusedInvokerserror.",e);}//();}}会在//创建url和invoker对象的映射关系MapURL,InvokernewUrlInvokerMap=toInvokers(invokerUrls);
这行代码中根据传递进来的dubbo协议去生成invoker对象,其实就是调用了方法生成了invoker对象了。
privateMapURL,InvokerTtoInvokers(ListURLurls){MapURL,InvokerTnewUrlInvokerMap=newConcurrentHashMap();if(urls==null||()){returnnewUrlInvokerMap;}StringqueryProtocols=(PROTOCOL_KEY);for(URLproviderUrl:urls){//Ifprotocolisconfiguredatthereferenceside,onlythematchingprotocolisselectedif(queryProtocols!=()0){booleanaccept=false;String[]acceptProtocols=(",");for(StringacceptProtocol:acceptProtocols){if(().equals(acceptProtocol)){accept=true;break;}}if(!accept){continue;}}if(EMPTY_(())){continue;}if(!().hasExtension(())){(newIllegalStateException("Unsupportedprotocol"+()+"innotifiedurl:"+providerUrl+"fromregistry"+getUrl().getAddress()+"toconsumer"+()+",supportedprotocol:"+().getSupportedExtensions()));continue;}URLurl=mergeUrl(providerUrl);//Cachekeyisurlthatdoesnotmergewithconsumersideparameters,regardlessofhowtheconsumercombinesparameters,iftheserverurlchanges,thenreferagainMapURL,InvokerTlocalUrlInvokerMap=;//localreferenceInvokerTinvoker=localUrlInvokerMap==null?null:(url);if(invoker==null){//Notinthecache,referagaintry{booleanenabled=true;if((DISABLED_KEY)){enabled=!(DISABLED_KEY,false);}else{enabled=(ENABLED_KEY,true);}if(enabled){//生成invoker对象invoker=(serviceType,url);}}catch(Throwablet){("Failedtoreferinvokerforinterface:"+serviceType+",url:("+url+")"+(),t);}if(invoker!=null){//(url,invoker);}}else{(url,invoker);}}returnnewUrlInvokerMap;}然后在这行代码
=multiGroup?toMergeInvokerList(newInvokers):newInvokers;
刷新了服务列表。
我们从上面的分析中可以看到,其实触发的事件最终的目的就是为了刷新客户端本地的服务列表,把新注册的dubbo协议通过生成了一个新的invoker对象,然后加入到了本地服务列表中了。
免责声明:本文章如果文章侵权,请联系我们处理,本站仅提供信息存储空间服务如因作品内容、版权和其他问题请于本站联系