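Deep Dive
This section covers the synchronous and asynchronous communication modes we implemented on top of Netty and gRPC, together with the code for each.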
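Synchronous Calls
Advantages of synchronous calls:
- Simple and intuitive: calls execute sequentially, so the code is relatively easy to write and understand.
- Convenient error handling: errors can be caught and handled with the ordinary exception mechanism.
Disadvantages of synchronous calls:
- Blocking: once a call is issued, the calling thread waits for the result and cannot do other work in the meantime, which can hurt performance.
Use cases: synchronous calls suit infrequent calls that finish quickly.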
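The trade-off can be seen in a minimal, JDK-only sketch. `SyncCallSketch` and `remoteCall` are hypothetical stand-ins for a remote invocation and are not part of the project; the point is only that the caller blocks for the duration of the call and handles failures with a plain try/catch.

```java
final class SyncCallSketch {

    // Hypothetical stand-in for a remote call; it is not part of the project.
    static String remoteCall(String request) throws InterruptedException {
        if (request.isEmpty()) {
            throw new IllegalArgumentException("empty request");
        }
        Thread.sleep(50); // simulated network latency; the caller is blocked meanwhile
        return "echo:" + request;
    }

    // The caller waits inside remoteCall and handles failures with try/catch.
    static String invoke(String request) {
        try {
            return remoteCall(request);
        } catch (IllegalArgumentException e) {
            return "failed:" + e.getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return "failed:interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(invoke("ping")); // echo:ping
        System.out.println(invoke(""));     // failed:empty request
    }
}
```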
Netty-based implementation
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
// Project-specific imports (RpcClient, Message, JSON, ...) are omitted here.

@Slf4j
public class NettyRemotingInvoker implements RemotingInvoker {

    private final RpcClient rpcClient;
    private final RequestInterceptor requestInterceptor;
    private final RequestIdGenerator requestIdGenerator;

    public NettyRemotingInvoker(RpcClient rpcClient, RequestInterceptor requestInterceptor, RequestIdGenerator requestIdGenerator) {
        this.rpcClient = rpcClient;
        this.requestInterceptor = requestInterceptor;
        this.requestIdGenerator = requestIdGenerator;
    }

    @Override
    public MessageResponseBody invoke(Message message, ServerInformation serverInformation) throws RpcException {
        Channel channel = NettyClientChannelManager.establishChannel((NettyClient) rpcClient, serverInformation);
        String serverId = serverInformation.getServerId();
        final String random = requestIdGenerator.generate();
        MessageRequestBody requestBody = new MessageRequestBody().setServerId(serverId).setMessage(message).setRequestId(random);
        String requestJsonBody = JSON.toJSON(requestBody);
        MessageRequest messageRequest = MessageRequest.newBuilder().setBody(requestJsonBody).build();
        requestInterceptor.intercept(requestBody);
        // Register the future under the request ID so the response handler can complete it.
        CompletableFuture<MessageResponseBody> completableFuture = new CompletableFuture<>();
        NettyUnprocessedRequests.put(random, completableFuture);
        try {
            channel.writeAndFlush(messageRequest).addListener((ChannelFutureListener) future -> {
                // A failed write never gets a response, so fail the future right away.
                if (!future.isSuccess()) {
                    completableFuture.completeExceptionally(future.cause());
                }
            });
            // Blocks the calling thread until the response arrives or the write fails.
            // There is no timeout here; get(timeout, unit) would bound the wait.
            return completableFuture.get();
        } catch (Exception e) {
            handlerException(serverId, channel, e);
            String msg = String.format("To the Server: %s, exception when sending a message, cause by: %s", serverId, e.getMessage());
            throw new RemoteInvokeException(serverId, msg);
        }
    }
    // ~

    // Logs the failure; closes and evicts the cached channel once it is no longer usable.
    private void handlerException(String serverId, Channel channel, Exception e) {
        log.error("To the server {}, occur exception {}", serverId, e.getMessage());
        if (!channel.isActive() && !channel.isOpen() && !channel.isWritable()) {
            channel.close();
            NettyClientChannelManager.removeChannel(serverId);
            log.error("The Server is unavailable, shutdown channel and the cached channel is deleted.");
        }
    }
}
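The listing depends on a registry that maps each request ID to a pending future, which the response handler completes when the matching reply arrives. The following JDK-only sketch illustrates that correlation pattern with a bounded wait. `PendingRequestsSketch` and all of its methods are hypothetical; they are not the API of the project's `NettyUnprocessedRequests`, and responses are plain strings for brevity.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class PendingRequestsSketch {

    // Request ID -> future that the response handler will complete.
    private static final Map<String, CompletableFuture<String>> PENDING = new ConcurrentHashMap<>();

    // Called by the sender before the request is written.
    static CompletableFuture<String> register(String requestId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        PENDING.put(requestId, future);
        return future;
    }

    // Called by the response handler (channelRead0 in a Netty client).
    // Returns false when no caller is waiting for this ID.
    static boolean complete(String requestId, String response) {
        CompletableFuture<String> future = PENDING.remove(requestId);
        return future != null && future.complete(response);
    }

    // Blocks for at most timeoutMillis and always clears the registry entry.
    static String await(String requestId, CompletableFuture<String> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for " + requestId, e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("no response for " + requestId, e);
        } finally {
            PENDING.remove(requestId); // no-op if the response already removed it
        }
    }

    static int pendingCount() {
        return PENDING.size();
    }

    public static void main(String[] args) {
        CompletableFuture<String> answered = register("req-1");
        new Thread(() -> complete("req-1", "pong")).start();
        System.out.println(await("req-1", answered, 1000));

        CompletableFuture<String> lost = register("req-2");
        try {
            await("req-2", lost, 50);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage() + ", pending=" + pendingCount());
        }
    }
}
```

Removing the entry in a `finally` block keeps timed-out or failed requests from accumulating in the map.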
gRPC-based implementation
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import lombok.extern.slf4j.Slf4j;
// Project-specific imports (RpcClient, Message, JSON, ...) are omitted here.

@Slf4j
public class GrpcRemotingInvoker implements RemotingInvoker {

    private final RpcClient rpcClient;
    private final RequestInterceptor requestInterceptor;
    private final RequestIdGenerator requestIdGenerator;

    public GrpcRemotingInvoker(RpcClient rpcClient, RequestInterceptor requestInterceptor, RequestIdGenerator requestIdGenerator) {
        this.rpcClient = rpcClient;
        this.requestInterceptor = requestInterceptor;
        this.requestIdGenerator = requestIdGenerator;
    }

    @Override
    public MessageResponseBody invoke(Message message, ServerInformation serverInformation) throws RpcException {
        String serverId = serverInformation.getServerId();
        ManagedChannel channel = GrpcClientChannelManager.establishChannel((GrpcClient) rpcClient, serverInformation);
        // The blocking stub returns only after the server has responded.
        MessageServiceGrpc.MessageServiceBlockingStub messageClientStub = MessageServiceGrpc.newBlockingStub(channel);
        final String random = requestIdGenerator.generate();
        MessageRequestBody requestBody = new MessageRequestBody().setServerId(serverId).setMessage(message).setRequestId(random);
        MessageRequest messageRequest = MessageRequest.newBuilder().setBody(JSON.toJSON(requestBody)).build();
        requestInterceptor.intercept(requestBody);
        try {
            MessageResponse response = messageClientStub.messageProcessing(messageRequest);
            return JSON.parse(response.getBody(), MessageResponseBody.class);
        } catch (StatusRuntimeException e) {
            // gRPC reports call failures as a StatusRuntimeException carrying a status code.
            Status.Code code = e.getStatus().getCode();
            handlerException(serverId, channel, code);
            throw new RemoteInvokeException(serverId, e.getMessage());
        } catch (Exception e) {
            channel.shutdown();
            // Evict the channel as well, so later calls do not reuse a channel that was shut down.
            GrpcClientChannelManager.removeChannel(serverId);
            String msg = String.format("To the Server: %s, exception when sending a message, cause by: %s", serverId, e.getMessage());
            throw new RemoteInvokeException(serverId, msg);
        }
    }
    // ~

    // Logs the failure; shuts down and evicts the cached channel when the server is unavailable.
    private void handlerException(String serverId, ManagedChannel channel, Status.Code code) {
        log.error("To the Server: {}, exception when sending a message, Status Code: {}", serverId, code);
        if (Status.Code.UNAVAILABLE == code) {
            channel.shutdown();
            GrpcClientChannelManager.removeChannel(serverId);
            log.error("The Server is unavailable, shutdown channel and the cached channel is deleted.");
        }
    }
}
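Asynchronous Calls
Advantages of asynchronous calls:
- Non-blocking concurrency: the caller can issue several calls without waiting for each result, which improves concurrent throughput.
- Fast response: the caller keeps processing other tasks while a call is in flight and the main thread is not blocked, so response times are shorter.
Disadvantages of asynchronous calls:
- Handling is more complex when the caller needs the call's result; the result may have to be delivered through a notification (callback) instead.
Use cases: scenarios that process many tasks at once and demand high concurrency, where the result is not needed immediately and can be handled in the background.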
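As a rough, JDK-only illustration of the concurrency benefit, the sketch below issues several calls at once and collects the results afterwards. `AsyncFanOutSketch` and `slowEcho` are hypothetical stand-ins for a remote invocation; neither is part of the project.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

final class AsyncFanOutSketch {

    // Hypothetical stand-in for a remote call that takes about 100 ms.
    static String slowEcho(String request) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "echo:" + request;
    }

    // Starts every call first, then waits for all of them; results keep the input order.
    static List<String> callAll(List<String> requests) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, requests.size()));
        try {
            List<CompletableFuture<String>> futures = requests.stream()
                    .map(request -> CompletableFuture.supplyAsync(() -> slowEcho(request), pool))
                    .collect(Collectors.toList());
            return futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        List<String> results = callAll(Arrays.asList("a", "b", "c"));
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
        // The three calls overlap, so the total is close to one call's latency rather than three.
        System.out.println(results + " in about " + elapsedMillis + " ms");
    }
}
```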
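Netty-based implementation
When sending an asynchronous request with Netty, the key question is how to obtain the result once channelRead0 receives the server's response after writeAndFlush.
There are two approaches:
- Use channel.attr; see the project's commit history for the code.
- Use CompletableFuture; the code is shown below, and the complete version is in the source.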
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
// Project-specific imports (RpcClient, Message, JSON, ...) are omitted here.

@Slf4j
public class NettyRemotingInvoker implements RemotingInvoker {

    private final RpcClient rpcClient;
    private final RequestInterceptor requestInterceptor;
    private final RequestIdGenerator requestIdGenerator;

    public NettyRemotingInvoker(RpcClient rpcClient, RequestInterceptor requestInterceptor, RequestIdGenerator requestIdGenerator) {
        this.rpcClient = rpcClient;
        this.requestInterceptor = requestInterceptor;
        this.requestIdGenerator = requestIdGenerator;
    }
    //~

    @Override
    public void invokeAsync(Message message, ServerInformation serverInformation, CallCallback callback) throws RpcException {
        Channel channel = NettyClientChannelManager.establishChannel((NettyClient) rpcClient, serverInformation);
        String serverId = serverInformation.getServerId();
        final String random = requestIdGenerator.generate();
        MessageRequestBody requestBody = new MessageRequestBody().setServerId(serverId).setMessage(message).setRequestId(random);
        MessageRequest messageRequest = MessageRequest.newBuilder().setBody(JSON.toJSON(requestBody)).build();
        requestInterceptor.intercept(requestBody);
        CompletableFuture<MessageResponseBody> completableFuture = new CompletableFuture<>();
        NettyUnprocessedRequests.put(random, completableFuture);
        // Register the completion handler before writing, so a failed write is observed too.
        completableFuture.whenComplete((responseBody, throwable) -> {
            if (throwable == null) {
                callback.onCompleted(responseBody);
            } else {
                // This runs on a callback thread, so throwing here would not reach the caller.
                // Only CallCallback.onCompleted appears in this excerpt, so the failure is logged.
                log.error("To the server {}, asynchronous invocation failed", serverId, throwable);
            }
        });
        try {
            channel.writeAndFlush(messageRequest).addListener((ChannelFutureListener) channelFuture -> {
                // A failed write never gets a response, so fail the future right away.
                if (!channelFuture.isSuccess()) {
                    completableFuture.completeExceptionally(channelFuture.cause());
                }
            });
        } catch (Exception e) {
            handlerException(serverId, channel, e);
            String msg = String.format("To the Server: %s, exception when sending a message, cause by: %s", serverId, e.getMessage());
            throw new RemoteInvokeException(serverId, msg);
        }
    }

    // Logs the failure; closes and evicts the cached channel once it is no longer usable.
    private void handlerException(String serverId, Channel channel, Exception e) {
        log.error("To the server {}, occur exception {}", serverId, e.getMessage());
        if (!channel.isActive() && !channel.isOpen() && !channel.isWritable()) {
            channel.close();
            NettyClientChannelManager.removeChannel(serverId);
            log.error("The Server is unavailable, shutdown channel and the cached channel is deleted.");
        }
    }
}
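With a future-based asynchronous call, the caller only learns about a failure if a completion handler is attached to the future and the callback has somewhere to report it. The JDK-only sketch below registers one handler before the request is sent and routes both outcomes to the callback. `AsyncCallbackSketch`, its `Callback` interface (in particular `onError`) and the `transport` parameter are assumptions made for illustration; the project's `CallCallback` is only known to expose `onCompleted`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class AsyncCallbackSketch {

    // Hypothetical callback with a failure hook; not the project's CallCallback.
    interface Callback {
        void onCompleted(String response);
        void onError(Throwable cause);
    }

    // Records events so that the outcome can be inspected.
    static final class RecordingCallback implements Callback {
        final List<String> events = new ArrayList<>();

        @Override
        public void onCompleted(String response) {
            events.add("completed:" + response);
        }

        @Override
        public void onError(Throwable cause) {
            events.add("error:" + cause.getMessage());
        }
    }

    // The transport stands in for "write the request, complete the future later".
    static CompletableFuture<String> invokeAsync(Consumer<CompletableFuture<String>> transport, Callback callback) {
        CompletableFuture<String> future = new CompletableFuture<>();
        // Attach the handler first, so no completion (normal or exceptional) can be missed.
        future.whenComplete((response, error) -> {
            if (error == null) {
                callback.onCompleted(response);
            } else {
                callback.onError(error);
            }
        });
        try {
            transport.accept(future);
        } catch (RuntimeException e) {
            future.completeExceptionally(e); // a failed send is reported through the same path
        }
        return future;
    }

    public static void main(String[] args) {
        RecordingCallback ok = new RecordingCallback();
        invokeAsync(future -> future.complete("pong"), ok);
        System.out.println(ok.events); // [completed:pong]

        RecordingCallback failed = new RecordingCallback();
        invokeAsync(future -> future.completeExceptionally(new IllegalStateException("write failed")), failed);
        System.out.println(failed.events); // [error:write failed]
    }
}
```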
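gRPC-based implementation
gRPC provides three types of client stubs: newBlockingStub, newStub, and newFutureStub.
- newBlockingStub: a blocking (synchronous) client stub.
- newStub: a purely asynchronous client stub.
- newFutureStub: also asynchronous. It is listed last because it also has the properties of a synchronous client: in practice it can be used either asynchronously or synchronously.
We use newStub here; the code is shown below, and the complete version is in the source.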
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
// Project-specific imports (RpcClient, Message, JSON, ...) are omitted here.

@Slf4j
public class GrpcRemotingInvoker implements RemotingInvoker {

    private final RpcClient rpcClient;
    private final RequestInterceptor requestInterceptor;
    private final RequestIdGenerator requestIdGenerator;

    public GrpcRemotingInvoker(RpcClient rpcClient, RequestInterceptor requestInterceptor, RequestIdGenerator requestIdGenerator) {
        this.rpcClient = rpcClient;
        this.requestInterceptor = requestInterceptor;
        this.requestIdGenerator = requestIdGenerator;
    }
    //~

    @Override
    public void invokeAsync(Message message, ServerInformation serverInformation, CallCallback callback) throws RpcException {
        String serverId = serverInformation.getServerId();
        ManagedChannel channel = GrpcClientChannelManager.establishChannel((GrpcClient) rpcClient, serverInformation);
        // The asynchronous stub returns immediately and reports results through a StreamObserver.
        MessageServiceGrpc.MessageServiceStub messageServiceStub = MessageServiceGrpc.newStub(channel);
        final String random = requestIdGenerator.generate();
        MessageRequestBody requestBody = new MessageRequestBody().setServerId(serverId).setMessage(message).setRequestId(random);
        MessageRequest messageRequest = MessageRequest.newBuilder().setBody(JSON.toJSON(requestBody)).build();
        requestInterceptor.intercept(requestBody);
        try {
            messageServiceStub.messageProcessing(messageRequest, new StreamObserver<MessageResponse>() {
                @Override
                public void onNext(MessageResponse response) {
                    MessageResponseBody responseBody = JSON.parse(response.getBody(), MessageResponseBody.class);
                    callback.onCompleted(responseBody);
                }

                @Override
                public void onError(Throwable throwable) {
                    // Runs on a gRPC callback thread: an exception thrown here would not reach
                    // the caller of invokeAsync, so log the failure and clean up the channel instead.
                    log.error(throwable.getMessage(), throwable);
                    handlerException(serverId, channel, Status.fromThrowable(throwable).getCode());
                }

                @Override
                public void onCompleted() {
                    // Nothing to do: the single response has already been delivered through onNext.
                }
            });
        } catch (StatusRuntimeException e) {
            // Call failures of the asynchronous stub normally arrive through onError, not here.
            Status.Code code = e.getStatus().getCode();
            handlerException(serverId, channel, code);
            throw new RemoteInvokeException(serverId, e.getMessage());
        } catch (Exception e) {
            channel.shutdown();
            String msg = String.format("To the Server: %s, exception when sending a message, cause by: %s", serverId, e.getMessage());
            throw new RemoteInvokeException(serverId, msg);
        }
    }

    // Logs the failure; shuts down and evicts the cached channel when the server is unavailable.
    private void handlerException(String serverId, ManagedChannel channel, Status.Code code) {
        log.error("To the Server: {}, exception when sending a message, Status Code: {}", serverId, code);
        if (Status.Code.UNAVAILABLE == code) {
            channel.shutdown();
            GrpcClientChannelManager.removeChannel(serverId);
            log.error("The Server is unavailable, shutdown channel and the cached channel is deleted.");
        }
    }
}
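Note
- In GrpcRemotingInvoker, the onError callback of the asynchronous messageProcessing call is triggered in the following cases:
  - The client hits a network error or another unrecoverable error while sending the request.
  - The server encounters an error or exception while processing the request.
  - The server returns an error response with a non-OK (non-zero) status code.
  - An exception is thrown inside onNext. The exception is caught and passed to onError, so during an asynchronous request a failure inside onNext results in a call to onError rather than another call to onNext.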
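The last case can be modelled with a small JDK-only sketch. `ObserverErrorRoutingSketch`, its `ResponseObserver` interface and the `deliver` dispatcher are hypothetical stand-ins that only mimic the rule described above; they are not grpc-java code, and the real library's internal handling is more involved.

```java
import java.util.ArrayList;
import java.util.List;

final class ObserverErrorRoutingSketch {

    // Hypothetical, minimal observer that mirrors the shape of a response observer.
    interface ResponseObserver {
        void onNext(String response);
        void onError(Throwable cause);
    }

    // Stand-in dispatcher: an exception escaping onNext is handed to onError.
    static void deliver(String response, ResponseObserver observer) {
        try {
            observer.onNext(response);
        } catch (RuntimeException e) {
            observer.onError(e);
        }
    }

    // Parses a response and records the outcome instead of throwing from onError.
    static List<String> handle(String response) {
        List<String> events = new ArrayList<>();
        deliver(response, new ResponseObserver() {
            @Override
            public void onNext(String value) {
                // Integer.parseInt throws NumberFormatException for non-numeric input.
                events.add("value:" + Integer.parseInt(value));
            }

            @Override
            public void onError(Throwable cause) {
                events.add("error:" + cause.getClass().getSimpleName());
            }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(handle("42"));   // [value:42]
        System.out.println(handle("oops")); // [error:NumberFormatException]
    }
}
```

Because onError is itself invoked on a callback thread, reporting the failure there (logging it or notifying the caller's callback) is generally more useful than throwing another exception from it.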