服务容错机制

负载均衡的作用是在服务器之间均衡地分配负载；而服务容错的作用是：当某台服务器出现故障或宕机时，

自动将流量或请求转发给其他正常运行的服务器，从而确保服务的连续性和可用性。

这种容错机制可以最大程度地减少单点故障对系统造成的影响，提高系统的容错能力和可用性。

对于服务容错机制，我们提供了两种模式：故障转移模式（Failover）和失败重试模式（Failback），默认使用故障转移模式。

故障转移模式

故障转移模式是指：先通过负载均衡选出一个节点发起请求；当该节点调用失败时，依次遍历集群中的其他节点，直到有一个节点能够正常处理请求为止。如果所有节点都调用失败，则抛出 FailoverException 异常。

@Slf4j
public class FailoverClusterInvoker extends AbstractClusterInvoker {

    public FailoverClusterInvoker(ServiceDiscovery serviceDiscovery, ClientConfiguration configuration, LoadBalance loadBalance, RemotingInvoker remotingInvoker) {
        super(serviceDiscovery, configuration, loadBalance, remotingInvoker);
    }

    @Override
    protected MessageResponseBody doInvoke(Message message, List<ServerInformation> servers) throws RpcException {
        ServerInformation serverInformation = super.select(message, servers);
        MessageResponseBody response;
        try {
            response = remotingInvoker.invoke(message, serverInformation);
        } catch (RpcException e){
            servers.remove(serverInformation);
            if (CollectionUtils.isEmpty(servers)){
                throw new FailoverException(serverInformation.getServerId(), e.getMessage());
            }
            response = invoke(message, servers);
        }
        return response;
    }

    @Override
    protected void doInvokeAsync(Message message, List<ServerInformation> servers, CallCallback callback) throws RpcException {
        ServerInformation serverInformation = super.select(message, servers);
        try {
            remotingInvoker.invokeAsync(message, serverInformation, callback);
        } catch (RpcException e){
            servers.remove(serverInformation);
            if (CollectionUtils.isEmpty(servers)){
                throw new FailoverException(serverInformation.getServerId(), e.getMessage());
            }
            invokeAsync(message, servers, callback);
        }
    }

    private MessageResponseBody invoke(Message message, List<ServerInformation> servers) throws RpcException{
        RpcException ex = null;
        MessageResponseBody response = null;
        for (ServerInformation serverInformation : servers) {
            try {
                response = remotingInvoker.invoke(message, serverInformation);
                break;
            }catch (RpcException e){
                ex = e;
            }
        }
        if (response == null && ex != null){
            throw new FailoverException(servers.get(servers.size() -1).getServerId(), ex.getMessage());
        }
        return response;
    }

    private void invokeAsync(Message message, List<ServerInformation> servers, CallCallback callback) throws RpcException{
        RpcException ex = null;
        for (ServerInformation serverInformation : servers) {
            try {
                remotingInvoker.invokeAsync(message, serverInformation, callback);
                return;
            }catch (RpcException e){
                ex = e;
            }
        }
        if (ex != null){
            throw new FailoverException(servers.get(servers.size() -1).getServerId(), ex.getMessage());
        }
    }
}

失败重试模式

失败重试模式是指：对负载均衡选出的同一个节点，在请求失败后按配置的时间间隔进行重试；如果重试达到配置的最大次数后仍然失败，则抛出 FailbackException 异常。

/**
 * Cluster invoker implementing the fail-back (retry) strategy.
 *
 * <p>A single server is chosen by the load balancer via {@code select}, and the call to it is
 * repeated until it succeeds. Between attempts the invoker waits
 * {@code configuration.getRetryIntervalMilliSeconds()} milliseconds (negative values are
 * treated as 0). After the initial attempt plus {@code configuration.getRetryTimes()} retries
 * have all failed, a {@link FailbackException} is thrown.
 *
 * <p>If the waiting thread is interrupted, the interrupt flag is restored and a
 * {@link FailbackException} is thrown instead of continuing to retry.
 */
@Slf4j
public class FailbackClusterInvoker extends AbstractClusterInvoker {

    public FailbackClusterInvoker(ServiceDiscovery serviceDiscovery, ClientConfiguration configuration, LoadBalance loadBalance, RemotingInvoker remotingInvoker) {
        super(serviceDiscovery, configuration, loadBalance, remotingInvoker);
    }

    /**
     * Synchronously invokes the selected server, retrying on {@link RpcException}.
     *
     * @param message the request to send
     * @param servers candidate servers passed to {@code select}
     * @return the response of the first successful attempt
     */
    @Override
    protected MessageResponseBody doInvoke(Message message, List<ServerInformation> servers) throws RpcException {
        ServerInformation serverInformation = super.select(message, servers);
        return invokeWithRetry(serverInformation, () -> remotingInvoker.invoke(message, serverInformation));
    }

    /**
     * Dispatches an asynchronous call to the selected server, retrying the dispatch if it
     * throws {@link RpcException}.
     *
     * @param message  the request to send
     * @param servers  candidate servers passed to {@code select}
     * @param callback callback handed to the remoting layer for the eventual result
     */
    @Override
    protected void doInvokeAsync(Message message, List<ServerInformation> servers, CallCallback callback) throws RpcException {
        ServerInformation serverInformation = super.select(message, servers);
        invokeWithRetry(serverInformation, () -> {
            remotingInvoker.invokeAsync(message, serverInformation, callback);
            return null;
        });
    }

    /** One remote attempt that may fail with an {@link RpcException}. */
    @FunctionalInterface
    private interface Attempt<T> {
        T run() throws RpcException;
    }

    /**
     * Runs {@code attempt} until it succeeds or the retry budget is exhausted.
     *
     * <p>Total attempts = 1 + {@code configuration.getRetryTimes()}. Each failure is logged;
     * once the budget is exceeded a {@link FailbackException} is thrown.
     *
     * @param serverInformation the target server (used for logging and the exception)
     * @param attempt           the call to perform
     * @return the value produced by the first successful attempt
     */
    private <T> T invokeWithRetry(ServerInformation serverInformation, Attempt<T> attempt) throws RpcException {
        int maxTimes = configuration.getRetryTimes();
        int failures = 0;
        while (true) {
            try {
                return attempt.run();
            } catch (RpcException e) {
                log.error("Invoke to server {} failed on attempt {}",
                        serverInformation.getServerId(), failures + 1, e);
            }
            failures++;
            if (failures > maxTimes) {
                throw new FailbackException(serverInformation.getServerId(),
                        "The number of invoke retries reaches the upper limit, " +
                        "the maximum number of times:" + maxTimes);
            }
            pauseBeforeRetry(serverInformation);
        }
    }

    /**
     * Sleeps for the configured retry interval.
     *
     * <p>On interruption, the interrupt flag is restored and a {@link FailbackException} is
     * thrown so that the caller stops retrying.
     */
    private void pauseBeforeRetry(ServerInformation serverInformation) throws RpcException {
        try {
            // Clamp to 0: Thread.sleep rejects negative values, which the previous code silently ignored.
            Thread.sleep(Math.max(0L, configuration.getRetryIntervalMilliSeconds()));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new FailbackException(serverInformation.getServerId(),
                    "Interrupted while waiting to retry the invocation");
        }
    }
}