当前位置: 首页 > 知识库问答 >
问题:

grpc中的Java多线程

柴丰
2023-03-14

我有一个关于JAVA多线程的问题。

我有一个jetty webapp与grpc-流式传输-客户端。一切都很好,但我如何建立一个模型来获取流式传输数据?

webapp是用jsf构建的。因为我有一个控制器,它调用一个处理程序类来启动流:

    public void startStream(){
    if(streamHandler!=null & activatedStream == false){
        streamHandler.startStreamClient();          
        activatedStream = true;
    }else{
        FacesMessage message = new FacesMessage(FacesMessage.SEVERITY_ERROR, 
                "Could not initialize the StreamHandler and Client Class or a Stream still runs. Please check the logs.", "Stream is running: "+String.valueOf(activatedStream));
        FacesContext.getCurrentInstance().addMessage(null, message);
    }
}

方法简单地启动客户端和流。

    public void startStreamClient(){
    log.info("Calling startMethod of Handler............");
    CountDownLatch finishLatch;
    if(this.client.isChannelShutdown()& this.client!=null){

        this.client=new StreamClient(this.serverHost, this.serverPort);

        try{
            finishLatch = this.client.imageStream(this.startRequest);
        }catch(Exception e){
            log.warn("Error while starting the imageStream: "+e.getLocalizedMessage(), e);
        }
    }else{
        finishLatch = client.imageStream(this.startRequest);
    }
}

检查倒计时锁存器的实现仍然缺失。但在这种情况下,这并不重要。

响应如下:onNext()-方法提供流式数据:

public CountDownLatch imageStream(StreamRequest request){

    log.info("Calling imageStream-asnychStub...............");

    CountDownLatch finish = new CountDownLatch(1);

    /**
     *  The asyncStub is calling the rpc-Function with a new StreamObserver for the given Responses from the Server.
     */
    StreamObserver<StreamRequest> requestOberserver = asyncStub.streamImagaData(new StreamObserver<StreamResponse>() {

        /**
         * The onNext Method is getting the imageDate, if it is send
         */
        @Override
        public void onNext(StreamResponse response) {
            System.out.println("Data-Input: "+response.getImageData().length());
        }

        /**
         * The onError Method is getting an Exception Object if it is thrown
         */
        @Override
        public void onError(Throwable t) {
            log.warn("Bidirectional Stream with Server an Client: "+t.getLocalizedMessage(), t);
        }           

        /**
         * The onCompleted is for ending the Stream and reduces the CountDownLatch by one
         */
        @Override
        public void onCompleted() {
            log.info("Bidirectional Streaming has finished....");
            finish.countDown();
        }
    });

    /**
     * This Block is for sending a StreamRequest to the Server.
     */
    try{
        log.info("Sending a Streaming-Request to Server with State: "+request.getStreamState().name());
        requestOberserver.onNext(request);
    }catch (RuntimeException ex) {
        log.warn("Error sending requst to Server: "+ex.getLocalizedMessage(), ex);
        requestOberserver.onError(ex);
    }
    requestOberserver.onCompleted();

    return finish;
}

图像数据简单地打印在屏幕上。我试图建立一个消费者-生产者模型,但失败了,因为响应以StreamObserver的内部类型返回。

如何实时获取这些数据。我必须创建StreamObserver的正式实现吗?或者我必须在哪里放置额外的线程?线程是唯一的选择吗?我需要打电话吗?

提前感谢。

共有1个答案

鲜于璞瑜
2023-03-14

我通过实现观察者模式解决了这个问题。

ManagedBean成为StreamClient的观察者。

 类似资料:
  • 我找到了关于线程安全的代码,但它没有来自给出示例的人的任何解释。我想知道为什么如果我不在“count”之前设置“synchronized”变量,那么count值将是非原子的(总是=200是期望的结果)。谢谢

  • 我有一个异步gRPC客户端,用于多线程环境。当多个线程通过客户端同时连接到服务时,我看到以下错误流: 请求似乎正在成功完成,但是,这些消息充斥着我的日志,让我感到紧张! 在我的测试中,每个线程创建自己的通道并提交自己的异步请求。无论服务负载如何,都会发生错误。如果客户端在不同的进程中运行,则不会发生错误。 我的设置: Python版本: version: 任何洞察都将不胜感激!

  • 这是一个关于Java中多线程的初学者问题。 根据我的理解,当创建多个(用户)线程来运行程序或应用程序时,就没有父线程和子线程的概念。它们都是独立的用户线程。 因此,如果主线程完成执行,那么另一个线程(Thread2)仍将继续执行,因为在Thread2的执行线程完成之前,它不会被JVM杀死(https://docs.oracle.com/javase/6/docs/api/java/lang/Thr

  • 问题内容: 在多线程环境中使用Singleton类的首选方法是什么? 假设我有3个线程,并且所有这些线程都尝试同时访问单例类的方法- 如果不保持同步会怎样? 在内部使用 方法还是使用块是好的做法。 请告知是否还有其他出路。 问题答案: 从理论上讲,这项任务并不容易,因为您要使其真正成为线程安全的。 在此上找到了一篇非常不错的论文@ IBM 仅获取单例不需要任何同步,因为这只是读取。因此,只需同步S

  • 问题内容: 鉴于以下多态: 我们如何在没有昂贵的getInstance()方法同步和双重检查锁定争议的情况下使它保持线程安全和懒惰?这里提到了单例的有效方法,但似乎并没有扩展到多例。 问题答案: 使用Java 8,它甚至可以更简单:

  • 我正在使用来处理通过文件的大量记录。每一行都是一条记录,我将每一行传递给单独的线程进行处理,问题是我必须收集这些处理过的记录以及在处理记录时生成的更多数据,然后在最后的数据收集上应用一些业务逻辑。我将一个通用的传递给所有线程来填充已处理的数据,当我通过visualVM调试它时,我发现(屏幕截图如下)这些线程在等待中花费的时间比在运行中花费的时间多。我想这是因为一个线程在写入时获得了锁。 有没有一种