当前位置: 首页 > 知识库问答 >
问题:

RxJs 5 share()操作符是如何工作的?

皇甫波峻
2023-03-14

我不是100%清楚RxJs 5share()运算符是如何工作的,请参阅这里的最新文档。jsbin的问题在这里。

如果我创建一个由0到2组成的可观察序列,每个值间隔一秒钟:

var source = Rx.Observable.interval(1000)
.take(5)
.do(function (x) {
    console.log('some side effect');
});

如果我为这个可观察对象创建两个订户:

source.subscribe((n) => console.log("subscriptor 1 = " + n));
source.subscribe((n) => console.log("subscriptor 2 = " + n));

我在控制台中看到:

"some side effect ..."
"subscriptor 1 = 0"
"some side effect ..."
"subscriptor 2 = 0"
"some side effect ..."
"subscriptor 1 = 1"
"some side effect ..."
"subscriptor 2 = 1"
"some side effect ..."
"subscriptor 1 = 2"
"some side effect ..."
"subscriptor 2 = 2"

我原以为每个订阅都会订阅相同的可观察对象,但事实似乎并非如此!它就像订阅的行为创造了一个完全独立的可观察的!

但是如果share()运算符被添加到可观察的源:

var source = Rx.Observable.interval(1000)
.take(3)
.do(function (x) {
    console.log('some side effect ...');
})
.share();

然后我们得到:

"some side effect ..."
"subscriptor 1 = 0"
"subscriptor 2 = 0"
"some side effect ..."
"subscriptor 1 = 1"
"subscriptor 2 = 1"
"some side effect ..."
"subscriptor 1 = 2"
"subscriptor 2 = 2"

如果没有share(),这就是我所期望的。

这是怎么回事,share()操作符是如何工作的?每个订阅是否创建了一个新的可观察链?

共有2个答案

于正志
2023-03-14

如果满足以下两个条件,share会使可观察到的“热”:

  1. 订阅者人数

方案1:订户数量

var shared  = rx.Observable.interval(5000).take(2).share();
var startTime = Date.now();
var log = (x) => (value) => { 
    console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`);
};

var observer1 = shared.subscribe(log('observer1')),
    observer2;

setTimeout(()=>{
    observer2 = shared.subscribe(log('observer2'));
}, 3000);

// emission for both observer 1 and observer 2, with the samve value at startTime + 5 seconds
// another emission for both observers at: startTime + 10 seconds

场景2:新订阅之前,订阅服务器数为零。变“冷”

 var shared  = rx.Observable.interval(5000).take(2).share();
    var startTime = Date.now();
    var log = (x) => (value) => { 
    console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`);
};

var observer1 = shared.subscribe(log('observer1')),
    observer2;

setTimeout(()=>{
    observer1.unsubscribe(); 
}, 1000);

setTimeout(()=>{
    observer2 = shared.subscribe(log('observer2')); // number of subscribers is 0 at this time
}, 3000);
// observer2's onNext is called at startTime + 8 seconds
// observer2's onNext is called at startTime + 13 seconds

场景3:在新订阅之前完成了可观察到的。变得"冰冷"

 var shared  = rx.Observable.interval(5000).take(2).share();
    var startTime = Date.now();
    var log = (x) => (value) => { 
        console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`);
    };

var observer1 = shared.subscribe(log('observer1')),
    observer2;

setTimeout(()=>{
    observer2 = shared.subscribe(log('observer2'));
}, 12000);

// 2 emission for observable 1, at startTime + 5 secs, and at startTime + 10secs
// 2 emissions for observable 2,at startTime + 12 + 5 secs, and at startTime + 12 + 10secs
淳于新
2023-03-14

请注意,您使用的是RxJS v5,而文档链接似乎是RxJS v4。我不记得具体细节,但我认为share运营商经历了一些变化,特别是在完成和重新订阅时,但请不要相信我的话。

回到你的问题,正如你在研究中所显示的,你的期望与图书馆的设计不符。可观察对象懒洋洋地实例化他们的数据流,具体地在订户订阅时启动html" target="_blank">数据流。当第二个订户订阅同一个可观察对象时,另一个新的数据流将启动,就像它是第一个订户一样(是的,每个订阅都会创建一个新的可观察对象链,如您所说)。这是RxJS术语中创造的冷可观察的,这是RxJS可观察的默认行为。如果您想要一个observable,在数据到达时将其数据发送给其拥有的订阅者,这被称为热observable,获得热observable的一种方法是使用share操作符。

你可以在这里找到说明订阅和数据流:热观测和冷观测:有“热”和“冷”操作符吗?(这对RxJS v4有效,但大多数对v5有效)。

 类似资料:
  • 应用程序具有上下文路径-->/spring-form-simple-project 因此,为了访问,我使用: 这个控制器又返回student.jsp,当提交这个student.jsp时,它用-->@RequestMapping(value=“/AddStudent”,method=RequestMethod.post)调用controller 任何关于这通常如何工作的指示都将是有帮助的。 谢谢!

  • 问题内容: 可能是JavaScript中鲜为人知的部分,位于原型链旁边。 所以问题是:如何… …实际上是创建一个对象,并定义其原型链/构造函数/等? 最好是显示一个替代方案,以充分理解该关键字。 问题答案: 的操作者使用内部方法,它基本上具有下列功能: 初始化新的本机对象 设置此对象的内部,指向Function 属性。 如果函数的属性不是对象(则使用原始值,例如Number,String,Bool

  • 启动C 20时,原子的操作有等待操作和通知操作。但我不知道它们到底是怎么工作的。cppreference说: 执行原子等待操作。表现为它重复执行以下步骤: 比较此的值表示形式- 这些函数保证仅在值发生更改时返回,即使底层实现错误地解除了阻塞。 我不太明白这两个部分是如何相互关联的。这是否意味着如果值没有更改,那么即使我使用了notify\u one()/方法,函数也不会返回?这意味着该操作在某种程

  • 问题内容: 基本问题是:执行以下操作时会发生什么? 给定以下内容: 我明白那个: 与相同,直接分配给所指示的项目 与相同,即会进行加法 但是当我这样做时会发生什么 : 特别: 这和一样吗?(这不是就地操作) 如果是,在这种情况下是否有所不同: 一个指数,或 一个或 一个对象 背景 我开始研究此问题的原因是在使用重复索引时遇到了非直觉的行为: 问题答案: 您需要意识到的第一件事是,它并不完全映射到,

  • buffer buffer() 操作符的函数签名: buffer([breakObservable]) buffer 本身意味着我们在等待而不会发出任何值,直到 breakObservable 发生。示例如下: let breakWhen$ = Rx.Observable.timer(1000); let stream$ = Rx.Observable.interval(200) .buffer(

  • 这可不是一个简单的话题。其中涉及了应用程序中的诸多领域,你可能想要同步 API 的响应,或者你想要处理其它类型的流,比如 UI 中的点击事件或键盘事件。 有大量的操作符以它们各自的方式来处理时间,比如 delay、 debounce、 throttle、 interval, 等等。 interval 这个操作符用来创建一个 Observable,基本上它所做的就是按固定的时间间隔提供值,函数签名如