当前位置: 首页 > 知识库问答 >
问题:

如何在Grpc中通过MethodDescriptor的InputStream调用服务器?

郑理
2023-03-14

我想通过客户端中MethodDescriptor的InputStream调用grpc服务器,但我没有成功。这是我的代码:

jdk 1.8 
grpc 1.33.1

grpc all dependency的版本为:

<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-all</artifactId>
    <version>1.33.1</version>
    <scope>provided</scope>
 </dependency>

在服务器中,我的EchoServiceImpl扩展了EchoServiceGrpc。EchoServiceImplBase,重写echo rpc方法。我在构建服务器之前导出inputstream服务。

// 1. implements service
@Service
public class EchoServiceImpl extends EchoServiceGrpc.EchoServiceImplBase {

    @Override
    public void echo(EchoRequest request, StreamObserver<EchoResponse> responseObserver) {
        System.out.println("Received: " + request.getMessage());
               System.out.println("Received: " + request.getMessage());
        EchoResponse.Builder response = EchoResponse.newBuilder()
                .setMessage("ReceivedHELLO");
        responseObserver.onNext(response.build());
        responseObserver.onCompleted();
    }
}

// 2. export  inputstream  Service
  private void exportService(final Object bean) {
        BindableService bindableService = (BindableService) bean;
        ServerServiceDefinition serviceDefinition = bindableService.bindService();
        try {
            ServerServiceDefinition isDefinition = ServerInterceptors.useInputStreamMessages(serviceDefinition);//inputstream

            serviceDefinitions.add(serviceDefinition);
            serviceDefinitions.add(isDefinition);
        } catch (Exception e) {
            log.error("export service is fail", e);
        }
    }

// 3. start Grpc Server
private void startGrpcServer() {
        ServerBuilder<?> serverBuilder = grpcServerBuilder.buildServerBuilder();
        List<ServerServiceDefinition> serviceDefinitions = grpcClientBeanPostProcessor.getServiceDefinitions();
        for (ServerServiceDefinition serviceDefinition : serviceDefinitions) {
            serverBuilder.addService(serviceDefinition);        
        }

        try {
            Server server = serverBuilder.build().start();
            log.info("Grpc server started successfully");
        } catch (IOException e) {
            log.error("Grpc server failed to start", e);
        }
    }

在客户机中,我创建了InputStream MethodDescriptor,RequestMarshaller是inputstram,但我现在不是success util。谁能提供一些建议?谢谢~~~

public class ClientDemo {

    private static Map<String,Object> methodDescriptorCache = Maps.newHashMap();
    //  InputStream marshaller  
    final static MethodDescriptor.Marshaller<InputStream> marshaller = new MethodDescriptor.Marshaller<InputStream>() {
        @Override
        public InputStream stream(final InputStream value) {
            return value;
        }

        @Override
        public InputStream parse(final InputStream stream) {
            if (stream.markSupported()) {
                return stream;
            } else {
                return new BufferedInputStream(stream);
            }
        }
    };


    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1.build channel
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080).usePlaintext().build();

        //2. create InputStream MethodDescriptor
        MethodDescriptor<InputStream, InputStream> echo = createInputStreamMethodDescriptor("echo.EchoService", "echo");

        //3. channel.newCall()
        ClientCall<InputStream, InputStream> call =
                channel.newCall(echo, CallOptions.DEFAULT);

        //4. input params
        String req = "{\"message\":\"input stream\"}";
        InputStream request = new ByteArrayInputStream(req.getBytes(StandardCharsets.UTF_8));
        System.out.println(request);
        
        //5. get result
        ListenableFuture<InputStream> res = ClientCalls.futureUnaryCall(call, request);
        System.out.println(res.get());
    }

    public static io.grpc.MethodDescriptor<InputStream, InputStream> createInputStreamMethodDescriptor(String clazzName, String methodName) {
        io.grpc.MethodDescriptor<InputStream, InputStream> methodDescriptor = (MethodDescriptor<InputStream, InputStream>) methodDescriptorCache
                .get(clazzName + methodName);
        if (methodDescriptor != null)
            return methodDescriptor;
        else {
            methodDescriptor = io.grpc.MethodDescriptor.<InputStream, InputStream> newBuilder()
                    .setType(MethodDescriptor.MethodType.UNARY)//
                    .setFullMethodName(io.grpc.MethodDescriptor.generateFullMethodName(clazzName, methodName))//
                    .setRequestMarshaller(marshaller)//
                    .setResponseMarshaller(marshaller)//
                    .setSafe(false)//
                    .setIdempotent(false)//
                    .build();
            methodDescriptorCache.put(clazzName + methodName, methodDescriptor);
            return methodDescriptor;
        }
    }
}
 [grpc-nio-worker-ELG-1-2] DEBUG io.grpc.netty.NettyClientHandler - [id: 0xa764352c, L:/127.0.0.1:55783 - R:localhost/127.0.0.1:8080] INBOUND HEADERS: streamId=3 headers=GrpcHttp2ResponseHeaders[:status: 200, content-type: application/grpc, grpc-status: 2] streamDependency=0 weight=16 exclusive=false padding=0 endStream=true
Exception in thread "main" java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: UNKNOWN
    at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:564)
    at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:545)
    at org.apache.shenyu.examples.grpc.test.ClientDemo2.main(ClientDemo2.java:56)
Caused by: io.grpc.StatusRuntimeException: UNKNOWN
    at io.grpc.Status.asRuntimeException(Status.java:533)
    at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:533)
    at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:464)
    at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:428)
    at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:461)
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:617)
    at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:803)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:782)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Process finished with exit code 1

共有1个答案

令狐献
2023-03-14

您可以在另一个问题中引用我的答案,即如何直接从protobuf创建grpc客户端,而无需将其编译为java;完整代码在HelloWorld/grpc java sample#reflection中

 类似资料:
  • 问题内容: 我目前正在使用HTTP方法来调用一些URL,这将导致JIRA问题。 现在,我想使用Apache Camel,该如何使用? 我需要通过骆驼调用以下链接: 由于我是Camel的新手,因此也请提出一些解决方案和示例。 谢谢 问题答案: 您可以轻松使用CXFRS组件;如果出于某种原因需要使用HTTP组件进行操作,则也可以轻松地使用它: 当然,在到达路由的这一部分之前,您将需要使用标头来丰富您的

  • 根据这个http://www.grpc.io/docs/tutorials/basic/python.html#creating-这里的服务器和示例https://github.com/grpc/grpc/tree/v1.0.0/examples/python/route_guide,当我生成我的pb2时。py文件中,应创建几个名为Stub和Servicer的类。但是,我生成了pb2。py文件不包

  • 我尝试了一个简单的java rmi程序,其中客户端对服务器进行远程调用,服务器返回字符串。它与通过LAN连接的两台计算机完美配合通过局域网,但服务器和客户端都连接到互联网?我尝试了以下简单的代码,将Client.java中的localhost更改为服务器的ip地址。没用。我打这么远的电话需要什么? //远程接口- //客户。JAVA //Server.java 我得到了以下例外:java。安全Ac

  • 我正在使用收集器跟踪java服务中的跨度,这一服务是http和grpc。收集器终结点是localhost:55680。此java服务跟踪成功。 现在,我想使用这个收集器基于gRPC跟踪我的go服务。 在我的go服务中,我复制以下文件:interceptor。去grpctrace。从repo opentelemetry转到contrib,这里https://github.com/open-telem

  • 下面的代码片段显示了如何在gRPC AspNet核心应用程序中启用gRPC web: 客户端应用程序的代码如下所示:

  • 我正在尝试让这个gRPC服务器示例与Google Asylo(https://github.com/google/asylo/tree/master/asylo/examples/grpc_server)一起工作。.要初始化服务器,我需要在此配置文件中指定server_address(https://github.com/google/asylo/blob/master/asylo/example