我已经阅读了ReactiveX留档几次,仍然无法完全理解当观察者订阅可观察文件时会发生什么。
我们来看一个简单的例子:
import { Observable } from 'rxjs';
const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.complete();
});
const observer = {
next: (x) => console.log('got value ' + x),
error: (err) => console.error('something wrong occurred: ' + err),
complete: () => console.log('done')
};
observable.subscribe(observer);
StackBlitz代码。
我的问题:
传递给可观察对象的订户
对象来自哪里?
从RxJS留档:
可以观察到并不是巧合。subscribe
和subscribe
在newobservable中(函数subscribe(subscriber){…})
具有相同的名称。在图书馆,它们是不同的,但为了实用目的,你可以在概念上平等地考虑它们。
因此,显然传递到可观察构造函数(
subscriber
)中的subscribe回调的对象实际上不是observer
对象。至少如果你按照上面关于库实际工作方式的引述去做的话。
如果传入的不是
observer
对象,那么subscriber到底是什么。下一步(1)
和订阅。complete()
调用?如何连接到观察者
中的下一个
属性?
澄清编辑:
我知道如何利用RxJS,事实上,人们可以从概念上想象观察者被注入(正如报价所说)。然而,我在这里是想了解它实际上是如何工作的。
匿名用户
不,观察者不会被注入到可观察对象中。
混乱源于这样一个事实,即新的可观察(...)语法更像是一个低级工厂,而不是一个有用的模式。
它或多或少是由更直接的实现所使用的机制,如of(value e1, value e2,..., value eN)
,from(枚举)
和from mEvent(...)
。
这些方法是您应该关注的实际用例。
在幕后,所有这些方法都将某种同步或异步价值或交互连接到可观测流的奇妙世界中。为了做到这一点,它们以某种方式表现得像一个合适的观察者:它们生成项目并将其放入流中。为此,他们使用一个名为next
的函数。就像Observer
实现中的方法一样,bacause实际上是以完全相同的方式调用的。
特别是,您可以在此处查看subscribe方法的实现:
https://github.com/ReactiveX/rxjs/blob/master/src/internal/Observable.ts
如果您想了解订阅期间实际发生的情况,我建议您实际查看代码。但是,在我看来,您应该在熟悉了各种可观察的创建函数之后再尝试。
希望能有帮助。
可观察的
创建流程如下:
作者定义了一个可观察的
(为了便于解释,这里使用new
手动定义):
const myObservable = new Observable(function subscribe(subscriber) {
subscriber.next(1);
subscriber.next(2);
subscriber.complete();
return function tearDownLogic() {
console.log('runs when Observable for whatever reason is done (complete, error, or unsubscribed)')
}
});
传递给上述可观察构造函数的subscribe
回调由可观察构造函数本地保存:
constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic) {
if (subscribe) {
this._subscribe = subscribe;
}
}
因此,我们有一个完整的subscribe
函数,由我们或任何其他预制的Observable
定义,保存下来供以后执行。
可以通过几种形式之一将观察者传递给subscribe
回调。或者,直接作为一到三个函数(next、error、complete),或者作为一个对象使用三种相同方法中的一种或多种。出于本说明的目的,我们将实现最后一个更详细的选项:
const observer = {
next(v) {
console.log(v);
}
error(err) {
console.log(err);
}
complete() {
console.log('Observable has now completed and can no longer emit values to observer');
}
}
现在,有趣的部分开始了。我们将观察者
传递给可观察者。订阅(……)
方法:
myObserver.subscribe(observer);
subscribe方法如下所示:
subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void),
error?: (error: any) => void,
complete?: () => void): Subscription {
const { operator } = this;
const sink = toSubscriber(observerOrNext, error, complete);
if (operator) {
sink.add(operator.call(sink, this.source));
} else {
sink.add(
this.source || (config.useDeprecatedSynchronousErrorHandling && !sink.syncErrorThrowable) ?
this._subscribe(sink) :
this._trySubscribe(sink)
);
}
if (config.useDeprecatedSynchronousErrorHandling) {
if (sink.syncErrorThrowable) {
sink.syncErrorThrowable = false;
if (sink.syncErrorThrown) {
throw sink.syncErrorValue;
}
}
}
return sink;
}
简要介绍了
subscribe
方法:
以先前讨论过的形式之一接收观测者
toSubscriber
将观察者转换为Subscriber
对象,而不管其传入形式如何(Subscriber
实例保存在sink
变量中)运算符
变量是未定义的
,除非您订阅了运算符。因此,只需忽略operator
if
语句即可
Subscriber
扩展了Subscription
对象(原型链接到该对象),该对象的原型上有两个重要方法:unsubscribe()
,add()
添加(…)
用于将“分解逻辑”(函数)添加到可观察的
,该逻辑将在可观察的
完成或取消订阅时运行。它将接受传递给它的任何函数,将其包装在Subscription
对象中,并将该函数放入Subscription
的\u unsubscribe
变量中。此订阅
保存在我们在上面创建的订户
上,保存在名为\u subscriptions
的变量中。如前所述,我们这样做是为了在订户取消订阅或完成订阅时,执行所有add()
'ed下拉逻辑
作为旁注,可观察到。subscribe()
返回订户实例。因此,您可以调用mySubscriber。在它的任何位置添加(//一些分解逻辑)
,以添加当可观察的
完成或取消订阅时将执行的函数
一个重要的部分现在包含了:this_trySubscribe(sink)
运行(作为参数在add()中)<代码>\u尝试订阅(..)
是实际运行subscribe
回调的函数,该回调先前由Observable
构造函数保存。重要的是,它将sink
(我们新的Subscriber
实例)作为回调传递给Observable
回调。换句话说,当<代码>订户。下一步(1)
在可观察的执行中,我们实际上是在接收器中执行next(1)
(Subscriber
)实例(next()
在Subscriber
的原型上)
所以,现在我就到此为止。在
toSubscribe
内部以及关于取消订阅流程的其他内容中有更多详细信息,但这些内容不在本Q的范围之内
简言之,为了回答标题中的问题,只需将观察者转换为统一的
订户
对象,即可将其传递到可观察对象
。
希望这能在将来帮助其他人。
我正在学习RxJS,对于“听众”在哪里(在可观察的或观察者中),他们是如何订阅/取消订阅的,以及当观察者“不再对”可观察的“不感兴趣”时会发生什么,比如当你使用或。 对于第一部分——什么是订阅什么,什么是倾听者——我对这些陈述之间看似矛盾的地方感到困惑。从http://reactivex.io/rxjs/manual/overview.html我们读到观察者不是观察者的“听众” 这与addEven
问题内容: 我一直在阅读Observer模式,以保持UI处于最新状态,但仍然看不到它的用途。即使在我的特定对象中通知了我的MainActivity然后运行update();方法我仍然无法使用Pet对象来获取更新值,因为该对象是在Oncreate中创建的…而我只是无法创建新对象,因为那时变量会有所不同..这是我的实施,它似乎不起作用。 观察者/ MainActivity 可观察/宠物 问题答案: 首
是否有一种设计模式可以形成一个“复合”观察者/可观察者? 我的意思是我有一个可观察的,它在某个变化时通知它的监听器。 每个监听器也是一个可观察的,并通知它自己的监听器(在某个动作上,它做了哪个动作是由第一个可观察的通知触发的)。 这种观察者/可观察的“链接”作为设计是可以的,还是有一个标准的模式?
我试图理解当我使用 在或之后,在我使用时返回true 我知道是一次性的。isDisposed()返回false。有人能解释一下到底发生了什么吗?。我理解一个写得很好的观察。create不能在onComplete()或onError()之后发出项。
让我们考虑下面的示例代码: 在函数gude()中,将创建一个新的observable,它将发出哈希值,该哈希值的前n个前导值设置为零。一个观察者订阅了那个可观察的,并立即取消订阅。让我们假设函数createHashWithNLeadingZeroes()需要相当长的时间来生成响应。 我想这里发生了以下事情: (1) 创建了一个新的可观察对象,描述可观察对象行为的函数被内部存储在属性_subscri
我来自同步编程背景,我很难理解可观察性。 这是我的服务/提供商的摘录(离子2项目) 我将从订阅它。关于这一点,我有几个问题。 > 即使我没有声明,上面的代码是否返回一个可观察/观察者? 响应是JSON。如何检查/处理JSON并执行一些操作,如 那就做吧 我认为应该在提供者类中完成。只是一个典型的提示/例子将是真棒。 当请求到达subscribe方法时,它是否真的发生了? 创建和返回Angular