dubbo进阶——服务导出
创始人
2024-06-01 14:25:52
0

服务导出

在这里记录一下对" Dubbo 导出服务的过程"的研究。

触发时机

public class ServiceBean extends ServiceConfig implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener, BeanNameAware {@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {// 是否有延迟导出 && 是否已导出 && 是不是已被取消导出if (isDelay() && !isExported() && !isUnexported()) {if (logger.isInfoEnabled()) {logger.info("The service ready on spring started. service: " + getInterface());}// 导出服务export();}}
}

首先这里可以看到Dubbo 服务导出过程始于 Spring 容器发布刷新事件,Dubbo 在接收到事件后,会立即执行服务导出逻辑。详解:ServiceBean和Spring有关,它继承了InitializingBean和ApplicationEvent。在Bean初始化完成后会调用InitializingBean.afterPropertiesSet方法来执行服务暴露的准备工作。在Spring的context完成初始化后,会触发ApplicationEventListener事件进行服务导出,会执行onApplicationEvent方法。这时服务服务暴露就开始了

export()

public synchronized void export() {if (provider != null) {// 获取 export 和 delay 配置if (export == null) {export = provider.getExport();}if (delay == null) {delay = provider.getDelay();}}// 如果 export 为 false,则不导出服务if (export != null && !export) {return;}// delay > 0,延时导出服务if (delay != null && delay > 0) {delayExportExecutor.schedule(new Runnable() {@Overridepublic void run() {doExport();}}, delay, TimeUnit.MILLISECONDS);// 立即导出服务} else {doExport();}
}

这里可以看到主要是对export和delay两项配置进行检查和处理

doExport()

protected synchronized void doExport() {if (unexported) {throw new IllegalStateException("Already unexported!");}if (exported) {return;}exported = true;// 检测 interfaceName 是否合法if (interfaceName == null || interfaceName.length() == 0) {throw new IllegalStateException("interface not allow null!");}// 检测 provider 是否为空,为空则新建一个,并通过系统变量为其初始化checkDefault();// 下面几个 if 语句用于检测 provider、application 等核心配置类对象是否为空,// 若为空,则尝试从其他配置类对象中获取相应的实例。if (provider != null) {if (application == null) {application = provider.getApplication();}if (module == null) {module = provider.getModule();}if (registries == null) {...}if (monitor == null) {...}if (protocols == null) {...}}if (module != null) {if (registries == null) {registries = module.getRegistries();}if (monitor == null) {...}}if (application != null) {if (registries == null) {registries = application.getRegistries();}if (monitor == null) {...}}// 检测 ref 是否为泛化服务类型if (ref instanceof GenericService) {// 设置 interfaceClass 为 GenericService.classinterfaceClass = GenericService.class;if (StringUtils.isEmpty(generic)) {// 设置 generic = "true"generic = Boolean.TRUE.toString();}// ref 非 GenericService 类型} else {try {interfaceClass = Class.forName(interfaceName, true, Thread.currentThread().getContextClassLoader());} catch (ClassNotFoundException e) {throw new IllegalStateException(e.getMessage(), e);}// 对 interfaceClass,以及  标签中的必要字段进行检查checkInterfaceAndMethods(interfaceClass, methods);// 对 ref 合法性进行检测checkRef();// 设置 generic = "false"generic = Boolean.FALSE.toString();}// local 和 stub 在功能应该是一致的,用于配置本地存根if (local != null) {if ("true".equals(local)) {local = interfaceName + "Local";}Class localClass;try {// 获取本地存根类localClass = ClassHelper.forNameWithThreadContextClassLoader(local);} catch (ClassNotFoundException e) {throw new IllegalStateException(e.getMessage(), e);}// 检测本地存根类是否可赋值给接口类,若不可赋值则会抛出异常,提醒使用者本地存根类类型不合法if (!interfaceClass.isAssignableFrom(localClass)) {throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName);}}if (stub != null) {// 此处的代码和上一个 if 分支的代码基本一致,这里省略}// 检测各种对象是否为空,为空则新建,或者抛出异常checkApplication();checkRegistry();checkProtocol();appendProperties(this);checkStubAndMock(interfaceClass);if (path == null || path.length() == 0) {path = interfaceName;}// 导出服务doExportUrls();// ProviderModel 表示服务提供者模型,此对象中存储了与服务提供者相关的信息。// 比如服务的配置信息,服务实例等。每个被导出的服务对应一个 ProviderModel。// ApplicationModel 持有所有的 ProviderModel。ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
}

上面代码的主要逻辑是:
1.检测 < dubbo:service > 标签的 interface 属性合法性,不合法则抛出异常
2.检测 ProviderConfig、ApplicationConfig 等核心配置类对象是否为空,若为空,则尝试从其他配置类对象中获取相应的实例。
3.检测并处理泛化服务和普通服务类
4.检测本地存根配置,并进行相应的处理
5.对 ApplicationConfig、RegistryConfig 等配置类进行检测,为空则尝试创建,若无法创建则抛出异常

doExportUrls()

Dubbo 允许我们使用不同的协议导出服务,也允许我们向多个注册中心注册服务。Dubbo 在 doExportUrls 方法中对多协议,多注册中心进行了支持。

private void doExportUrls() {// 加载注册中心链接List registryURLs = loadRegistries(true);// 遍历 protocols,并在每个协议下导出服务for (ProtocolConfig protocolConfig : protocols) {doExportUrlsFor1Protocol(protocolConfig, registryURLs);}
}

这里首先是通过 loadRegistries 加载注册中心链接,然后再遍历 ProtocolConfig 集合导出每个服务。并在导出服务的过程中,将服务注册到注册中心。

loadRegistries()

protected List loadRegistries(boolean provider) {// 检测是否存在注册中心配置类,不存在则抛出异常checkRegistry();List registryList = new ArrayList();if (registries != null && !registries.isEmpty()) {for (RegistryConfig config : registries) {String address = config.getAddress();if (address == null || address.length() == 0) {// 若 address 为空,则将其设为 0.0.0.0address = Constants.ANYHOST_VALUE;}// 从系统属性中加载注册中心地址String sysaddress = System.getProperty("dubbo.registry.address");if (sysaddress != null && sysaddress.length() > 0) {address = sysaddress;}// 检测 address 是否合法if (address.length() > 0 && !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {Map map = new HashMap();// 添加 ApplicationConfig 中的字段信息到 map 中appendParameters(map, application);// 添加 RegistryConfig 字段信息到 map 中appendParameters(map, config);// 添加 path、pid,protocol 等信息到 map 中map.put("path", RegistryService.class.getName());map.put("dubbo", Version.getProtocolVersion());map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));if (ConfigUtils.getPid() > 0) {map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));}if (!map.containsKey("protocol")) {if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) {map.put("protocol", "remote");} else {map.put("protocol", "dubbo");}}// 解析得到 URL 列表,address 可能包含多个注册中心 ip,// 因此解析得到的是一个 URL 列表List urls = UrlUtils.parseURLs(address, map);for (URL url : urls) {url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());// 将 URL 协议头设置为 registryurl = url.setProtocol(Constants.REGISTRY_PROTOCOL);// 通过判断条件,决定是否添加 url 到 registryList 中,条件如下:// (服务提供者 && register = true 或 null) //    || (非服务提供者 && subscribe = true 或 null)if ((provider && url.getParameter(Constants.REGISTER_KEY, true))|| (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {registryList.add(url);}}}}}return registryList;
}

loadRegistries 方法主要包含如下的逻辑:
1.检测是否存在注册中心配置类,不存在则抛出异常
2.构建参数映射集合,也就是 map
3.构建注册中心链接列表
4.遍历链接列表,并根据条件决定是否将其添加到 registryList 中

doExportUrlsFor1Protocol()的URL组装

配置检查完毕后,紧接着要做的事情是根据配置,以及其他一些信息组装 URL。下面是URL组装的过程。

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List registryURLs) {String name = protocolConfig.getName();// 如果协议名为空,或空串,则将协议名变量设置为 dubboif (name == null || name.length() == 0) {name = "dubbo";}Map map = new HashMap();// 添加 side、版本、时间戳以及进程号等信息到 map 中map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));if (ConfigUtils.getPid() > 0) {map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));}// 通过反射将对象的字段信息添加到 map 中appendParameters(map, application);appendParameters(map, module);appendParameters(map, provider, Constants.DEFAULT_KEY);appendParameters(map, protocolConfig);appendParameters(map, this);// methods 为 MethodConfig 集合,MethodConfig 中存储了  标签的配置信息if (methods != null && !methods.isEmpty()) {// 这段代码用于添加 Callback 配置到 map 中,代码太长,待会单独分析}// 检测 generic 是否为 "true",并根据检测结果向 map 中添加不同的信息if (ProtocolUtils.isGeneric(generic)) {map.put(Constants.GENERIC_KEY, generic);map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);} else {String revision = Version.getVersion(interfaceClass, version);if (revision != null && revision.length() > 0) {map.put("revision", revision);}// 为接口生成包裹类 Wrapper,Wrapper 中包含了接口的详细信息,比如接口方法名数组,字段信息等String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();// 添加方法名到 map 中,如果包含多个方法名,则用逗号隔开,比如 method = init,destroyif (methods.length == 0) {logger.warn("NO method found in service interface ...");map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);} else {// 将逗号作为分隔符连接方法名,并将连接后的字符串放入 map 中map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet(Arrays.asList(methods)), ","));}}// 添加 token 到 map 中if (!ConfigUtils.isEmpty(token)) {if (ConfigUtils.isDefault(token)) {// 随机生成 tokenmap.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());} else {map.put(Constants.TOKEN_KEY, token);}}// 判断协议名是否为 injvmif (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {protocolConfig.setRegister(false);map.put("notify", "false");}// 获取上下文路径String contextPath = protocolConfig.getContextpath();if ((contextPath == null || contextPath.length() == 0) && provider != null) {contextPath = provider.getContextpath();}// 获取 host 和 portString host = this.findConfigedHosts(protocolConfig, registryURLs, map);Integer port = this.findConfigedPorts(protocolConfig, name, map);// 组装 URLURL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);// 省略无关代码
}

首先是将一些信息,比如版本、时间戳、方法名以及各种配置对象的字段信息放入到 map 中,map 中的内容将作为 URL 的查询字符串。构建好 map 后,紧接着是获取上下文路径、主机名以及端口号等信息。最后将 map 和主机名等数据传给 URL 构造方法创建 URL 对象。需要注意的是,这里出现的 URL 并非 java.net.URL,而是 com.alibaba.dubbo.common.URL。

doExportUrlsFor1Protocol()的导出dubbo服务

前置工作做完,接下来就可以进行服务导出了。服务导出分为导出到本地 (JVM),和导出到远程。

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List registryURLs) {// 省略无关代码if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) {// 加载 ConfiguratorFactory,并生成 Configurator 实例,然后通过实例配置 urlurl = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getExtension(url.getProtocol()).getConfigurator(url).configure(url);}String scope = url.getParameter(Constants.SCOPE_KEY);// 如果 scope = none,则什么都不做if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {// scope != remote,导出到本地if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {exportLocal(url);}// scope != local,导出到远程if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {if (registryURLs != null && !registryURLs.isEmpty()) {for (URL registryURL : registryURLs) {url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));// 加载监视器链接URL monitorUrl = loadMonitor(registryURL);if (monitorUrl != null) {// 将监视器链接作为参数添加到 url 中url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());}String proxy = url.getParameter(Constants.PROXY_KEY);if (StringUtils.isNotEmpty(proxy)) {registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);}// 为服务提供类(ref)生成 InvokerInvoker invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));// DelegateProviderMetaDataInvoker 用于持有 Invoker 和 ServiceConfigDelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);// 导出服务,并生成 ExporterExporter exporter = protocol.export(wrapperInvoker);exporters.add(exporter);}// 不存在注册中心,仅导出服务} else {Invoker invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);//存储Dubbo服务的元数据,元数据可以存储在远端配置中心和本地,默认是存储在本地DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);Exporter exporter = protocol.export(wrapperInvoker);exporters.add(exporter);}}}this.urls.add(url);
}

根据 url 中的 scope 参数决定服务导出方式,分别如下:
1.scope = none,不导出服务
2.scope != remote,导出到本地
3.scope != local,导出到远程
不管是导出到本地,还是远程。进行服务导出之前,均需要先创建 Invoker。

Invoker 创建过程getInvoker()

Invoker 是实体域,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起 invoke 调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。Invoker 是由 ProxyFactory 创建而来,Dubbo 默认的 ProxyFactory 实现类是 JavassistProxyFactory。

public  Invoker getInvoker(T proxy, Class type, URL url) {// 为目标类创建 Wrapperfinal Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);// 创建匿名 Invoker 类对象,并实现 doInvoke 方法。return new AbstractProxyInvoker(proxy, type, url) {@Overrideprotected Object doInvoke(T proxy, String methodName,Class[] parameterTypes,Object[] arguments) throws Throwable {// 调用 Wrapper 的 invokeMethod 方法,invokeMethod 最终会调用目标方法return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);}};
}

JavassistProxyFactory 创建了一个继承自 AbstractProxyInvoker 类的匿名对象,并覆写了抽象方法 doInvoke。覆写后的 doInvoke 逻辑比较简单,仅是将调用请求转发给了 Wrapper 类的 invokeMethod 方法。Wrapper 用于“包裹”目标类,Wrapper 是一个抽象类,仅可通过 getWrapper(Class) 方法创建子类。在创建 Wrapper 子类的过程中,子类代码生成逻辑会对 getWrapper 方法传入的 Class 对象进行解析,拿到诸如类方法,类成员变量等信息。以及生成 invokeMethod 方法代码和其他一些方法代码。代码生成完毕后,通过 Javassist 生成 Class 对象,最后再通过反射创建 Wrapper 实例

exportLocal()导出服务到本地

private void exportLocal(URL url) {// 如果 URL 的协议头等于 injvm,说明已经导出到本地了,无需再次导出if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {URL local = URL.valueOf(url.toFullString()).setProtocol(Constants.LOCAL_PROTOCOL)    // 设置协议头为 injvm.setHost(LOCALHOST).setPort(0);ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));// 创建 Invoker,并导出服务,这里的 protocol 会在运行时调用 InjvmProtocol 的 export 方法Exporter exporter = protocol.export(proxyFactory.getInvoker(ref, (Class) interfaceClass, local));exporters.add(exporter);}
}

首先根据 URL 协议头决定是否导出服务。若需导出,则创建一个新的 URL 并将协议头、主机名以及端口设置成新的值。然后创建 Invoker,并调用 InjvmProtocol 的 export 方法导出服务。

public  Exporter export(Invoker invoker) throws RpcException {// 创建 InjvmExporterreturn new InjvmExporter(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}

InjvmProtocol 的 export 方法仅创建了一个 InjvmExporter。

RegistryProtocol.export()导出服务到远程

public  Exporter export(final Invoker originInvoker) throws RpcException {// 导出服务final ExporterChangeableWrapper exporter = doLocalExport(originInvoker);// 获取注册中心 URL,以 zookeeper 注册中心为例,得到的示例 URL 如下:// zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.17.48.52%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-providerURL registryUrl = getRegistryUrl(originInvoker);// 根据 URL 加载 Registry 实现类,比如 ZookeeperRegistryfinal Registry registry = getRegistry(originInvoker);// 获取已注册的服务提供者 URL,比如:// dubbo://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHellofinal URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);// 获取 register 参数boolean register = registeredProviderUrl.getParameter("register", true);// 向服务提供者与消费者注册表中注册服务提供者ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);// 根据 register 的值决定是否注册服务if (register) {// 向注册中心注册服务register(registryUrl, registeredProviderUrl);ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);}// 获取订阅 URL,比如:// provider://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?category=configurators&check=false&anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHellofinal URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);// 创建监听器final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);// 向注册中心进行订阅 override 数据registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);// 创建并返回 DestroyableExporterreturn new DestroyableExporter(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}

主要做如下一些操作:
1.调用 doLocalExport 导出服务
2.向注册中心注册服务
3.向注册中心进行订阅 override 数据
4.创建并返回 DestroyableExporter

doLocalExport()

private  ExporterChangeableWrapper doLocalExport(final Invoker originInvoker) {String key = getCacheKey(originInvoker);// 访问缓存ExporterChangeableWrapper exporter = (ExporterChangeableWrapper) bounds.get(key);if (exporter == null) {synchronized (bounds) {exporter = (ExporterChangeableWrapper) bounds.get(key);if (exporter == null) {// 创建 Invoker 为委托类对象final Invoker invokerDelegete = new InvokerDelegete(originInvoker, getProviderUrl(originInvoker));// 调用 protocol 的 export 方法导出服务exporter = new ExporterChangeableWrapper((Exporter) protocol.export(invokerDelegete), originInvoker);// 写缓存bounds.put(key, exporter);}}}return exporter;
}

这里是典型的双重检查锁

DubboProtocol.export()

public  Exporter export(Invoker invoker) throws RpcException {URL url = invoker.getUrl();// 获取服务标识,理解成服务坐标也行。由服务组名,服务名,服务版本号以及端口组成。比如:// demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880String key = serviceKey(url);// 创建 DubboExporterDubboExporter exporter = new DubboExporter(invoker, key, exporterMap);// 将  键值对放入缓存中exporterMap.put(key, exporter);// 本地存根相关代码Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);if (isStubSupportEvent && !isCallbackservice) {String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);if (stubServiceMethods == null || stubServiceMethods.length() == 0) {// 省略日志打印代码} else {stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);}}// 启动服务器openServer(url);// 优化序列化optimizeSerialization(url);return exporter;
}

openServer()

private void openServer(URL url) {// 获取 host:port,并将其作为服务器实例的 key,用于标识当前的服务器实例String key = url.getAddress();boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);if (isServer) {// 访问缓存ExchangeServer server = serverMap.get(key);if (server == null) {// 创建服务器实例serverMap.put(key, createServer(url));} else {// 服务器已创建,则根据 url 中的配置重置服务器server.reset(url);}}
}

createServer()

服务器实例的创建过程

private ExchangeServer createServer(URL url) {url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY,// 添加心跳检测配置到 url 中url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));// 获取 server 参数,默认为 nettyString str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);// 通过 SPI 检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))throw new RpcException("Unsupported server type: " + str + ", url: " + url);// 添加编码解码器参数url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);ExchangeServer server;try {// 创建 ExchangeServerserver = Exchangers.bind(url, requestHandler);} catch (RemotingException e) {throw new RpcException("Fail to start server...");}// 获取 client 参数,可指定 netty,minastr = url.getParameter(Constants.CLIENT_KEY);if (str != null && str.length() > 0) {// 获取所有的 Transporter 实现类名称集合,比如 supportedTypes = [netty, mina]Set supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();// 检测当前 Dubbo 所支持的 Transporter 实现类名称列表中,// 是否包含 client 所表示的 Transporter,若不包含,则抛出异常if (!supportedTypes.contains(str)) {throw new RpcException("Unsupported client type...");}}return server;
}

createServer 包含三个核心的逻辑。第一是检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常。第二是创建服务器实例。第三是检测是否支持 client 参数所表示的 Transporter 拓展,不存在也是抛出异常。

bind()

public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {if (url == null) {throw new IllegalArgumentException("url == null");}if (handler == null) {throw new IllegalArgumentException("handler == null");}url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");// 获取 Exchanger,默认为 HeaderExchanger。// 紧接着调用 HeaderExchanger 的 bind 方法创建 ExchangeServer 实例return getExchanger(url).bind(url, handler);
}

HeaderExchanger 的 bind 方法

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {// 创建 HeaderExchangeServer 实例,该方法包含了多个逻辑,分别如下://   1. new HeaderExchangeHandler(handler)//	 2. new DecodeHandler(new HeaderExchangeHandler(handler))//   3. Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

Transporters 的 bind 方法

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {if (url == null) {throw new IllegalArgumentException("url == null");}if (handlers == null || handlers.length == 0) {throw new IllegalArgumentException("handlers == null");}ChannelHandler handler;if (handlers.length == 1) {handler = handlers[0];} else {// 如果 handlers 元素数量大于1,则创建 ChannelHandler 分发器handler = new ChannelHandlerDispatcher(handlers);}// 获取自适应 Transporter 实例,并调用实例方法return getTransporter().bind(url, handler);
}

getTransporter() 方法获取的 Transporter 是在运行时动态创建的,类名为 TransporterAdaptive,也就是自适应拓展类。TransporterAdaptive,也就是自适应拓展类。TransporterAdaptive,也就是自适应拓展类。TransporterAdaptive 会在运行时根据传入的 URL 参数决定加载什么类型的 Transporter,默认为 NettyTransporter。

NettyTransporter 的 bind 方法

public Server bind(URL url, ChannelHandler listener) throws RemotingException {// 创建 NettyServerreturn new NettyServer(url, listener);
}
public class NettyServer extends AbstractServer implements Server {public NettyServer(URL url, ChannelHandler handler) throws RemotingException {// 调用父类构造方法super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));}
}public abstract class AbstractServer extends AbstractEndpoint implements Server {public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {// 调用父类构造方法,这里就不用跟进去了,没什么复杂逻辑super(url, handler);localAddress = getUrl().toInetSocketAddress();// 获取 ip 和端口String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {// 设置 ip 为 0.0.0.0bindIp = NetUtils.ANYHOST;}bindAddress = new InetSocketAddress(bindIp, bindPort);// 获取最大可接受连接数this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);try {// 调用模板方法 doOpen 启动服务器doOpen();} catch (Throwable t) {throw new RemotingException("Failed to bind ");}DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));}protected abstract void doOpen() throws Throwable;protected abstract void doClose() throws Throwable;
}

NettyServer.doOpen()

protected void doOpen() throws Throwable {NettyHelper.setNettyLoggerFactory();// 创建 boss 和 worker 线程池ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));// 创建 ServerBootstrapbootstrap = new ServerBootstrap(channelFactory);final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);channels = nettyHandler.getChannels();bootstrap.setOption("child.tcpNoDelay", true);// 设置 PipelineFactorybootstrap.setPipelineFactory(new ChannelPipelineFactory() {@Overridepublic ChannelPipeline getPipeline() {NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);ChannelPipeline pipeline = Channels.pipeline();pipeline.addLast("decoder", adapter.getDecoder());pipeline.addLast("encoder", adapter.getEncoder());pipeline.addLast("handler", nettyHandler);return pipeline;}});// 绑定到指定的 ip 和端口上channel = bootstrap.bind(getBindAddress());
}

服务注册

服务注册过程,服务注册操作对于 Dubbo 来说不是必需的,通过服务直连的方式就可以绕过注册中心。但通常我们不会这么做,直连方式不利于服务治理,仅推荐在测试服务时使用。对于 Dubbo 来说,注册中心虽不是必需,但却是必要的。
本节内容以 Zookeeper 注册中心作为分析目标,其他类型注册中心大家可自行分析。下面从服务注册的入口方法开始分析,我们把目光再次移到 RegistryProtocol 的 export 方法上

public  Exporter export(final Invoker originInvoker) throws RpcException {// ${导出服务}// 省略其他代码boolean register = registeredProviderUrl.getParameter("register", true);if (register) {// 注册服务register(registryUrl, registeredProviderUrl);ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);}final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);// 订阅 override 数据registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);// 省略部分代码
}

RegistryProtocol 的 export 方法包含了服务导出,注册,以及数据订阅等逻辑。其中服务导出逻辑上一节已经分析过了,本节将分析服务注册逻辑,相关代码如下:

public void register(URL registryUrl, URL registedProviderUrl) {// 获取 RegistryRegistry registry = registryFactory.getRegistry(registryUrl);// 注册服务registry.register(registedProviderUrl);
}

register 方法包含两步操作,第一步是获取注册中心实例,第二步是向注册中心注册服务。接下来分两节内容对这两步操作进行分析。

创建注册中心,getRegistry 方法

public Registry getRegistry(URL url) {url = url.setPath(RegistryService.class.getName()).addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()).removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);String key = url.toServiceString();LOCK.lock();try {// 访问缓存Registry registry = REGISTRIES.get(key);if (registry != null) {return registry;}// 缓存未命中,创建 Registry 实例registry = createRegistry(url);if (registry == null) {throw new IllegalStateException("Can not create registry...");}// 写入缓存REGISTRIES.put(key, registry);return registry;} finally {LOCK.unlock();}
}protected abstract Registry createRegistry(URL url);

getRegistry 方法先访问缓存,缓存未命中则调用 createRegistry 创建 Registry,然后写入缓存。这里的 createRegistry 是一个模板方法,由具体的子类实现。因此,下面我们到 ZookeeperRegistryFactory 中探究一番

public class ZookeeperRegistryFactory extends AbstractRegistryFactory {// zookeeperTransporter 由 SPI 在运行时注入,类型为 ZookeeperTransporter$Adaptiveprivate ZookeeperTransporter zookeeperTransporter;public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {this.zookeeperTransporter = zookeeperTransporter;}@Overridepublic Registry createRegistry(URL url) {// 创建 ZookeeperRegistryreturn new ZookeeperRegistry(url, zookeeperTransporter);}
}
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {super(url);if (url.isAnyHost()) {throw new IllegalStateException("registry address == null");}// 获取组名,默认为 dubboString group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);if (!group.startsWith(Constants.PATH_SEPARATOR)) {// group = "/" + groupgroup = Constants.PATH_SEPARATOR + group;}this.root = group;// 创建 Zookeeper 客户端,默认为 CuratorZookeeperTransporterzkClient = zookeeperTransporter.connect(url);// 添加状态监听器zkClient.addStateListener(new StateListener() {@Overridepublic void stateChanged(int state) {if (state == RECONNECTED) {try {recover();} catch (Exception e) {logger.error(e.getMessage(), e);}}}});
}

ZookeeperTransporter 的 connect 方法调用,这个方法用于创建 Zookeeper 客户端。创建好 Zookeeper 客户端,意味着注册中心的创建过程就结束了。接下来,再来分析一下 Zookeeper 客户端的创建过程。

前面说过,这里的 zookeeperTransporter 类型为自适应拓展类,因此 connect 方法会在被调用时决定加载什么类型的 ZookeeperTransporter 拓展,默认为 CuratorZookeeperTransporter。下面我们到 CuratorZookeeperTransporter 中看一看。

public ZookeeperClient connect(URL url) {// 创建 CuratorZookeeperClientreturn new CuratorZookeeperClient(url);
}
ublic class CuratorZookeeperClient extends AbstractZookeeperClient {private final CuratorFramework client;public CuratorZookeeperClient(URL url) {super(url);try {// 创建 CuratorFramework 构造器CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().connectString(url.getBackupAddress()).retryPolicy(new RetryNTimes(1, 1000)).connectionTimeoutMs(5000);String authority = url.getAuthority();if (authority != null && authority.length() > 0) {builder = builder.authorization("digest", authority.getBytes());}// 构建 CuratorFramework 实例client = builder.build();// 添加监听器client.getConnectionStateListenable().addListener(new ConnectionStateListener() {@Overridepublic void stateChanged(CuratorFramework client, ConnectionState state) {if (state == ConnectionState.LOST) {CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);} else if (state == ConnectionState.CONNECTED) {CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);} else if (state == ConnectionState.RECONNECTED) {CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);}}});// 启动客户端client.start();} catch (Exception e) {throw new IllegalStateException(e.getMessage(), e);}}
}

CuratorZookeeperClient 构造方法主要用于创建和启动 CuratorFramework 实例

register()注册中心注册服务

上面已经将注册中心实例创建好了,接下来要做的事情是向注册中心注册服务。

public void register(URL url) {super.register(url);failedRegistered.remove(url);failedUnregistered.remove(url);try {// 模板方法,由子类实现doRegister(url);} catch (Exception e) {Throwable t = e;// 获取 check 参数,若 check = true 将会直接抛出异常boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)&& url.getParameter(Constants.CHECK_KEY, true)&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());boolean skipFailback = t instanceof SkipFailbackWrapperException;if (check || skipFailback) {if (skipFailback) {t = t.getCause();}throw new IllegalStateException("Failed to register");} else {logger.error("Failed to register");}// 记录注册失败的链接failedRegistered.add(url);}
}protected abstract void doRegister(URL url);

重点关注 doRegister 方法调用即可,其他的代码先忽略。doRegister 方法是一个模板方法,因此我们到 FailbackRegistry 子类 ZookeeperRegistry 中进行分析.

protected void doRegister(URL url) {try {// 通过 Zookeeper 客户端创建节点,节点路径由 toUrlPath 方法生成,路径格式如下://   /${group}/${serviceInterface}/providers/${url}// 比如//   /dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1......zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));} catch (Throwable e) {throw new RpcException("Failed to register...");}
}

ZookeeperRegistry 在 doRegister 中调用了 Zookeeper 客户端创建服务节点。节点路径由 toUrlPath 方法生成,该方法逻辑不难理解,就不分析了。接下来分析 create 方法

ublic void create(String path, boolean ephemeral) {if (!ephemeral) {// 如果要创建的节点类型非临时节点,那么这里要检测节点是否存在if (checkExists(path)) {return;}}int i = path.lastIndexOf('/');if (i > 0) {// 递归创建上一级路径create(path.substring(0, i), false);}// 根据 ephemeral 的值创建临时或持久节点if (ephemeral) {createEphemeral(path);} else {createPersistent(path);}
}
public void createEphemeral(String path) {try {// 通过 Curator 框架创建节点client.create().withMode(CreateMode.EPHEMERAL).forPath(path);} catch (NodeExistsException e) {} catch (Exception e) {throw new IllegalStateException(e.getMessage(), e);}
}

到此关于服务注册的过程就分析完了。整个过程可简单总结为:先创建注册中心实例,之后再通过注册中心实例注册服务

订阅 override 数据

// 待补充

参考博客地址

dubbo官方文档 服务导出
dubbo进阶 服务暴露

相关内容

热门资讯

监控摄像头接入GB28181平... 流程简介将监控摄像头的视频在网站和APP中直播,要解决的几个问题是:1&...
Windows10添加群晖磁盘... 在使用群晖NAS时,我们需要通过本地映射的方式把NAS映射成本地的一块磁盘使用。 通过...
protocol buffer... 目录 目录 什么是protocol buffer 1.protobuf 1.1安装  1.2使用...
在Word、WPS中插入AxM... 引言 我最近需要写一些文章,在排版时发现AxMath插入的公式竟然会导致行间距异常&#...
【PdgCntEditor】解... 一、问题背景 大部分的图书对应的PDF,目录中的页码并非PDF中直接索引的页码...
修复 爱普生 EPSON L4... L4151 L4153 L4156 L4158 L4163 L4165 L4166 L4168 L4...
Fluent中创建监测点 1 概述某些仿真问题,需要创建监测点,用于获取空间定点的数据࿰...
educoder数据结构与算法...                                                   ...
MySQL下载和安装(Wind... 前言:刚换了一台电脑,里面所有东西都需要重新配置,习惯了所...
MFC文件操作  MFC提供了一个文件操作的基类CFile,这个类提供了一个没有缓存的二进制格式的磁盘...