首先看来自官方文档
-
这里的Invoker是Provider的一个可调用Service的抽象,Invoker封装了Provider地址及Service接口信息。
-
Directory代表多个Invoker,可以把它看成List<Invoker>,但与List不同的是,它的值可能是动态变化的,比如注册中心推送变更。
-
Cluster将Directory中的多个Invoker伪装成一个Invoker,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个。
-
Router负责从多个Invoker中按路由规则选出子集,比如读写分离,应用隔离等。
-
LoadBalance负责从多个Invoker中选出具体的一个用于本次调用,选的过程包含了负载均衡算法,调用失败后,需要重选。
以前自己看到上面的文字,理解不太深。随着对分布式系统的深入接触,以及DUBBO源码的研究,才有了更精确的理解。总结一句话:阅读技术类文档时,对一些语言的理解,受到阅读者自身情况的限制,往往理解有深有浅。闲话少说。本文,就是针对图中的组件,一个个进行剖析。
1.Invoker
Invoker 是Provider的一个可调用Service的抽象,Invoker封装了Provider地址及Service接口信息。
1.1 API
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | public interface Invoker<T> extends Node { /** * get service interface. * @return service interface. */ Class<T> getInterface(); /** * invoke. * @param invocation * @return result * @throws RpcException */ Result invoke(Invocation invocation) throws RpcException; } |
1.2 层次树结构
分析可知:主要有2个分支
1.AbstractInvoker:主要具体远程实现,和RPC 协议有关,属于dubbo-rpc-api范畴。
2.AbstractClusterInvoker:主要逻辑包括Invoker的选择,和高可用有关,属于dubbo-cluster范畴。
1.3 AbstractInvoker VS AbstractClusterInvoker
1.3.1 类图比较
通过下图可以很容易发现:AbstractClusterInvoker比较AbstractInvoker,多了些select,doselect,reselect方法。这些方法的作用也可以猜到。
1.3.2 以两个实现类对比
选择AbstractInvoker的子类DubboInvoker,
和AbstractClusterInvoker的子类FailoverClusterInvoker为代表。
其中FailoverClusterInvoker是dubbo集群容错默认方案,Dubbo协议为默认协议。
先看下AbstractInvoker分支
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 | public abstract class AbstractInvoker<T> implements Invoker<T> { public Result invoke(Invocation inv) throws RpcException { RpcInvocation invocation = (RpcInvocation) inv; invocation.setInvoker( this ); Map<String, String> context = RpcContext.getContext().getAttachments(); try { return doInvoke(invocation); } catch (InvocationTargetException e) { // biz exception } catch (RpcException e) { } catch (Throwable e) { } } protected abstract Result doInvoke(Invocation invocation) throws Throwable; .. } public class DubboInvoker<T> extends AbstractInvoker<T> { @Override protected Result doInvoke( final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); logger.debug( "doInvoke start -----------------------------" ); ExchangeClient currentClient; if (clients.length == 1 ) { currentClient = clients[ 0 ]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false ); currentClient.send(inv, isSent); RpcContext.getContext().setFuture( null ); return new RpcResult(); } else if (isAsync) { ResponseFuture future = currentClient.request(inv, timeout) ; RpcContext.getContext().setFuture( new FutureAdapter<Object>(future)); return new RpcResult(); } else { RpcContext.getContext().setFuture( null ); return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } finally { logger.debug( "doInvoke end-----------------------------" ); } } ... } |
这里可以很清晰地看到,远程调用分三种情况:
1. 不需要返回值的调用(所谓oneWay)
2. 异步(async)
3. 同步
对于第一种情况,客户端只管发请求就完了,不考虑返回结果。
对于第二种情况,客户端除了发请求,还需要将结果塞到一个ThreadLocal变量中,以便于客户端get返回值
对于第三种情况,客户端除了发请求,还会同步等待返回结果
再看下AbstractClusterInvoker分支
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 | public abstract class AbstractClusterInvoker<T> implements Invoker<T> { public Result invoke( final Invocation invocation) throws RpcException { checkWheatherDestoried(); LoadBalance loadbalance; List<Invoker<T>> invokers = list(invocation); if (invokers != null && invokers.size() > 0 ) { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance. class ).getExtension(invokers.get( 0 ).getUrl() .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); } else { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance. class ).getExtension(Constants.DEFAULT_LOADBALANCE); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); } protected List<Invoker<T>> list(Invocation invocation) throws RpcException { List<Invoker<T>> invokers = directory.list(invocation); return invokers; } protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException; .. } public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> { public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { List<Invoker<T>> copyinvokers = invokers; checkInvokers(copyinvokers, invocation); int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1 ; if (len <= 0 ) { len = 1 ; } // retry loop. RpcException le = null ; // last exception. List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers. Set<String> providers = new HashSet<String>(len); for ( int i = 0 ; i < len; i++) { //重试时,进行重新选择,避免重试时invoker列表已发生变化. //注意:如果列表发生了变化,那么invoked判断会失效,因为invoker示例已经改变 if (i > 0 ) { checkWheatherDestoried(); copyinvokers = list(invocation); //重新检查一下 checkInvokers(copyinvokers, invocation); } Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked); invoked.add(invoker); RpcContext.getContext().setInvokers((List)invoked); try { Result result = invoker.invoke(invocation); return result; } catch (RpcException e) { if (e.isBiz()) { // biz exception. throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); } } } ... } |
总结下AbstractClusterInvoker和AbstractClusterInvoker都需要实现Invoker接口,两者都声明了doInvoke抽象方法。但方法定义有区别。
1 2 3 4 | protected abstract Result doInvoke(Invocation invocation) throws Throwable; protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException; |
AbstractClusterInvoker 需要一个Invoker列表,他们来自Directory,LoadBalance 可以可以理解为负载均衡策略。
两者的联系:AbstractClusterInvoker 比AbstractInvoker多了一些选择和负载均衡部分,到最后还是会调用AbstractInvoker分支,负责具体RPC调用工作。
1.4 常见容错方案对比
Feature | 优点 | 缺点 | 实现类 |
Failover Cluster | 失败自动切换,当出现失败,重试其它服务器,通常用于读操作(推荐使用) | 重试会带来更长延迟 | FailoverClusterInvoker |
Failfast Cluster | 快速失败,只发起一次调用,失败立即报错,通常用于非幂等性的写操作 | 如果有机器正在重启,可能会出现调用失败 | FailfastClusterInvoker |
Failsafe Cluster | 失败安全,出现异常时,直接忽略,通常用于写入审计日志等操作 | 调用信息丢失 | FailsafeClusterInvoker |
Failback Cluster | 失败自动恢复,后台记录失败请求,定时重发,通常用于消息通知操作 | 不可靠,重启丢失 | FailbackClusterInvoker |
Forking Cluster | 并行调用多个服务器,只要一个成功即返回,通常用于实时性要求较高的读操作 | 需要浪费更多服务资源 | ForkingClusterInvoker |
Broadcast Cluster | 广播调用所有提供者,逐个调用,任意一台报错则报错,通常用于更新提供方本地状态 | 速度慢,任意一台报错则报错 | BroadcastClusterInvoker |
2. 总结
*ClusterInvoker 分别使用Router,和Directory获取Invoker列表。
*ClusterInvoker然后再Invoker列表中,借助LoadBalance提供的负载均衡策略,返回一个可用的Invoker.
*Directory代表多个Invoker,可以理解为Invoker的逻辑集合,负责Invoker的下线,上线。
Router和Directory两者都对我提供Invoker列表。通过提供的API,很容易找出区别来。
1 2 3 4 | //Directory List<Invoker<T>> list(Invocation invocation) throws RpcException; //Route <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException; |
Directory:返回的Invoker列表,代表当前正常对外提供服务的Invoker列表。重点在维护可用节点列表。
Route:返回的Invoker列表,是在现有可用的Invoker列表里,按照规则进行再筛选,重点在规则匹配,过滤等。