我想创建一个可观察的,将按需发射项目,这意味着我想要一个单一的订阅可观察的,并通知可观察的,我需要基于我的请求的新项目。
这就是我使用Publishsubject所做的:
public class RecognizeSubject {
PublishSubject<Bitmap> mSubject;
private Context mContext;
private FaceDetector mFaceDetecor;
public RecognizeSubject(Context mContext) {
this.mContext = mContext;
this.mSubject = PublishSubject.create();
}
public void detect(Bitmap btm){
mSubject.onNext(btm);
}
public Flowable<SinglePhotoId> execute() {
return mSubject.toFlowable(BackpressureStrategy.DROP)
.observeOn(Schedulers.newThread())
.map(bitmap1 -> recognize(bitmap1))
.observeOn(AndroidSchedulers.mainThread())
.doOnSubscribe(disposable -> initialize())
.doFinally(() -> release());
}
private void initialize() {
mFaceDetecor = new FaceDetector.Builder(mContext)
.setTrackingEnabled(false)
.setLandmarkType(FaceDetector.ALL_LANDMARKS)
.build();
}
private void release() {
if (mFaceDetecor != null)
mFaceDetecor.release();
}
private SinglePhotoId recognize(Bitmap bitmap) {
//SystemClock.sleep(3000);
//make hard background work and return SinglePhotoId object
}
}
下面是活动类中的用法:
private void takeSubjectSnap() {
if (mSubject == null)
mSubject = new RecognizeSubject(getBaseContext());
if (mDisposable == null || mDisposable.isDisposed()) {
mDisposable = mSubject.execute()
.subscribe(this::handleDetectionSuccess,
this::handleDetectionError,
this::handleDetectionCompleted);
}
mSnapshotButton.setProgress(true);
mSubject.detect(myVideoView.getBitmap());
}
所以基本上我订阅了Flowable对象,并将Bitmap对象传递给我的Subject类,通过Flowable继续并返回结果,这个解决方案是正确的还是会产生一些内存泄漏?
是否有更好的解决方案将对象发送到Observable,以便通过标准onNext()方法继续并返回结果?
好的,我已经阅读了你的答案,并对我的代码做了一些修改,下面是一个图表,说明了我想要实现的目标:
因此,我的Subject类将在后台线程上处理接收到的数据,并通过onNext()方法将处理过的项目发送给观察者。我制作了一个简单的主题,它接收Integer对象并将其转换为String对象,下面是代码:
public class MySubject {
private String TAG = "MySubject";
private PublishSubject<Integer> subject;
private final Observable<String> observable;
public MySubject() {
Log.d(TAG, "---> MySubject() called");
this.subject = PublishSubject.create();
this.observable = subject
.doOnSubscribe(disposable -> init())
.doFinally(() -> relese()) //try do after terminate
.observeOn(Schedulers.newThread())
.map(this::myMap)
.observeOn(AndroidSchedulers.mainThread());
}
private void init(){
Log.d(TAG, "---> init() called");
}
private void relese(){
Log.d(TAG, "---> relese() called");
}
private String myMap(Integer integer){
Log.d(TAG, "---> myMap() called int: " + integer);
SystemClock.sleep(3000);
return " :) " + String.valueOf(integer);
}
public void decode(Integer integer){
subject.onNext(integer);
}
public Observable<String> getObservable(){
return observable;
}
}
下面是活动类中的一个用法:
Disposable disposable = Disposables.disposed();
MySubject subject = new MySubject();
void onButton1() {
if(disposable.isDisposed()){
disposable = subject.getObservable()
.subscribe(s -> {
Log.d(TAG, "---> onNext() called " + s);
}, throwable -> {
Log.d(TAG, "---> onError() called " + throwable.getMessage());
}, () -> {
Log.d(TAG, "---> onCompleted() called ");
});
}
Random generator = new Random();
int i = generator.nextInt(100) + 1;
subject.decode(i);
}
每次调用onButton1()方法时,我都会向主题对象发布新的随机int,然后在完成后通过onNext()方法接收处理过的数据。
该解决方案是否正确,并且不会导致任何副作用或内存泄漏?当然,我在onStop()活动方法中取消了subject的订阅。也许有更好的解决方案来处理Rxjava中的此类问题?
如有任何进一步的答复,将不胜感激:)
(我将此作为评论,因为这不是答案,但太长了)
我真的不确定您在这里的用例是什么,也不确定您所描述和实现的东西到底想要实现什么。
您所描述的是一种机制,它能够处理某些内容,并在空闲时请求新内容/能够处理/等等
您实现的是一种基于推送的处理器根据从客户端接收的数据处理项目的机制。
因此,如果您实现的东西按照您的要求工作,这很好,但我建议做一些小的更改:
execute
方法重命名为其他方法(因为它不执行任何操作)一次性。disposed()
以避免空检查和
- 我想将
RecognizeSubject
重命名为其他名称,因为现在它正在泄漏有关其内部实现的信息
- 我会把
mSubject
变成private
和变成final
- 我要去掉匈牙利符号
我不确定在使用位图时,flowable是否合适,你确定你一次需要那么多位图并处理所有位图(并删除未处理的位图吗?)。
RX继电器在这种情况下非常方便
https://github.com/JakeWharton/RxRelay
我试图创建一个observate,它从firebase查询返回一个列表。问题是,当我调用onNext发出项目,然后调用onComplete时,它会停止发出第一个项目之后的项目,而根本不调用onComplete不会发出任何东西。有没有正确的方法来实现我想要实现的目标?我对RxJava还是很陌生,请原谅我的无知。感谢您的帮助:)
让我们考虑下面的示例代码: 在函数gude()中,将创建一个新的observable,它将发出哈希值,该哈希值的前n个前导值设置为零。一个观察者订阅了那个可观察的,并立即取消订阅。让我们假设函数createHashWithNLeadingZeroes()需要相当长的时间来生成响应。 我想这里发生了以下事情: (1) 创建了一个新的可观察对象,描述可观察对象行为的函数被内部存储在属性_subscri
问题内容: 我一直在阅读Observer模式,以保持UI处于最新状态,但仍然看不到它的用途。即使在我的特定对象中通知了我的MainActivity然后运行update();方法我仍然无法使用Pet对象来获取更新值,因为该对象是在Oncreate中创建的…而我只是无法创建新对象,因为那时变量会有所不同..这是我的实施,它似乎不起作用。 观察者/ MainActivity 可观察/宠物 问题答案: 首
我试图理解当我使用 在或之后,在我使用时返回true 我知道是一次性的。isDisposed()返回false。有人能解释一下到底发生了什么吗?。我理解一个写得很好的观察。create不能在onComplete()或onError()之后发出项。
问题内容: 给定汽车清单(),我可以这样做: 有没有办法我可以从一个到一个序列? 像没有参数的东西 问题答案: 您可以这样映射到: 请注意,flatMapping可能不会保留源可观察的顺序。如果订单对您很重要,请使用。
我正在尝试创建一个RxJava BlockingObservable,它将每隔X毫秒发出一个变量的值,直到(条件==true)或超时发生。 下面的代码似乎与我想要的很接近,但它总是发出一次,然后退出。奇怪的是,我在中有一个永远不会正确的条件——我希望这个可观察到的持续发出并最终超时,但事实并非如此。 我错过了什么/做错了什么?