学习了Observables之后,我发现它们与Node.js流非常相似。两者都有一种机制,可在新数据到达,发生错误或没有更多数据(EOF)时通知使用者。
我很想了解两者之间的概念/功能差异。谢谢!
无论 观测量 和node.js中的 流
让你解决同样的根本问题:异步处理值的序列。我认为,两者之间的主要区别与激发其外观的环境有关。该上下文反映在术语和API中。
在 Observables
方面,您对EcmaScript进行了扩展,引入了反应式编程模型。它试图填补值生成和异步之间的间隙用的极简和可组合的概念Observer
和Observable
。
在node.js和 Streams
方面,您想要创建一个接口,用于网络流和本地文件的异步处理和高性能处理。从初始上下文的术语派生,你会得到pipe
,chunk
,encoding
,flush
,Duplex
,Buffer
,等由于具有务实的做法,提供了特殊的用例,你失去了一些能力,撰写的东西,因为它不是为统一明确的支持。例如,您push
在Readable
流write
上使用,Writable
尽管从概念上讲,您在做相同的事情:发布值。
因此,在实践中,如果你看的概念,如果你使用的选项{ objectMode: true }
,可以匹配Observable
与Readable
流和Observer
与Writable
流。您甚至可以在两个模型之间创建一些简单的适配器。
var Readable = require('stream').Readable;
var Writable = require('stream').Writable;
var util = require('util');
var Observable = function(subscriber) {
this.subscribe = subscriber;
}
var Subscription = function(unsubscribe) {
this.unsubscribe = unsubscribe;
}
Observable.fromReadable = function(readable) {
return new Observable(function(observer) {
function nop() {};
var nextFn = observer.next ? observer.next.bind(observer) : nop;
var returnFn = observer.return ? observer.return.bind(observer) : nop;
var throwFn = observer.throw ? observer.throw.bind(observer) : nop;
readable.on('data', nextFn);
readable.on('end', returnFn);
readable.on('error', throwFn);
return new Subscription(function() {
readable.removeListener('data', nextFn);
readable.removeListener('end', returnFn);
readable.removeListener('error', throwFn);
});
});
}
var Observer = function(handlers) {
function nop() {};
this.next = handlers.next || nop;
this.return = handlers.return || nop;
this.throw = handlers.throw || nop;
}
Observer.fromWritable = function(writable, shouldEnd, throwFn) {
return new Observer({
next: writable.write.bind(writable),
return: shouldEnd ? writable.end.bind(writable) : function() {},
throw: throwFn
});
}
您可能已经注意到,我改变了一些名字和使用的简单的概念Observer
和Subscription
,介绍到这里,以避免做reponsibilities超负荷
观测量
在Generator
。基本上,Subscription
您可以取消订阅Observable
。无论如何,使用上述代码,您可以拥有一个pipe
。
Observable.fromReadable(process.stdin).subscribe(Observer.fromWritable(process.stdout));
与相比process.stdin.pipe(process.stdout)
,您拥有的是一种组合,过滤和转换流的方法,该方法也适用于任何其他数据序列。您可以使用Readable
,Transform
和Writable
流实现此功能,但API倾向于使用子类而不是Readable
s和应用函数。Observable
例如,在模型上,转换值对应于将转换器函数应用于流。不需要的新子类型Transform
。
Observable.just = function(/*... arguments*/) {
var values = arguments;
return new Observable(function(observer) {
[].forEach.call(values, function(value) {
observer.next(value);
});
observer.return();
return new Subscription(function() {});
});
};
Observable.prototype.transform = function(transformer) {
var source = this;
return new Observable(function(observer) {
return source.subscribe({
next: function(v) {
observer.next(transformer(v));
},
return: observer.return.bind(observer),
throw: observer.throw.bind(observer)
});
});
};
Observable.just(1, 2, 3, 4, 5).transform(JSON.stringify)
.subscribe(Observer.fromWritable(process.stdout))
结论?在Observable
任何地方都可以轻松引入反应模型和概念。围绕该概念实现整个库比较困难。所有这些小功能都需要一致地协同工作。毕竟,ReactiveX项目仍在进行中。但是,如果您确实需要将文件内容发送到客户端,进行编码并压缩,则可以在NodeJS中使用它的支持,并且效果很好。
问题内容: 我一直在寻找新的rx java 2,但我不确定我是否已经明白了这个主意… 我知道我们所拥有的并没有支持。 因此,基于例如,可以说我有有: 在大约128个值之后,这将崩溃,这很明显我消耗的速度比获取项目要慢。 但是,我们有相同的 即使我延迟使用它,它仍然完全不会崩溃。为了工作,可以说我放了一个运算符,崩溃已经消失了,但并不是所有值都被发出。 因此,我目前在脑海中找不到答案的基本问题是,为
我一直在看新的rx java 2,我不太确定我是否理解了< code >背压的概念... 我知道我们有没有支持的和有背压。 因此,基于示例,假设我有与: 这将在大约128个值之后崩溃,很明显,我的消费速度比获取物品要慢。 但是< code>Observable也是如此 这将不会崩溃,即使我把一些消费延迟,它仍然工作。为了使< code >可流动工作,假设我将< code>onBackpressur
问题是: 使用及其其他功能不是更好吗?
Observables 是多个值的惰性推送集合。它填补了下面表格中的空白: 单个值 多个值 拉取 Function Iterator 推送 Promise Observable 示例 - 当订阅下面代码中的 Observable 的时候会立即(同步地)推送值1、2、3,然后1秒后会推送值4,再然后是完成流: var observable = Rx.Observable.create(functio
问题内容: 我一直在阅读Observer模式,以保持UI处于最新状态,但仍然看不到它的用途。即使在我的特定对象中通知了我的MainActivity然后运行update();方法我仍然无法使用Pet对象来获取更新值,因为该对象是在Oncreate中创建的…而我只是无法创建新对象,因为那时变量会有所不同..这是我的实施,它似乎不起作用。 观察者/ MainActivity 可观察/宠物 问题答案: 首
在一个Android应用程序的开发过程中,我遇到了这个设计难题,我现在无法解决。我将感谢任何想法、变通方法或干净的解决方案:)我将尽可能地简化它: 一切都是从一个通用的观察者模式实现开始的。有一个主题在它的公共方法调用时改变状态;有观察者对这些变化作出相应的反应。在这种情况下,主题是一个有状态的主题,这意味着它有一个像自动机一样变化的内部状态控制器(一个int)。每次它的状态发生变化时,它都会像往