当前位置: 首页 > 知识库问答 >
问题:

异步grpc存根中的取消订阅观察者-Java/静态编程语言

芮学
2023-03-14

我有一个异步存根,其中添加了一个观察者:

            val obs =  object: StreamObserver<Hallo> {

                override fun onNext(value: Hallo) {

                    streamSuccess(value)
                }

                override fun onError(t: Throwable?) {

                    nonSuccess(t?.message ?: "Unknow error")
                }

                override fun onCompleted() {

                    Log.d("Info", "completed")
                    completed()
                }
            }

我希望a能够从异步存根中删除这个观察者,这样我就可以取消客户端的流。

正如github上所说:https://github.com/grpc/grpc-java/issues/3095

我尝试保留观察者的一个局部变量,以便客户稍后可以执行以下操作:

observer?.onError(Status.CANCELLED.cause)

那不起作用。

我还尝试从抽象类创建自己的类:ClientCallStreamObserver

class CancellableStreamObserver<TResponse>(val next:(value:TResponse)->Unit, val onError:(t:Throwable)-> Unit, val onCompleted:(()->Unit), val onCanceledHandler: (()->Unit)? = null) : ClientCallStreamObserver<TResponse>() {
        override fun isReady(): Boolean {
            return  true
        }

        override fun setOnReadyHandler(onReadyHandler: Runnable?) {
            //TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
        }

        override fun disableAutoInboundFlowControl() {
            //TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
        }

        override fun cancel(message: String?, cause: Throwable?) {

            //TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
        }

        override fun request(count: Int) {
            //TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
        }

        override fun setMessageCompression(enable: Boolean) {
            //TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
        }

        override fun onNext(value: TResponse) {
            next(value)
        }

        override fun onError(t: Throwable) {
            if (t is StatusException) {
                if (t.status.code == Status.Code.CANCELLED) {
                    onCanceledHandler?.let {
                        it()
                    }
                }
            }
            if (t is StatusRuntimeException) {
                if (t.status.code == Status.Code.CANCELLED) {
                    onCanceledHandler?.let {
                        it()
                    }
                }
            }
            this.onError(t)
        }

        override fun onCompleted() {
            onCompleted()
        }
    }

所以稍后我可以打电话:

        observer?.cancel("Cancelled for the user",Status.CANCELLED.cause)

那也没用。

我知道它不起作用,因为如果用户再次添加新的观察者,我会得到重复的响应,就好像旧的观察者还活着一样。

我知道我可以用频道关闭频道。shutdownNow()。但我觉得这太咄咄逼人了。

谢谢

共有1个答案

羊舌源
2023-03-14

参考https://github.com/grpc/grpc-java/issues/3095:

对于async,可以使用ClientCallStreamObserver。取消(),方法是将返回的StreamObserver强制转换为ClientCallStreamObserver,或实现传入的StreamObserver实现ClientResponseObserver。

(着重部分由作者标明)

grpc java将实现适当的方法,而不是您的实例。所以模式是:

stub.foo(req, object: ClientResponseObserver<Hallo> {
    override fun beforeStart(respObs: ClientCallStreamObserver<Hallo>) {
        // save respObs for later
    }
    override fun onNext(value: Hallo) {
        streamSuccess(value)
    }
    override fun onError(t: Throwable?) {
        nonSuccess(t?.message ?: "Unknow error")
    }
    override fun onCompleted() {
        Log.d("Info", "completed")
        completed()
    }
});

// -or- (for streaming calls only)

val obs = ...;
val respObs = stub.foo(obs) as (ClientCallStreamObserver<Hallo>);
respObs.onNext(req);
// save respObs for later

请注意,在这两种情况下,respObs是相同的。使用ClientResponseObserver主要是为了在响应观察者中进行流式处理并希望取消,以避免线程竞争。

 类似资料:
  • 本文向大家介绍system.reactive 订阅/取消订阅可观察对象(IDisposable),包括了system.reactive 订阅/取消订阅可观察对象(IDisposable)的使用技巧和注意事项,需要的朋友参考一下 示例 订阅返回IDisposable: 当您准备取消订阅时,只需处置订阅即可:            

  • 在ngOnDestory中,我取消了两个订阅,但仍然得到前面的错误。 现在我几乎可以肯定问题出在这行:即使我在注销之前取消了proposalSubscription和chatSubscription的订阅,但仍然会出现错误。有没有解决这个问题的方法?而且,我对RXJ和操作符没有太多的经验。有没有操作符可以用来避免这种嵌套订阅? 提前道谢。

  • 我正在尝试破译以下函数: null

  • 我有这个问题,我一直在寻找,但找不到解决方案(或者也许我不能根据其他答案做出解决方案)。 我的问题是,我需要找到一种方法来等待可观察的(有自己的订户)并等待另一个可观察的(有自己的订户)完成。 场景是这样的: 奥布1- 奥布斯2 - 我主要担心的是我需要两个订阅者。在我看来,obs1 和 obs2 并行运行,但需要检查 obs1 是否以新的会话令牌完成。也许这不是RxJava的主要目的。 Obs1

  • 我最近才知道,我们必须在Angular破坏组件之前取消订阅,否则会造成内存泄漏。 我还知道,我们可以获得对订阅的引用,通过对该订阅调用unsubscribe方法,我们可以进行订阅。例如: 下面的方法会起作用吗?在HTTP调用的情况下,退订是最好的方法吗?

  • 我通读了RxJS文档,并希望确保我理解了< code > subscriber . unsubscribe()和< code > subscriber . complete()之间的区别。 假设我有一个有两个订阅者的可观察对象,订阅者1和订阅者2。如果订阅者1对其订阅调用取消订阅,它将不再接收来自可观察对象的通知,但订阅者2将继续接收它们。 <代码>的文档。complete(): 观察者回调,用于