我在负责同步的类中使用PublishSubject。当同步完成时,将通知所有订阅服务器。发生错误时也会发生同样的情况。我注意到,下次我在发生错误后订阅时,它会立即返回给订阅者。
因此类可能如下所示:
public class Synchronizer {
private final PublishSubject<Result> mSyncHeadObservable = PublishSubject.create();
private final ThreadPoolExecutor mExecutor = new ThreadPoolExecutor(1, 1,
10, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(true),
new ThreadPoolExecutor.DiscardPolicy());
public Observable<Result> syncHead(final int chunkSize) {
mExecutor.execute(new Runnable() {
@Override
public void run() {
try {
//Do some work which either returns a result or throws an error
//...
mSyncHeadObservable.onNext(Notification.createOnNext(/*some result*/));
} catch (Throwable error) {
mSyncHeadObservable.onError(Notification.<Result>createOnError(error));
}
}
});
是否有一个可观察的东西可以作为代理?可能是其他Rx方法?
更新:我遵循了@Akarnokd方法,发出包装到RxJava通知
中的事件。然后通过flatmap()
展开它们。因此synchronizer
类的客户端不需要执行此操作。
//...
private PublishSubject<Notification<Result>> mSyncHeadObservable = PublishSubject.create();
public Observable<Result> syncHead(final int chunkSize) {
return mSyncHeadObservable.flatMap(new Func1<Notification<Result>, Observable<Result>>() {
@Override
public Observable<Result> call(Notification<Result> result) {
if (result.isOnError()) {
return Observable.error(result.getThrowable());
}
return Observable.just(result.getValue());
}
}).doOnSubscribe(
new Action0() {
@Override
public void call() {
startHeadSync(chunkSize);
}
});
}
private void startHeadSync(final int chunkSize) {
mExecutor.execute(new Runnable() {
@Override
public void run() {
try {
//Do some work which either returns a result or throws an error
//...
mSyncHeadObservable.onNext(Notification.createOnNext(/*some result*/));
} catch (Throwable error) {
mSyncHeadObservable.onError(Notification.<Result>createOnError(error));
}
}
});
}
//...
我不确定您希望通过这种设置实现什么,但通常,为了避免PublishSubject
出现终端条件,您应该将值和错误封装到一个公共结构中,并始终发出这些值和错误,永远不要发出任何onerror
和onCompleted
。一种选择是使用RXJava自己的事件包装器通知
,您的订阅服务器
应该解压缩该值。
我有两个演员,父母和孩子。家长使用内容监视孩子。看(孩子)。如果孩子调用上下文。停止(自我)父级收到终止消息。但是,如果子级抛出异常,akka将重新启动该异常,但不会向父级发送终止消息。 家长参与者如何监视孩子并监视任何终止/重新启动? 我提出的一个选项是覆盖父级中的主管策略,以便在出现任何异常时停止: 据我所知,这将适用于这位演员的所有孩子。理想情况下,我希望对个别孩子(演员类型)有一个不同的监
我已经阅读了ReactiveX留档几次,仍然无法完全理解当观察者订阅可观察文件时会发生什么。 我们来看一个简单的例子: StackBlitz代码。 我的问题: 传递给可观察对象的
我的API对两个独立的服务进行大约100次下游调用。在我将回复返回给客户之前,所有回复都需要汇总。我使用hystrix-feign进行HTTP调用。 我提出了一个我认为是优雅的解决方案,直到在rxJava文档中我发现了以下内容 BlockingObservable是提供阻塞运算符的各种可观察对象。它可以用于测试和演示目的,但通常不适用于生产应用程序(如果您认为需要使用BlockingObserva
让我们考虑下面的示例代码: 在函数gude()中,将创建一个新的observable,它将发出哈希值,该哈希值的前n个前导值设置为零。一个观察者订阅了那个可观察的,并立即取消订阅。让我们假设函数createHashWithNLeadingZeroes()需要相当长的时间来生成响应。 我想这里发生了以下事情: (1) 创建了一个新的可观察对象,描述可观察对象行为的函数被内部存储在属性_subscri
问题内容: 我一直在阅读Observer模式,以保持UI处于最新状态,但仍然看不到它的用途。即使在我的特定对象中通知了我的MainActivity然后运行update();方法我仍然无法使用Pet对象来获取更新值,因为该对象是在Oncreate中创建的…而我只是无法创建新对象,因为那时变量会有所不同..这是我的实施,它似乎不起作用。 观察者/ MainActivity 可观察/宠物 问题答案: 首
我想使用以下代码实现文件下载: RestAPI: 服务: 组件: 当我启动angular时,我得到错误:src/app/panel/service/download中的错误。服务ts(17,91):错误TS2339:类型“Observable”上不存在属性“map”。 将导入代码的正确wya是什么?当我点击下载按钮时,什么也没发生。