Dubbo 中的集群容错

Dubbo 中的集群容错

前言

在微服务架构中,服务间的依赖关系复杂且动态,任何一个服务的故障都可能引发连锁反应,导致系统雪崩。一个好的容错设计可以避免这些问题发生:

  • 服务雪崩效应:单个服务崩溃或响应延迟可能导致调用链上的所有服务被阻塞,最终拖垮整个系统。例如,若服务 A 依赖服务 B,而服务 B 因高负载无法响应,A 的线程池可能被占满,进而影响其他依赖A的服务;

  • 分布式系统的脆弱性:网络抖动、节点宕机、资源耗尽等问题在分布式环境中不可避免。容错机制通过冗余和快速失败策略,确保部分故障不会扩散到整个系统;

  • 服务的可用性低:微服务的目标是提升系统可用性,而容错设计(如故障转移、熔断)是保障服务持续可用的核心手段。例如,通过自动切换健康节点,避免单点故障。

Dubbo 的集群容错机制

在 Dubbo 中,多个 Provider 实例构成一个「集群」。消费者调用时,Dubbo 通过 Cluster 模块实现容错策略的封装和路由,Cluster 模块会根据配置(如 cluster=failover)装配不同的容错策略实现类,对 Directory 中的多个 Invoker 进行处理,返回一个可执行的 Invoker。Dubbo 当前已支持以下 6 种容错策略(在 org.apache.dubbo.rpc.cluster.support 包下):

策略简称 实现类名 特性 使用场景
Failover FailoverClusterInvoker 失败自动重试,默认实现 网络不稳定,民登操作
Failfast FailfastClusterInvoker 快速失败,不重试 响应时间敏感,非幂等
Failsafe FailsafeClusterInvoker 失败忽略异常 日志记录、监控等非主要场景
Failback FailbackClusterInvoker 失败后后台重试 可容忍失败,后续补偿重试
Forking ForkingClusterInvoker 并行调用多个节点,最快成功返回 实时性要求高,资源充足
Broadcast BroadcastClusterInvoker 广播方式调用所有服务提供着 配置更新、通知类等操作

Failover Cluster(失败自动切换,默认策略)

实现原理:通过循环重试实现容错。
实现源码关键点:

  1. FailoverClusterInvoker 的 doInvoke 方法中,通过 for 循环控制重试次数(默认重试 2 次,共调用 3 次);
  2. 每次重试前调用 list(invocation) 重新获取最新的 Invoker 列表,确保动态感知节点变化。
// 代码片段:org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker#doInvoke for (int i = 0; i < len; i++) {     if (i > 0) {         copyInvokers = list(invocation); // 动态刷新 Invoker 列表     }     Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);     // 调用并处理异常... } 

Failfast Cluster(快速失败)

实现原理:仅发起一次调用,异常直接抛出。
实现源码关键点:

  1. FailfastClusterInvoker 直接调用目标 Invoker,不进行重试。
// 代码片段:org.apache.dubbo.rpc.cluster.support.FailfastClusterInvoker#doInvoke fpublic Result doInvoke(...) throws RpcException {     checkInvokers(invokers, invocation);     Invoker<T> invoker = select(loadbalance, invocation, invokers, null);     return invoker.invoke(invocation); // 仅一次调用 } 

Failsafe Cluster(失败安全)

实现原理:异常被捕获后返回空结果,不中断流程。
实现源码关键点:

  1. ailsafeClusterInvoker通过try-catch捕获异常并记录日志。
// 代码片段:org.apache.dubbo.rpc.cluster.support.FailsafeClusterInvoker try {     // 调用逻辑... } catch (Throwable e) {     logger.error("Failsafe ignore exception", e);     return new RpcResult(); // 返回空结果 } 

Failback Cluster(失败自动恢复)

实现原理:失败请求存入队列,定时重试。
实现源码关键点:

  1. 捕获失败异常,使用 RetryTimerTask 存储失败请求,定时触发重试。
// 代码片段:org.apache.dubbo.rpc.cluster.support.FailbackClusterInvoker#doInvoke private void addFailed(         LoadBalance loadbalance,         Invocation invocation,         List<Invoker<T>> invokers,         Invoker<T> lastInvoker,         URL consumerUrl) {     if (failTimer == null) {         synchronized (this) {             if (failTimer == null) {                 failTimer = new HashedWheelTimer(                         new NamedThreadFactory("failback-cluster-timer", true),                         1,                         TimeUnit.SECONDS,                         32,                         failbackTasks);             }         }     }     RetryTimerTask retryTimerTask = new RetryTimerTask(             loadbalance, invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD, consumerUrl);     try {         failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD, TimeUnit.SECONDS);     } catch (Throwable e) {         logger.error(                 CLUSTER_TIMER_RETRY_FAILED,                 "add newTimeout exception",                 "",                 "Failback background works error, invocation->" + invocation + ", exception: " + e.getMessage(),                 e);     } } 

Forking Cluster(并行调用)

实现原理:并发调用多个节点,首个成功结果即返回。
实现源码关键点:

  1. 使用线程池并发调用,结果通过 BlockingQueue 异步接收。
// 代码片段:org.apache.dubbo.rpc.cluster.support.ForkingClusterInvoker#doInvoke for (Invoker<T> invoker : selected) {     executor.execute(() -> {         Result result = invoker.invoke(invocation);         ref.offer(result); // 结果存入队列     }); } 

Broadcast Cluster(广播调用)

实现原理:逐个调用所有节点,任一失败则整体失败。
实现源码关键点:

  1. 遍历所有 Invoker 调用,异常累积后抛出。
// 代码片段:org.apache.dubbo.rpc.cluster.support.BroadcastClusterInvoker#doInvoke for (Invoker<T> invoker : invokers) {     try {         invoker.invoke(invocation);     } catch (RpcException e) {         exception = e;     } } if (exception != null) throw exception; 

如何自定义集群容错策略

如果以上提供的容错策略不满足需求,Dubbo 支持通过 SPI 自定义 Cluster 实现,步骤如下:

第一步:实现 Cluster 和 AbstractClusterInvoker
@SPI("custom") public class MyCluster implements Cluster {      @Override     public <T> Invoker<T> join(Directory<T> directory) {         return new MyClusterInvoker<>(directory);     }  } 
public class MyClusterInvoker<T> extends AbstractClusterInvoker<T> {      @Override     protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) {         // 自定义逻辑,例如条件重试、动态路由等     }  } 
第二步:添加 SPI 配置

META-INF/dubbo/org.apache.dubbo.rpc.cluster.Cluster 中添加配置:

mycluster=com.example.MyCluster 
第三步:配置使用自定义容错策略
<dubbo:reference cluster="mycluster" /> 

总结

建议核心服务优先使用 Failover(失败自动切换) 策略保障可用性,非核心服务可降级为 Failsafe(失败安全)。同时结合 Hystrix(已停止更新)Sentinel 实现熔断与限流,增强容错能力。

通过灵活组合 Dubbo 的容错策略,可显著提升分布式系统的鲁棒性。实际应用配置时需要根据业务特性权衡延迟、资源开销与一致性要求,一切皆是 trade off ~

P.S. 不妨再深入思考一下:Dubbo 的集群容错实现中有哪些优秀设计值得我们学习?

发表评论

您必须 [ 登录 ] 才能发表留言!

相关文章