当前位置: 首页 > 文档资料 > gRPC 学习笔记 >

状态 - 异常处理的流程分析

优质
小牛编辑
129浏览
2023-12-01

前言

在 gRPC 的新版本(1.0.0-pre2)中,为了方便传递 debug 信息,在 StatusException 和 StatusRuntimeException 中增加了名为 Trailer 的 Metadata。

注: 在此之前,Status(和Status映射的StatusException)只有两个字段可以传递信息:1. status code 2. status decription 如果需要传递其他信息则无计可施。在引入 Metadata 之后,终于可以通过使用 Metadata (映射到HTTP2 header) 来传递更多信息。

至此,gRPC 中处理异常的设计已经基本成型,其核心设计就是通过 Status 和 Metadata 来传递异常信息,服务器端抛出异常到客户端的过程,实际是对 异常 和 Status/Metadata 的传递和转换的过程:

  1. 服务器端抛出异常
  2. 异常转为 Status/Metadata
  3. Status/Metadata 传递到客户端
  4. 客户端将 Status/Metadata 转回异常

而 Status/Metadata 在服务器端和客户端的传输中,是通过 HTTP header 来实现的。

整个异常处理的流程概述如图:

异常处理的流程分析 - 图1

最终实现的目标:当服务器端抛出异常时,客户端就会抛出对应的异常。

下面我们对整个流程做详细的代码分析。

Metadata 的准备工作

异常中引入 Metadata

在 StatusRuntimeException 和 StatusException,增加了一个新的构造函数,可以传入 Metadata 并保存起来,同时提供一个getter 方法用来获取保存的 Metadata :

  1. private final Metadata trailers;
  2. public StatusRuntimeException(Status status, @Nullable Metadata trailers) {
  3. super(Status.formatThrowableMessage(status), status.getCause());
  4. this.status = status;
  5. this.trailers = trailers;
  6. }
  7. public final Metadata getTrailers() {
  8. return trailers;
  9. }

Status 中引入 Metadata

看 class Status 的相关代码实现,这里主要是做好在 Status 到 异常 的相互转换过程中处理 Metadata 的准备:

  1. 定义了两个 Metadata.Key,分别用于 status 的状态码和消息:

    1. /**
    2. * 用于绑定状态码到 trailing metadata 的key.
    3. */
    4. public static final Metadata.Key<Status> CODE_KEY
    5. = Metadata.Key.of("grpc-status", new StatusCodeMarshaller());
    6. /**
    7. * 绑定状态消息到 trailing metadata 的key.
    8. */
    9. @Internal
    10. public static final Metadata.Key<String> MESSAGE_KEY
    11. = Metadata.Key.of("grpc-message", STATUS_MESSAGE_MARSHALLER);
  2. 从异常(包括cause 链)的中提取出 error trailers

    1. public static Metadata trailersFromThrowable(Throwable t) {
    2. Throwable cause = checkNotNull(t, "t");
    3. while (cause != null) {
    4. if (cause instanceof StatusException) {
    5. return ((StatusException) cause).getTrailers();
    6. } else if (cause instanceof StatusRuntimeException) {
    7. return ((StatusRuntimeException) cause).getTrailers();
    8. }
    9. cause = cause.getCause();
    10. }
    11. return null;
    12. }
  3. 重载两个方法,在从 Status 到 异常 的转变过程中容许加入 Metadata 信息

    1. public StatusRuntimeException asRuntimeException(Metadata trailers) {
    2. return new StatusRuntimeException(this, trailers);
    3. }
    4. public StatusException asException(Metadata trailers) {
    5. return new StatusException(this, trailers);
    6. }

服务器端流程

以onError()方式发送异常

当服务器端需要抛出异常,尤其是 StatusRuntimeException 和 StatusException 时,正确的姿势是通过 StreamObserver 对象的 onError() 方法发送异常信息到客户端:

  1. public void sayHello(SayHelloRequest request, StreamObserver<SayHelloResponse> responseObserver) {
  2. responseObserver.onError(new StatusRuntimeException(Status.ALREADY_EXISTS));
  3. }

onError()方法在 ServerCalls.ServerCallStreamObserverImpl 中的实现:

  1. public void onError(Throwable t) {
  2. // 从 Throwable 中获取 Metadata
  3. Metadata metadata = Status.trailersFromThrowable(t);
  4. if (metadata == null) {
  5. // 如果没有找到,即异常中没有设置 Metadata,则只能 new 一个空的 Metadata
  6. metadata = new Metadata();
  7. }
  8. // 将 异常 转为 Status 对象,然后加上 Metadata ,一起发送给客户端
  9. call.close(Status.fromThrowable(t), metadata);
  10. }

call 的 close() 方法在 ServerCallImpl 中的实现:

  1. public void close(Status status, Metadata trailers) {
  2. checkState(!closeCalled, "call already closed");
  3. closeCalled = true;
  4. // 发送给 stream
  5. stream.close(status, trailers);
  6. }

stream 的 close() 方法在 AbstractServerStream 中的实现:

  1. public final void close(Status status, Metadata trailers) {
  2. Preconditions.checkNotNull(status, "status");
  3. Preconditions.checkNotNull(trailers, "trailers");
  4. if (!outboundClosed) {
  5. outboundClosed = true;
  6. endOfMessages();
  7. //将 Status 添加到 Metadata
  8. addStatusToTrailers(trailers, status);
  9. //将 Metadata 发送出去,注意此时 Status 是没有发送的,只有 Metadata
  10. abstractServerStreamSink().writeTrailers(trailers, headersSent);
  11. }
  12. }
  13. private void addStatusToTrailers(Metadata trailers, Status status) {
  14. // 安全起见,删除 Metadata 中可能存在的 CODE_KEY 和 MESSAGE_KEY
  15. trailers.removeAll(Status.CODE_KEY);
  16. trailers.removeAll(Status.MESSAGE_KEY);
  17. // 将 status 放置到 Metadata
  18. trailers.put(Status.CODE_KEY, status);
  19. if (status.getDescription() != null) {
  20. // 将 description 放置到 Metadata
  21. trailers.put(Status.MESSAGE_KEY, status.getDescription());
  22. }
  23. }

writeTrailers() 方法在 NettyServerStream 中的实现:

  1. public void writeTrailers(Metadata trailers, boolean headersSent) {
  2. Http2Headers http2Trailers = Utils.convertTrailers(trailers, headersSent);
  3. writeQueue.enqueue(
  4. new SendResponseHeadersCommand(transportState(), http2Trailers, true), true);
  5. }

总结:服务器端抛出异常之后,异常信息被转化为 Status 和 Metedata,然后 Status 也最终被转为 Metedata,最后以 HTTP Header 的方式发送给客户端。

直接抛出异常

如果 gRPC 服务器端不是以标准的 onError() 方法发送异常,而是以直接抛出异常的方式结束处理流程,则此时的处理方式有所不同: Metadata 信息会被忽略,而不是传递给客户端!

注:谨记标准的符合 gRPC 要求的方式是通过 onError() 方法,而不是直接抛异常。

当服务器端抛出异常时, 处理这个异常的代码在 ServerImpl.JumpToApplicationThreadServerStreamListener.halfClosed() 中:

  1. private static class JumpToApplicationThreadServerStreamListener implements ServerStreamListener {
  2. ......
  3. public void halfClosed() {
  4. callExecutor.execute(new ContextRunnable(context) {
  5. @Override
  6. public void runInContext() {
  7. try {
  8. getListener().halfClosed(); //服务器端抛出的异常会在这里跑出来
  9. } catch (RuntimeException e) {
  10. // 这里没有从异常中获取 Metadata, 而是 new 了一个新的空的
  11. internalClose(Status.fromThrowable(e), new Metadata());
  12. throw e;
  13. } catch (Throwable t) {
  14. // 同上
  15. internalClose(Status.fromThrowable(t), new Metadata());
  16. throw new RuntimeException(t);
  17. }
  18. }
  19. });
  20. }
  21. }

可以看到此时之前异常带的 metadata 信息是会被一个空的对象替代。

注: 关于这个事情,我开始认为是一个实现bug,因此提交了一个 Pull Request 给 gRPC,但是后来 gRPC 的开发人员解释说这个是故意设计的,为的就是要强制服务器端使用 onError() 方法而不是直接抛出异常。详细情况请见 metadata is lost when server sends StatusRuntimeException

解决方案

虽然 gRPC 官方推荐用 onError() 处理异常,但是实际上在实践时需要每个业务方法都要来一个大的 try catch 。这使得代码冗余而烦琐。

解决的方式,是自己写一个 ServerInterceptor, 实现一个 io.grpc.ServerCall.Listener 来统一处理

  1. class ExceptionInterceptor implements ServerInterceptor {
  2. public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
  3. ServerCall<ReqT, RespT> call, Metadata headers,
  4. ServerCallHandler<ReqT, RespT> next) {
  5. ServerCall.Listener<ReqT> reqTListener = next.startCall(call, headers);
  6. return new ExceptionListener(reqTListener, call);
  7. }
  8. }
  9. class ExceptionListener extends ServerCall.Listener {
  10. ......
  11. public void onHalfClose() {
  12. try {
  13. this.delegate.onHalfClose();
  14. } catch (Exception t) {
  15. // 统一处理异常
  16. ExtendedStatusRuntimeException exception = fromThrowable(t);
  17. // 调用 call.close() 发送 Status 和 metadata
  18. // 这个方式和 onError()本质是一样的
  19. call.close(exception.getStatus(), exception.getTrailers());
  20. }
  21. }
  22. }

客户端流程

接收应答读取Header

客户端在发送请求收到应答之后,在 DefaultHttp2FrameReader 中读取 frame,前面讲过异常会以HTTP header的方式发送过来,在客户端反应为客户端收到 Headers Frame:

  1. private void processPayloadState(ChannelHandlerContext ctx, ByteBuf in, Http2FrameListener listener){
  2. switch (frameType) {
  3. ......
  4. case HEADERS:
  5. readHeadersFrame(ctx, payload, listener);
  6. break;

读取出来的应答和 header 内容如下图:

异常处理的流程分析 - 图2

这里可以看到以下内容:

  1. HTTP 应答的状态码是200,表示成功,即使 gRPC 服务器端返回异常表示业务处理失败。因此,用 HTTP 状态码来评估服务器是否处理正常是没有意义的。
  2. HTTP 应答的 content-type 是 “application/grpc”
  3. grpc-status 和 grpc-message 两个 header 对应 Status 对象的 code 和 description
  4. 其他的 header 对应 Metadata 中的数据,比如上面的 extended-status 和 is-business-exception

转换为 status 和 trailer

之后在 Http2ClientStream 中将 header 转成 trailer (也就是Metadata):

  1. void transportHeadersReceived(Http2Headers headers, boolean endOfStream) {
  2. if (endOfStream) {
  3. transportTrailersReceived(Utils.convertTrailers(headers));
  4. }
  5. ......
  6. }

然后从 trailer 中获取数据转成 Status 对象:

  1. protected void transportTrailersReceived(Metadata trailers) {
  2. ......
  3. // 从 metadata 中获取 Status
  4. Status status = statusFromTrailers(trailers);
  5. // 清理不再需要的 header
  6. stripTransportDetails(trailers);
  7. // 继续处理
  8. inboundTrailersReceived(trailers, status);
  9. }
  10. private Status statusFromTrailers(Metadata trailers) {
  11. // 从 metadata 中的两个 key 获取 Status,注意 message 有可能为空
  12. Status status = trailers.get(Status.CODE_KEY);
  13. String message = trailers.get(Status.MESSAGE_KEY);
  14. if (message != null) {
  15. status = status.augmentDescription(message);
  16. }
  17. return status;
  18. }
  19. private static void stripTransportDetails(Metadata metadata) {
  20. // 去除传输细节
  21. // 实际就是删除 http 状态码的 header 和 Status 的两个属性的 header
  22. // 清理之后剩下的就是服务器端传过来的 medatata
  23. metadata.removeAll(HTTP2_STATUS);
  24. metadata.removeAll(Status.CODE_KEY);
  25. metadata.removeAll(Status.MESSAGE_KEY);
  26. }

此时 Status 和 Metadata 已经从 header 中还原出来,和服务器端发送的保持一致,后面就是传速和处理过程。

传递 Status 和 Metadata

Status 和 Metadata 被一路传递,在 DelayedStreamListener.closed()方法,调用一个任务,在 run()方法中调用 Listener 的 closed()方法:

  1. @Override
  2. public void closed(final Status status, final Metadata trailers) {
  3. delayOrExecute(new Runnable() {
  4. @Override
  5. public void run() {
  6. realListener.closed(status, trailers);
  7. }
  8. });
  9. }

ClientCallImpl.ClientStreamListenerImpl,在closed()方法中创建了一个 StreamClosed 的任务,扔给callExecutor:

  1. public void closed(Status status, Metadata trailers) {
  2. ......
  3. final Status savedStatus = status;
  4. final Metadata savedTrailers = trailers;
  5. class StreamClosed extends ContextRunnable {
  6. StreamClosed() {
  7. super(context);
  8. }
  9. @Override
  10. public final void runInContext() {
  11. if (closed) {
  12. // We intentionally don't keep the status or metadata from the server.
  13. return;
  14. }
  15. close(savedStatus, savedTrailers);
  16. }
  17. }
  18. callExecutor.execute(new StreamClosed());
  19. }

这个任务在执行时调用 ClientCallImpl 的 close() 方法,然后调用 observer.onClose() 方法:

  1. private void close(Status status, Metadata trailers) {
  2. closed = true;
  3. cancelListenersShouldBeRemoved = true;
  4. try {
  5. observer.onClose(status, trailers);
  6. } finally {
  7. removeContextListenerAndCancelDeadlineFuture();
  8. }
  9. }

将 Status 和 Metadata 转换为异常

最后进入 ClientCalls.UnaryStreamToFuture 的 onClose() 方法,这里做最终的 Status 和异常 的转换:

  1. private static class UnaryStreamToFuture<RespT> extends ClientCall.Listener<RespT> {
  2. ......
  3. public void onClose(Status status, Metadata trailers) {
  4. if (status.isOk()) {
  5. ......
  6. } else {
  7. // 当 Status 不是OK时,将 Status 和 Metadata 转为异常
  8. // 然后交给 responseFuture,后面客户端调用就会通过 responseFuture 得到这个异常
  9. responseFuture.setException(status.asRuntimeException(trailers));
  10. }
  11. }
  12. ......
  13. }

总结

整个异常处理的流程,总结起来就是两点:

  1. 异常的传输是通过 Status/Metadata 来实现
  2. Status/Metadata 的传输是通过 HTTP Header 来实现的

简化之后的流程图:

异常处理的流程分析 - 图3