当前位置: 首页 > 知识库问答 >
问题:

为什么是水槽。许多()。多播()。onBackpressureBuffer()在其中一个订阅者取消订阅后完成,以及如何避免订阅

朱浩大
2023-03-14

我在使用接收器时遇到了一种我不理解的行为。许多的

fun main() {

    val sink : Sinks.Many<String>  = Sinks.many().multicast().onBackpressureBuffer()
    val flux = sink.asFlux().log()

    val d = flux.subscribe {
        println("--> $it")
    }

    sink.emitNext("1", Sinks.EmitFailureHandler.FAIL_FAST)

    val d2 = flux.subscribe {
        println("--2> $it")
    }

    sink.emitNext("2", Sinks.EmitFailureHandler.FAIL_FAST)
}

此代码显示第一个订户获得值1和2,第二个订户获得值2。到现在为止,一直都还不错:

11:49:06.936 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:49:06.938 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(1)
--> 1
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:49:06.943 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(2)
--> 2
11:49:06.943 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(2)
--2> 2

现在,假设第一个用户在第一次发射后处理(取消)其订阅,我希望第一个用户得到1,第二个用户得到2:


    val sink : Sinks.Many<String>  = Sinks.many().multicast().onBackpressureBuffer()
    val flux = sink.asFlux().log()

    val d = flux.subscribe {
        println("--> $it")
    }

    sink.emitNext("1", Sinks.EmitFailureHandler.FAIL_FAST)

    d.dispose()

    val d2 = flux.subscribe {
        println("--2> $it")
    }

    sink.emitNext("2", Sinks.EmitFailureHandler.FAIL_FAST)

}
11:51:48.684 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:51:48.685 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(1)
--> 1
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - cancel()
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:51:48.690 [main] INFO reactor.Flux.EmitterProcessor.1 - onComplete()

然而,当第二个订户尝试订阅时,流量被视为已完成。为什么会这样?我需要水槽。许多人可以随时订阅或取消订阅,无需取消。


共有1个答案

段干恺
2023-03-14

我刚刚提到了同样的问题。

它是由autoCancel默认为true引起的。不幸的是,onBackpressureBuffer javadoc没有提到它。

这种行为是从EmitterProcessor.create继承的,在那里它被记录下来。

要将autoCancel标志设置为false,必须使用备用onBackpressureBuffer

java prettyprint-override">Many<String> sink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
 类似资料:
  • 我通读了RxJS文档,并希望确保我理解了< code > subscriber . unsubscribe()和< code > subscriber . complete()之间的区别。 假设我有一个有两个订阅者的可观察对象,订阅者1和订阅者2。如果订阅者1对其订阅调用取消订阅,它将不再接收来自可观察对象的通知,但订阅者2将继续接收它们。 <代码>的文档。complete(): 观察者回调,用于

  • 我使用SockJS和StompJS,当我在浏览器中打开我的应用程序时,有时它会在连接到websocket之前尝试订阅一些主题。我希望主题订阅等待应用程序连接到websocket。 这就是我实现此代码的原因,我将其称为: 因此,我只在连接状态为时才订阅该主题,并且只有在客户端首次成功连接时才会调用该主题。 我想稍后从主题中取消订阅,所以我需要内部订阅返回的对象,我还需要内部订阅的消息。 我所实现的很

  • 我有一个生产者,它从Rest API下载页面中的数据,以及几个处理页面的消费者(例如,将它们加载到数据库中)。 我希望生产者和消费者并行工作,这意味着生产者不应该等到一个页面被消费后再下载下一个页面。每个使用者都需要按顺序处理页面。 当下载所有页面时,主线程应该等待所有消费者完成他们的工作(因为消费可能比生产需要更长的时间)。 我目前的做法如下: 我已经创建了一个下载页面的可观察对象,它在附加消费

  • 本文向大家介绍如何在C#中订阅事件,我们可以在C#中为一个事件拥有多个订阅者吗?,包括了如何在C#中订阅事件,我们可以在C#中为一个事件拥有多个订阅者吗?的使用技巧和注意事项,需要的朋友参考一下 事件使类或对象在发生感兴趣的事件时通知其他类或对象。 引发事件的类称为发布者,而处理事件的类称为订阅者。 在事件中 一个事件可以有多个订阅者。订阅者可以处理来自多个发布者的多个事件。 没有订阅者的事件永远

  • 1.使用者角度 为企业、组织或个人提供一种信息传播方式,用对口的内容达成企业、组织与成员之间的沟通和知识传播。体现在为用户提供内容服务,传达各类资讯,用户订阅后,可在轻推客户端定期接收到内容资讯的推送 2.开发者角度 主要通过会话的形式为用户提供服务,用户在协同界面点击订阅号图标后,可直接进入与订阅号的聊天界面,开发难度低,支持在后台定制菜单,通过菜单引导用户到不同的去处。通知消息会被折叠在订阅消

  • 问题内容: 我正在使用RxJava计算Android中某些传感器数据的标准化自动相关性。奇怪的是,我的代码引发了一个异常(“ java.lang.IllegalStateException:只允许一个订阅者!”),我不确定该怎么做:我知道GroupedObservables订阅多个订阅者时可能会抛出此异常,但是我不认为我在任何地方都在使用这种东西。 在下面,您找到(最有可能)触发异常的方法: 这是