开篇
这篇文章的目的主要是为了讲解下Dubbo中路由策略功能,核心问题包括路由的更新流程和生效流程,当然这些流程都是针对interface服务级别的。
路由生成流程
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener { public synchronized void notify(List<URL> urls) { Map<String, List<URL>> categoryUrls = urls.stream() .filter( s::nonNull) .filter(this::isValidCategory) .filter(this::isNotCompatibleFor26x) .collect(Collectors.groupingBy(url -> { if (UrlUtils.isConfigurator(url)) { return CONFIGURATORS_CATEGORY; } else if (UrlUtils.isRoute(url)) { return ROUTERS_CATEGORY; } else if (UrlUtils.isProvider(url)) { return PROVIDERS_CATEGORY; } return ""; })); List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList()); this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators); List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList()); // 将路由的url信息进行转换并添加路由信息当中 toRouters(routerURLs).ifPresent(this::addRouters); // providers List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList()); refreshOverrideAndInvoker(providerURLs); } private Optional<List<Router>> toRouters(List<URL> urls) { if (urls == null || urls.isEmpty()) { return Optional.empty(); } List<Router> routers = new ArrayList<>(); for (URL url : urls) { if (EMPTY_PROTOCOL.equals(url.getProtocol())) { continue; } String routerType = url.getParameter(ROUTER_KEY); if (routerType != null && routerType.length() > 0) { url = url.setProtocol(routerType); } try { // 根据路由策略走SPI获取走不同策略的路由生成 Router router = ROUTER_FACTORY.getRouter(url); if (!routers.contains(router)) { routers.add(router); } } catch (Throwable t) { logger.error("convert router url to router error, url: " + url, t); } } return Optional.of(routers); }}- dubbo的consumer在引用provider对应服务的时候会监听服务对应注册中心上的configurators、providers、routers三个目录。
- 任意一个目录发生变更的时候都会进入notify()阶段,routers的变更就在这个时候会发生。
- toRouters(routerURLs).ifPresent(this::addRouters)将路由的URL转换为Router对象并添加到Routers。
- toRouters()方法里url.setProtocol(routerType)将routerType作为protocol字段注入到URL当中,routerType字段对应路由当中的router字段(如router=condition)。
file=com.alibaba.dubbo.rpc.cluster.router.file.FileRouterFactory =com.alibaba.dubbo.rpc.cluster.router. . RouterFactorycondition=com.alibaba.dubbo.rpc.cluster.router.condition.ConditionRouterFactory- RouterFactory在在com.alibaba.dubbo.rpc.cluster.RouterFactory中定义。
public class RouterFactory$Adaptiveimplements RouterFactory { public Router getRouter(URL uRL) { if (uRL == null) { throw new IllegalArgumentException("url == null"); } URL uRL2 = uRL; String string = uRL2.getProtocol(); if (string == null) { throw new IllegalStateException(new StringBuffer().append("Failed to get extension (org.apache.dubbo.rpc.cluster.RouterFactory) name from url (").append(uRL2.toString()).append(") use keys([protocol])").toString()); } RouterFactory routerFactory = (RouterFactory)Extensi er.getExtensi er(RouterFactory.class).getExtension(string); return routerFactory.getRouter(uRL); }}route://0.0.0.0/org.apache.dubbo.demo.DemoService?category=routers&compatible_config=true&dynamic=false&enabled=true&force=true&name=null&priority=0&router=condition&rule= => host != 172.22.3.91&runtime=false&version=20880- ROUTER_FACTORY.getRouter(url)根据url中参数选择具体的RouterFactory。
- url.setProtocol(routerType)将routerType作为protocol字段注入到URL当中,这里routerType为router=condition。
- ROUTER_FACTORY对应的为ConditionRouterFactory。
- 通过routerFactory.getRouter(uRL)生成路由信息。
路由添加流程
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {}public abstract class AbstractDirectory<T> implements Directory<T> { protected RouterChain<T> routerChain; protected void addRouters(List<Router> routers) { routers = routers == null ? Collections.emptyList() : routers; // 添加路由到路由联调 routerChain.addRouters(routers); }}public class RouterChain<T> { public void addRouters(List<Router> routers) { List<Router> newRouters = new ArrayList<>(); // 添加内置路由 newRouters.addAll(builtinRouters); // 添加动态配置路由 newRouters.addAll(routers); // 按照路由优先级排序 CollectionUtils.sort(newRouters); // 最终生成的有序路由规则 this.routers = newRouters; }}public interface Router extends Comparable<Router> { int DEFAULT_PRIORITY = Integer.MAX_VALUE; @Override default int compareTo(Router o) { if (o == null) { throw new IllegalArgumentException(); } return Integer.compare(this.getPriority(), o.getPriority()); }}- RegistryDirectory的notify()方法内部通过使用下面方法生成路由并添加路由toRouters(routerURLs).ifPresent(this::addRouters)。
- addRouters的实现在RegistryDirectory的父类AbstractDirectory当中。
- routerChain.addRouters(routers)内部通过添加内置路由规则、动态配置路由规则、排序整体路由规则,最终生成最终路由规则。
- 路由规则的比较顺序是按照优先级字段Priority来排序。
路由选择过程
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener { private void refreshInvoker(List<URL> invokerUrls) { Assert.notNull(invokerUrls, "invokerUrls should not be null"); if (invokerUrls.size() == 1 && invokerUrls.get(0) != null && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) { this.forbidden = true; // Forbid to access this.invokers = Collections.emptyList(); // 注入provider的invokers routerChain.setInvokers(this.invokers); destroyAllInvokers(); // Close all invokers } else { // 省略无关代码 // 注入provider的invokers routerChain.setInvokers(newInvokers); this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers; this.urlInvokerMap = newUrlInvokerMap; try { destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker } catch (Exception e) { logger.warn("destroyUnusedInvokers error. ", e); } } }}- provider侧变更会触发更新routerChain的invoker(服务提供者)。
- routerChain.setInvokers(this.invokers)负责更新routerChain的服务提供者。
- routerChain包含最新invokers,每次进行路由选择的时候直接访问routerChain内部的invoker即可,起到缓存作用。
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {}public abstract class AbstractClusterInvoker<T> implements Invoker<T> { public Result invoke(final Invocation invocation) throws RpcException { checkWhetherDestroyed(); // binding attachments into invocation. Map<String, String> contextAttachments = RpcContext.getContext().getAttachments(); if (contextAttachments != null && contextAttachments.size() != 0) { ((RpcInvocation) invocation).addAttachments(contextAttachments); } // 获取执行的invoker列表 List<Invoker<T>> invokers = list(invocation); LoadBalance loadbalance = initLoadBalance(invokers, invocation); RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); } protected List<Invoker<T>> list(Invocation invocation) throws RpcException { return directory.list(invocation); }}- 在cluster模式下以FailoverClusterInvoker为例,每次执行invoke()会最终执行directory.list()获取invokers。
public abstract class AbstractDirectory<T> implements Directory<T> { public List<Invoker<T>> list(Invocation invocation) throws RpcException { return doList(invocation); }}public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener { public List<Invoker<T>> doList(Invocation invocation) { // 省略相关代码 List<Invoker<T>> invokers = null; try { // Get invokers from cache, only runtime routers will be executed. // 通过路由链去获取符合路由规则的invoker对象 invokers = routerChain.route(getConsumerUrl(), invocation); } catch (Throwable t) { } return invokers == null ? Collections.emptyList() : invokers; }}- directory.list()最终会执行routerChain.route()进行路由选择。
public class RouterChain<T> { private List<Invoker<T>> invokers = Collections.emptyList(); public List<Invoker<T>> route(URL url, Invocation invocation) { List<Invoker<T>> finalInvokers = invokers; for (Router router : routers) { finalInvokers = router.route(finalInvokers, url, invocation); } return finalInvokers; }}- routerChain.route()会针对invokers遍历所有的路由策略并返回符合要求的finalInvokers。
- routerChain本身维护了某服务下的所有invokers,通过路由过滤找到符合要求的finalInvokers返回。
继续阅读与本文标签相同的文章
-
带你读《深入理解AutoML和AutoDL:构建自动化机器 学习与深度学习平台》之一:人工智能概述
2026-05-16栏目: 教程
-
带你读《深入理解AutoML和AutoDL:构建自动化机器 学习与深度学习平台》之三:机器学习概述
2026-05-16栏目: 教程
-
带你读《深入理解AutoML和AutoDL:构建自动化机器 学习与深度学习平台》之二:自动化人工智能
2026-05-16栏目: 教程
-
深度融合,POLARDB与SuperMap联合构建首个云原生时空平台
2026-05-16栏目: 教程
-
带你读《Java图像处理:基于OpenCV与JVM》之一:基于JavaVM的OpenCV
2026-05-16栏目: 教程
