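Deep Dive

This section walks through the synchronous and asynchronous communication modes that we implement on top of Netty and gRPC. The code for each mode is shown below.

Synchronous Calls

Synchronous calls have the following advantages:

  1. Simple and intuitive: calls execute sequentially, so the code is relatively easy to write and to follow.
  2. Convenient error handling: failures can be caught and handled with the ordinary exception mechanism.

Synchronous calls have the following drawback:

  1. Blocking wait: once a call is issued, the calling thread waits until the result returns and cannot do any other work in the meantime, which can reduce performance.

When to use synchronous calls: they suit calls that are infrequent and short-running.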
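The trade-off above can be made concrete with a minimal, JDK-only sketch. `blockingCall` is a hypothetical stand-in for a remote invocation (it simply sleeps for 50 ms) and is not part of the project. The point is only that each call holds the calling thread until it returns, and that failures arrive as ordinary exceptions at the call site.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SyncCallDemo {

    /** Hypothetical stand-in for a blocking remote call; not part of the project. */
    static String blockingCall(String message) {
        if (message.isEmpty()) {
            throw new IllegalArgumentException("empty message");
        }
        try {
            Thread.sleep(50); // simulate network latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return "echo:" + message;
    }

    /** Calls run strictly one after another; the caller waits for each of them. */
    static List<String> callAll(List<String> messages) {
        List<String> results = new ArrayList<>();
        for (String message : messages) {
            try {
                results.add(blockingCall(message));
            } catch (IllegalArgumentException e) {
                // Errors surface as plain exceptions and are handled in place.
                results.add("error:" + e.getMessage());
            }
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(callAll(Arrays.asList("a", "", "b")));
    }
}
```

The calling thread is blocked inside `callAll` until every call has returned, which is acceptable for occasional short calls but becomes costly when calls are frequent or slow.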

Netty-Based Implementation

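import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
// Project classes (RpcClient, NettyClient, Message, JSON, ...) are imported from the project's own packages.

@Slf4j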
public class NettyRemotingInvoker implements RemotingInvoker {

    private final RpcClient rpcClient;
    private final RequestInterceptor requestInterceptor;
    private final RequestIdGenerator requestIdGenerator;

    public NettyRemotingInvoker(RpcClient rpcClient, RequestInterceptor requestInterceptor, RequestIdGenerator requestIdGenerator) {
        this.rpcClient = rpcClient;
        this.requestInterceptor = requestInterceptor;
        this.requestIdGenerator = requestIdGenerator;
    }

    @Override
    public MessageResponseBody invoke(Message message, ServerInformation serverInformation) throws RpcException {
        Channel channel = NettyClientChannelManager.establishChannel((NettyClient) rpcClient, serverInformation);
        String serverId = serverInformation.getServerId();
        // The request ID correlates the response read from the channel with this call.
        final String requestId = requestIdGenerator.generate();
        MessageRequestBody requestBody = new MessageRequestBody().setServerId(serverId).setMessage(message).setRequestId(requestId);
        String requestJsonBody = JSON.toJSON(requestBody);
        MessageRequest messageRequest = MessageRequest.newBuilder().setBody(requestJsonBody).build();
        requestInterceptor.intercept(requestBody);
        // Register the pending future before writing, so that a fast response cannot be missed.
        CompletableFuture<MessageResponseBody> completableFuture = new CompletableFuture<>();
        NettyUnprocessedRequests.put(requestId, completableFuture);
        try {
            channel.writeAndFlush(messageRequest).addListener((ChannelFutureListener) future -> {
                if (!future.isSuccess()) {
                    // A failed write will never produce a response, so fail the future right away.
                    completableFuture.completeExceptionally(future.cause());
                }
            });
            // Block the calling thread until the future is completed with the server's response.
            return completableFuture.get();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so callers can still observe the interruption.
                Thread.currentThread().interrupt();
            }
            handlerException(serverId, channel, e);
            String msg = String.format("To the Server: %s, exception when sending a message, caused by: %s", serverId, e.getMessage());
            throw new RemoteInvokeException(serverId, msg);
        }
    }

    // ~

    private void handlerException(String serverId, Channel channel, Exception e){
        // Passing the exception as the last argument keeps the stack trace in the log.
        log.error("To the server {}, an exception occurred", serverId, e);
        if (!channel.isActive() && !channel.isOpen() && !channel.isWritable()) {
            channel.close();
            NettyClientChannelManager.removeChannel(serverId);
            log.error("The Server is unavailable, shutdown channel and the cached channel is deleted.");
        }
    }
}
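The invoker above parks a `CompletableFuture` in `NettyUnprocessedRequests` under the request ID, but that class is not shown in this section. The following is a minimal sketch of how such a registry could work, using only the JDK. The class name `PendingRequests` and its methods `register`, `complete`, and `size` are hypothetical, and the project's actual implementation may differ.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry of in-flight requests; the project's NettyUnprocessedRequests may differ. */
final class PendingRequests<T> {

    private final Map<String, CompletableFuture<T>> pending = new ConcurrentHashMap<>();

    /** Registers a future for the request ID; call this before writing the request. */
    CompletableFuture<T> register(String requestId) {
        CompletableFuture<T> future = new CompletableFuture<>();
        pending.put(requestId, future);
        // Drop the entry however the future ends, so failed calls do not leak.
        future.whenComplete((response, cause) -> pending.remove(requestId, future));
        return future;
    }

    /** Called when a response arrives; returns false if nobody is waiting for this ID. */
    boolean complete(String requestId, T response) {
        CompletableFuture<T> future = pending.remove(requestId);
        return future != null && future.complete(response);
    }

    /** Number of requests still waiting for a response. */
    int size() {
        return pending.size();
    }
}
```

In this sketch the sender calls `register` and waits on the returned future, while the inbound handler calls `complete` with the request ID carried in the response.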

gRPC-Based Implementation

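import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import lombok.extern.slf4j.Slf4j;
// Project and generated classes (RpcClient, GrpcClient, MessageServiceGrpc, JSON, ...) are imported from the project's own packages.

@Slf4j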
public class GrpcRemotingInvoker implements RemotingInvoker {

    private final RpcClient rpcClient;
    private final RequestInterceptor requestInterceptor;
    private final RequestIdGenerator requestIdGenerator;

    public GrpcRemotingInvoker(RpcClient rpcClient, RequestInterceptor requestInterceptor, RequestIdGenerator requestIdGenerator) {
        this.rpcClient = rpcClient;
        this.requestInterceptor = requestInterceptor;
        this.requestIdGenerator = requestIdGenerator;
    }

    @Override
    public MessageResponseBody invoke(Message message, ServerInformation serverInformation) throws RpcException {
        String serverId = serverInformation.getServerId();
        ManagedChannel channel = GrpcClientChannelManager.establishChannel((GrpcClient) rpcClient, serverInformation);
        MessageServiceGrpc.MessageServiceBlockingStub messageClientStub = MessageServiceGrpc.newBlockingStub(channel);
        final String random = requestIdGenerator.generate();
        MessageRequestBody requestBody = new MessageRequestBody().setServerId(serverId).setMessage(message).setRequestId(random);
        MessageRequest messageRequest = MessageRequest.newBuilder().setBody(JSON.toJSON(requestBody)).build();
        requestInterceptor.intercept(requestBody);

        try {
            MessageResponse response = messageClientStub.messageProcessing(messageRequest);
            return JSON.parse(response.getBody(), MessageResponseBody.class);
        } catch (StatusRuntimeException e) {
            Status.Code code = e.getStatus().getCode();
            handlerException(serverId, channel, code);
            throw new RemoteInvokeException(serverId, e.getMessage());
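        } catch (Exception e) {
            // Shut the channel down and evict it, so that the next call does not reuse a terminated channel.
            channel.shutdown();
            GrpcClientChannelManager.removeChannel(serverId);
            String msg = String.format("To the Server: %s, exception when sending a message, caused by: %s", serverId, e.getMessage());
            throw new RemoteInvokeException(serverId, msg);
        }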
    }

    // ~

    private void handlerException(String serverId, ManagedChannel channel, Status.Code code){
        log.error("To the Server: {}, exception when sending a message, Status Code: {}", serverId, code);
        if (Status.Code.UNAVAILABLE == code) {
            channel.shutdown();
            GrpcClientChannelManager.removeChannel(serverId);
            log.error("The Server is unavailable, shutdown channel and the cached channel is deleted.");
        }
    }
}
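Both invokers obtain their channel through a channel manager (`establishChannel`) and evict it with `removeChannel` when the server is unavailable. The managers are not shown here, so the following is only a sketch of that caching idea under stated assumptions: `ChannelCache` and its methods are hypothetical names, and the type parameter `V` stands in for a Netty `Channel` or a gRPC `ManagedChannel`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical channel cache keyed by server ID; not the project's actual manager. */
final class ChannelCache<V> {

    private final Map<String, V> channels = new ConcurrentHashMap<>();
    private final Function<String, V> connector;

    ChannelCache(Function<String, V> connector) {
        this.connector = connector;
    }

    /** Reuses the cached channel, or connects once for a server ID that is not cached yet. */
    V establish(String serverId) {
        return channels.computeIfAbsent(serverId, connector);
    }

    /** Evicts the channel so that the next establish() call reconnects. */
    V remove(String serverId) {
        return channels.remove(serverId);
    }
}
```

Evicting on failure matters because a channel that has been shut down cannot serve further calls; leaving it in the cache would make every later call to that server fail.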

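Asynchronous Calls

Asynchronous calls have the following advantages:

  1. Non-blocking concurrency: the caller can issue several calls without waiting for each result, which improves concurrent performance.

  2. Short response time: the caller keeps processing other work while the call is in flight and the main thread is not blocked, so it responds sooner.

Asynchronous calls have the following drawback:

  1. Handling the result is more complex when the caller needs it; the logic may have to be restructured as a notification (callback).

When to use asynchronous calls: they suit scenarios that handle many tasks at once and need high concurrent performance, where the result is not needed immediately and can be processed in the background.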
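As a minimal illustration of the concurrency benefit, the JDK-only sketch below starts several calls before waiting for any of them. `asyncCall` is a hypothetical stand-in for a remote invocation (it sleeps for 50 ms on a worker thread) and is not part of the project.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class AsyncCallDemo {

    /** Hypothetical stand-in for an asynchronous remote call; not part of the project. */
    static CompletableFuture<String> asyncCall(String message, ExecutorService executor) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(50); // simulate network latency
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
            return "echo:" + message;
        }, executor);
    }

    /** Starts every call first, then collects the results in request order. */
    static List<String> callAll(List<String> messages) {
        ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, messages.size()));
        try {
            // Collecting to a list forces all calls to be submitted before any join.
            List<CompletableFuture<String>> futures = messages.stream()
                    .map(message -> asyncCall(message, executor))
                    .collect(Collectors.toList());
            // Joining here is only for the demo; real callers would react in a callback.
            return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(callAll(Arrays.asList("a", "b", "c")));
    }
}
```

Because the calls overlap, the total wait is governed by the slowest call rather than by the sum of all calls, at the price of having to collect or react to the results separately.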

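Netty-Based Implementation

When sending an asynchronous request with Netty, the key question is how to obtain the result once channelRead0 receives the server's response after writeAndFlush.

There are two ways to do this:

  1. Through channel.attr; the code can be found in the project's commit history.

  2. Through CompletableFuture; the code is shown below, and the complete version is in the source.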

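import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
// Project classes (RpcClient, NettyClient, Message, CallCallback, JSON, ...) are imported from the project's own packages.

@Slf4j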
public class NettyRemotingInvoker implements RemotingInvoker {

    private final RpcClient rpcClient;
    private final RequestInterceptor requestInterceptor;
    private final RequestIdGenerator requestIdGenerator;

    public NettyRemotingInvoker(RpcClient rpcClient, RequestInterceptor requestInterceptor, RequestIdGenerator requestIdGenerator) {
        this.rpcClient = rpcClient;
        this.requestInterceptor = requestInterceptor;
        this.requestIdGenerator = requestIdGenerator;
    }

    //~

    @Override
    public void invokeAsync(Message message, ServerInformation serverInformation, CallCallback callback) throws RpcException {
        Channel channel = NettyClientChannelManager.establishChannel((NettyClient) rpcClient, serverInformation);
        String serverId = serverInformation.getServerId();
        final String requestId = requestIdGenerator.generate();
        MessageRequestBody requestBody = new MessageRequestBody().setServerId(serverId).setMessage(message).setRequestId(requestId);
        MessageRequest messageRequest = MessageRequest.newBuilder().setBody(JSON.toJSON(requestBody)).build();
        requestInterceptor.intercept(requestBody);
        CompletableFuture<MessageResponseBody> completableFuture = new CompletableFuture<>();
        // Attach the callback before writing, so that both a server response and a failed write are handled.
        completableFuture.whenComplete((responseBody, throwable) -> {
            if (throwable == null) {
                callback.onCompleted(responseBody);
            } else {
                // An exception thrown here would be swallowed by the future, so the failure is logged instead.
                log.error("To the Server: {}, the asynchronous call failed", serverId, throwable);
            }
        });
        NettyUnprocessedRequests.put(requestId, completableFuture);
        try {
            channel.writeAndFlush(messageRequest).addListener((ChannelFutureListener) channelFuture -> {
                if (!channelFuture.isSuccess()) {
                    // A failed write will never produce a response, so fail the future right away.
                    completableFuture.completeExceptionally(channelFuture.cause());
                }
            });
        } catch (Exception e) {
            handlerException(serverId, channel, e);
            String msg = String.format("To the Server: %s, exception when sending a message, caused by: %s", serverId, e.getMessage());
            throw new RemoteInvokeException(serverId, msg);
        }
    }

    private void handlerException(String serverId, Channel channel, Exception e){
        // Passing the exception as the last argument keeps the stack trace in the log.
        log.error("To the server {}, an exception occurred", serverId, e);
        if (!channel.isActive() && !channel.isOpen() && !channel.isWritable()) {
            channel.close();
            NettyClientChannelManager.removeChannel(serverId);
            log.error("The Server is unavailable, shutdown channel and the cached channel is deleted.");
        }
    }
}
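The excerpt above only shows `CallCallback.onCompleted`, so a failed call has no way to reach the caller. One possible design, sketched here with JDK classes only, is a callback that also has an error method and is attached to the future before the request is sent. The `Callback` interface, its `onError` method, and the `bridge` helper are hypothetical and are not part of the project's API.

```java
import java.util.concurrent.CompletableFuture;

class CallbackBridge {

    /** Hypothetical callback; the project's CallCallback is only known to expose onCompleted. */
    interface Callback<T> {
        void onCompleted(T response);

        void onError(Throwable cause);
    }

    /**
     * Creates the future for one request and wires the callback to it up front,
     * so that neither a response nor a failure can be missed.
     */
    static <T> CompletableFuture<T> bridge(Callback<T> callback) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.whenComplete((response, cause) -> {
            if (cause == null) {
                callback.onCompleted(response);
            } else {
                callback.onError(cause);
            }
        });
        return future;
    }
}
```

With this shape, the write listener only needs to call `completeExceptionally` on a failed write, and the response handler only needs to call `complete`; the callback is notified in both cases.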

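gRPC-Based Implementation

gRPC provides three types of client stubs: newBlockingStub, newStub, and newFutureStub.

newBlockingStub: a blocking client; the call returns only after the response has arrived.

newStub: a purely asynchronous client; responses are delivered through a StreamObserver.

newFutureStub: also an asynchronous client, which returns a ListenableFuture. It is listed last because it also has the properties of a synchronous client: in practice it can be used either as an asynchronous client or as a synchronous one.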
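The dual nature of a future-based client can be illustrated without gRPC. The sketch below uses the JDK's `CompletableFuture` as a stand-in for the future returned by a future stub; it is an analogy, not gRPC code, and `futureCall` is a hypothetical placeholder for the remote call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class FutureStyleDemo {

    /** Hypothetical stand-in for a call made through a future-based stub. */
    static CompletableFuture<String> futureCall(String message) {
        return CompletableFuture.supplyAsync(() -> "echo:" + message);
    }

    /** Synchronous use: block on the future (here with a timeout) and return the result. */
    static String useSynchronously(String message) {
        try {
            return futureCall(message).get(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (Exception e) {
            throw new IllegalStateException("call failed", e);
        }
    }

    /** Asynchronous use: register a consumer and return immediately. */
    static CompletableFuture<Void> useAsynchronously(String message, Consumer<String> onResult) {
        return futureCall(message).thenAccept(onResult);
    }
}
```

The same returned future supports both styles; which one to use depends on whether the caller needs the result before it can continue.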

We use newStub here. The code is shown below; see the source for the complete version.

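import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
// Project and generated classes (RpcClient, GrpcClient, MessageServiceGrpc, CallCallback, JSON, ...) are imported from the project's own packages.

@Slf4j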
public class GrpcRemotingInvoker implements RemotingInvoker {

    private final RpcClient rpcClient;
    private final RequestInterceptor requestInterceptor;
    private final RequestIdGenerator requestIdGenerator;

    public GrpcRemotingInvoker(RpcClient rpcClient, RequestInterceptor requestInterceptor, RequestIdGenerator requestIdGenerator) {
        this.rpcClient = rpcClient;
        this.requestInterceptor = requestInterceptor;
        this.requestIdGenerator = requestIdGenerator;
    }

    //~

    @Override
    public void invokeAsync(Message message, ServerInformation serverInformation, CallCallback callback) throws RpcException {
        String serverId = serverInformation.getServerId();
        ManagedChannel channel = GrpcClientChannelManager.establishChannel((GrpcClient) rpcClient, serverInformation);
        MessageServiceGrpc.MessageServiceStub messageServiceStub = MessageServiceGrpc.newStub(channel);
        final String random = requestIdGenerator.generate();
        MessageRequestBody requestBody = new MessageRequestBody().setServerId(serverId).setMessage(message).setRequestId(random);
        MessageRequest messageRequest = MessageRequest.newBuilder().setBody(JSON.toJSON(requestBody)).build();
        requestInterceptor.intercept(requestBody);

        try {
            messageServiceStub.messageProcessing(messageRequest, new StreamObserver<MessageResponse>() {
                @Override
                public void onNext(MessageResponse response) {
                    MessageResponseBody responseBody = JSON.parse(response.getBody(), MessageResponseBody.class);
                    callback.onCompleted(responseBody);
                }

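                @Override
                public void onError(Throwable throwable) {
                    // This runs on a gRPC callback thread: an exception thrown here cannot reach the caller of
                    // invokeAsync, so the failure is logged and the channel is evicted if the server is unavailable.
                    log.error(throwable.getMessage(), throwable);
                    handlerException(serverId, channel, Status.fromThrowable(throwable).getCode());
                }

                @Override
                public void onCompleted() {
                    // Nothing to do: for this unary call the response has already been delivered to onNext.
                }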
            });
        } catch (StatusRuntimeException e) {
            Status.Code code = e.getStatus().getCode();
            handlerException(serverId, channel, code);
            throw new RemoteInvokeException(serverId, e.getMessage());
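        } catch (Exception e) {
            // Shut the channel down and evict it, so that the next call does not reuse a terminated channel.
            channel.shutdown();
            GrpcClientChannelManager.removeChannel(serverId);
            String msg = String.format("To the Server: %s, exception when sending a message, caused by: %s", serverId, e.getMessage());
            throw new RemoteInvokeException(serverId, msg);
        }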
    }

    private void handlerException(String serverId, ManagedChannel channel, Status.Code code){
        log.error("To the Server: {}, exception when sending a message, Status Code: {}", serverId, code);
        if (Status.Code.UNAVAILABLE == code) {
            channel.shutdown();
            GrpcClientChannelManager.removeChannel(serverId);
            log.error("The Server is unavailable, shutdown channel and the cached channel is deleted.");
        }
    }
}

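Notes

  1. In GrpcRemotingInvoker, the onError callback passed to the asynchronous messageProcessing call is triggered in the following cases:
  • The client hits a network error or another unrecoverable error while sending the request.

  • The server encounters an error or exception while processing the request.

  • The server returns an error response with a non-OK status code.

  • An exception is thrown inside onNext. The exception is caught and passed to onError, so an exception in onNext during an asynchronous request results in a call to onError rather than in another call to onNext.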