当前位置: 首页 > 知识库问答 >
问题:

grpc java:正确处理服务流调用的客户端重试

吴唯
2023-03-14

我正在尝试在grpc上使用服务流和客户端上的异步存根建立一个简单的发布/订阅模式。在实现了部分流式消息返回到客户端之后,我想处理连接中断的场景。现在,我正在实现服务关闭时的部分,例如,客户端应该从连接丢失中“恢复”。

我在google/github/so上阅读并搜索了关于重试机制的内容,最后为流式传输消息的服务中的方法设置了重试策略。据我所知,当服务返回重试策略中定义的一些retryableStatusCodes时,重试机制应该可以工作。在客户端上引入重试策略后,我想对其进行测试,下面两个场景的结果让我对重试感到困惑。

第一种情况:

  • 调用连接过程(有意在n秒后,没有消息流回到客户端)

第二种情况:

  • 调用连接过程(第一条消息到达约n秒后,消息在客户端的onNext处理程序中处理)

总的来说,让我困惑的是,为什么这两种场景之间的行为有差异?为什么在第一个场景中检测到服务器返回不可用并尝试重试,但在第二个场景中,即使状态相同,重试也不起作用?

以下是客户端上的连接调用、服务上的连接方法以及客户端上重试策略的设置的代码

client:

messageStub.withWaitForReady().connect(messagesRequest, new StreamObserver<>() {
    @Override
    public void onNext(MessageResponse messageResponse) {
        //process new message
        MessageDto message = new MessageDto();
        message.setBody(messageResponse.getBody());
        message.setTitle(messageResponse.getTitle());

        messageService.broadcastMessage(message);
    }

    @Override
    public void onError(Throwable throwable) {
        //service went down
        LOGGER.error(throwable.getStackTrace());
    }

    @Override
    public void onCompleted() {
        //This method should be called when user logs out of the application
        LOGGER.info(String.format("Message streaming terminated for user %d", userId));
    }
});
service:

@Override
public void connect(MessageRequest request, StreamObserver<MessageResponse> responseObserver) {
    Long userId = request.getUserId();

    ServerCallStreamObserver<MessageResponse > serverCallStreamObserver =
        (ServerCallStreamObserver<MessageResponse >) responseObserver;
    serverCallStreamObserver.setOnCancelHandler(getOnCancelHandler(userId));
    registerClient(userId, serverCallStreamObserver);
    //responseObserver.onCompleted() is left out so connection is not terminated
}


@EventListener
public void listenForMessages(MessageEvent messageEvent) {
    //omitted code (just some data retrieving - populate conn and message vars)....

    MessageResponse.Builder builder = MessageResponse.newBuilder();
    StreamObserver<MessageResponse> observer = conn.getResponseObserver();
    builder.setType(message.getType());
    builder.setTitle(message.getTitle());
    builder.setBody(message.getBody());

    observer.onNext(builder.build())
}

retryPolicy:

{
  "methodConfig" : [
    {
      "name": [
        {
          "service": "ch.example.proto.MessageService",
          "method": "connect"
        }
      ],
      "retryPolicy": {
        "maxAttempts": 10,
        "initialBackoff": "5s",
        "maxBackoff": "30s",
        "backoffMultiplier": 2,
        "retryableStatusCodes": ["UNAVAILABLE"]
      }
    }
  ]
}

共有1个答案

宋英杰
2023-03-14

问题是,接收消息会提交RPC。这将在gRFC A6客户端重试中讨论。它提到了响应头,当服务器响应第一条消息时,会隐式发送这些头。

本质上,一旦gRPC将数据传递回客户端,就无法自动重试。如果gRPC重试,它应该如何将新流与它已经响应的流相结合?是否应跳过第一个响应?但如果现在的反应不同呢?元数据(通过响应头传递)的问题更为严重,因为这些元数据无法再次提供给客户端。

gRPC能够将客户端的请求重放到多个后端,但一旦它开始接收来自后端的响应,它将“固定”到该后端并且无法更改其决定。

您需要应用程序级重试才能重新建立流。当客户端重新建立流时,它可能需要修改请求以通知服务器客户端已经收到了哪些消息。

 类似资料:
  • 问题 当你在 CoffeeScript 上创建了一个函数,并希望将它用在有网页浏览器的客户端和有 Node.js 的服务端时。 解决方案 以下列方法输出函数: # simpleMath.coffee # these methods are private add = (a, b) -> a + b subtract = (a, b) -> a - b square = (x)

  • 客户端模块是一个较复杂的模块,这里包含了集群管理、路由、地址管理器、连接管理器、负载均衡器,还与代理、注册中心等模块交互。

  • 我有这个代码: 我一直在犯这样的错误: java:不兼容的类型:com。应用句子分类请求。无法将生成器转换为com。应用句子分类请求 我已经使用Maven插件生成了gRPC Java文件。在看了多个例子后,我不确定我的问题是什么。

  • 我尝试使用Java中的Sockets连接到多个客户端。一切似乎都正常,但问题是,服务器只监听第一个客户端。如果有多个客户端,服务器可以向它们发送所有消息,但他可以只监听来自第一个客户端的消息。我尝试了所有这些(我从昨天开始就遇到了这个问题)。所以我很确定,错误一定在“ClientListener”类中。 说明:有一个客户端列表(用于与字符串通信的连接)。在GUI中有一个列表,我可以在其中选择要与哪

  • 我的程序基本上是: > 客户端向服务器发送字符串, 基于这个字符串,服务器正在创建一个ArrayList, ArrayList 被发送回客户端。 这里失败的是: 客户端发送字符串后,服务器会收到它,并且不执行任何其他操作。在这段时间里,客户端继续工作并得到一个空指针。 客户端: } 服务器端:

  • 我想在一些计算机之间建立点对点连接,这样用户就可以在没有外部服务器的情况下聊天和交换文件。我最初的想法如下: 我在服务器上制作了一个中央服务器插座,所有应用程序都可以连接到该插座。此ServerSocket跟踪已连接的套接字(客户端),并将新连接的客户端的IP和端口提供给所有其他客户端。每个客户端都会创建一个新的ServerSocket,所有客户端都可以连接到它。 换句话说:每个客户端都有一个Se