让我们考虑下面的示例代码:
gude() {
const digestor$ = new Observable(subscriber => {
for (let i = 0; i < 4711; i++) {
setTimeout(() => {
const hash = createHashWithNLeadingZeroes(i);
subscriber.next(hash);
}, i);
}
});
const subscription = digestor$.subscribe(
_ => {
if (subscription) {
subscription.unsubscribe();
}
}
);
}
在函数gude()中,将创建一个新的observable,它将发出哈希值,该哈希值的前n个前导值设置为零。一个观察者订阅了那个可观察的,并立即取消订阅。让我们假设函数createHashWithNLeadingZeroes()需要相当长的时间来生成响应。
我想这里发生了以下事情:
(1) 创建了一个新的可观察对象,描述可观察对象行为的函数被内部存储在属性_subscribe中(https://github.com/ReactiveX/rxjs/blob/master/src/internal/Observable.ts: 37-41).
(2) 调用subscribe()时,首先将观察者包装在Subscriber对象中,然后将该Subscriber应用于保存观察者逻辑的_subscribe函数_subscribe()返回速度很快,因为只设置了4711个超时,并且返回了订阅对象(https://github.com/ReactiveX/rxjs/blob/master/src/internal/Observable.ts: 206-250).
订阅服务器基本上拦截对下一步()、错误()和完成()的调用,并且仅在内部未设置属性isStopted时转发给实际观察者(https://github.com/ReactiveX/rxjs/blob/master/src/internal/Subscriber.ts: 90-128)。
(3) 一旦设置了变量subscription,就会调用unsubscripte()。除其他外,这将导致将isStopped设置为true,以便订阅者不再将哈希转发给观察者(https://github.com/ReactiveX/rxjs/blob/master/src/internal/Subscriber.ts: 130-136).
然而,根据这一逻辑,可观察者仍将继续工作,直到所有4711个散列都被毫无目的地计算出来,因为观察者的方法变成了noop。最终,这种行为会影响应用程序的性能,这取决于订阅量和可观察对象的工作负载。我发现很难相信所描述的是正确的。我在这里遗漏了哪一部分?
我在这里遗漏了哪一部分?
我认为你缺少的部分是,尊重契约是被观察者的责任:如果它被要求停止发射,它应该停止发射。因此,您的可观察对象应执行以下操作:
const digestor$ = new Observable(subscriber => {
let keepGoing = true;
for (let i = 0; i < 4711 && keepGoing; i++) {
setTimeout(() => {
if (keepGoing) {
const hash = createHashWithNLeadingZeroes(i);
subscriber.next(hash);
}
}, i);
return () => keepGoing = false; // this function is called when the subscriber unsubscribes
}
});
依靠现有的工厂功能和操作员来实现期望的行为通常是一个更好的主意。例如,您可以通过使用range()
、timer()
和map()
来实现上述可观察性的等效。
你正在创建一个“热”可观察的,所以即使没有订户,它也会发光。
使用newobservable()
创建可观察对象时,您可以选择返回所谓的teardown(或dispose)函数,该函数应在需要时清理任何资源。
因此,在您的情况下,您需要停止计时器。
new Observable(subscriber => {
...
const handler = setTimeout(() => {...});
return () => clearTimeout(handler);
});
或者,如果您有多个计时器,您可以对所有计时器调用clearTimeout
。
问题内容: 我一直在阅读Observer模式,以保持UI处于最新状态,但仍然看不到它的用途。即使在我的特定对象中通知了我的MainActivity然后运行update();方法我仍然无法使用Pet对象来获取更新值,因为该对象是在Oncreate中创建的…而我只是无法创建新对象,因为那时变量会有所不同..这是我的实施,它似乎不起作用。 观察者/ MainActivity 可观察/宠物 问题答案: 首
是否有一种设计模式可以形成一个“复合”观察者/可观察者? 我的意思是我有一个可观察的,它在某个变化时通知它的监听器。 每个监听器也是一个可观察的,并通知它自己的监听器(在某个动作上,它做了哪个动作是由第一个可观察的通知触发的)。 这种观察者/可观察的“链接”作为设计是可以的,还是有一个标准的模式?
我试图理解当我使用 在或之后,在我使用时返回true 我知道是一次性的。isDisposed()返回false。有人能解释一下到底发生了什么吗?。我理解一个写得很好的观察。create不能在onComplete()或onError()之后发出项。
我正在学习RxJS,对于“听众”在哪里(在可观察的或观察者中),他们是如何订阅/取消订阅的,以及当观察者“不再对”可观察的“不感兴趣”时会发生什么,比如当你使用或。 对于第一部分——什么是订阅什么,什么是倾听者——我对这些陈述之间看似矛盾的地方感到困惑。从http://reactivex.io/rxjs/manual/overview.html我们读到观察者不是观察者的“听众” 这与addEven
我必须从两个订阅服务器获取数据,但我总是获取第一个订阅服务器的数据。 我有一个数据共享服务: 在离开搜索组件之前,我调用update方法。 现在,我在results组件上。我得到的共享数据如下: 我的问题是:我需要共享数据来订阅另一个可观察的数据。首先,我构造了一个物体乘坐,在我调用搜索方法之后 问题是我总是从数据服务获取数据,而不是从api调用。api工作导致我在存储中拦截结果,而不是在组件中。
我已经阅读了ReactiveX留档几次,仍然无法完全理解当观察者订阅可观察文件时会发生什么。 我们来看一个简单的例子: StackBlitz代码。 我的问题: 传递给可观察对象的