当前位置: 首页 > 知识库问答 >
问题:

如何将Observable与Observable相结合?

沈淇
2023-03-14

我正在学习使用RxAndroid库的RxJava,同时使用改型来进行联网,并使用RetroLambda来使用Java8 lambdas。

我希望构建的应用程序具有以下功能:

  • 允许用户键入对Wikipedia API的查询

我让它像这样工作:

// emit when text is changed
Observable<OnTextChangeEvent> textStream = WidgetObservable.text(mEditText);
Observable<OnTextChangeEvent> debouncedStream = textStream.debounce(1, TimeUnit.SECONDS); // Unchecked assignment

// start activity indicator immediately
textStream
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(s -> mProgressBar.setVisibility(View.VISIBLE));

debouncedStream
        .map(t -> wikiService.search(t.text().toString())) // query wikipedia
        .map(Object::toString)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(s -> {
            mTextView.setText(s == null ? "Error" : s);
            mProgressBar.setVisibility(View.GONE);
        });

现在,我想添加一个新的小部件,允许我的维基百科查询使用另一种语言。现在,我将选择一个开关,以“en”或“nl”作为维基百科url的前缀。

所以我从开关中创建了一个新的可观察对象,它发出oncheckedchangevent。

我的想法是,我需要将这种可观察性与文本流结合起来。

当开关翻转时,应该运行基本相同的功能,但不是完全相同。当前正在运行的查询(如果正在运行)将过时,因为url前缀将更改。它应该再等待1秒钟,然后开始新的网络呼叫。

显然,以下方法不起作用:

// emit when Switch is flipped
Observable<OnCheckedChangeEvent> languageSwitchStream = WidgetObservable.input(mLanguageSwitch);

// emit when text is changed
Observable<OnTextChangeEvent> textStream = WidgetObservable.text(mEditText);

// combine these 2, but they are using different types
Observable uiChangeStream = Observable.merge(textStream, languageSwitchStream);

Observable<OnTextChangeEvent> debouncedStream = uiChangeStream.debounce(1, TimeUnit.SECONDS); // Unchecked assignment

我不能只是合并text StreamLanguage SwitchStream

所以问题变成了:我应该如何处理这个问题,使用正确的Rx?

==解决方案==========================

// emit when Switch is flipped
Observable<OnCheckedChangeEvent> languageSwitchStream =
        WidgetObservable
                .input(mLanguageSwitch)
                .startWith(new OnCheckedChangeEvent() {
                    @Override
                    public CompoundButton view() {
                        return null;
                    }

                    @Override
                    public boolean value() {
                        return mLanguageSwitch.isChecked();
                    }
                });

// emit when text is changed
Observable<OnTextChangeEvent> textStream = WidgetObservable.text(mEditText);

Observable<OnTextChangeEvent> uiChangeStream = Observable
        .combineLatest(
                textStream,
                languageSwitchStream,
                (text, switchValue) -> text);

uiChangeStream
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(s -> mProgressBar.setVisibility(View.VISIBLE));

uiChangeStream
        .debounce(1, TimeUnit.SECONDS)
        .map(t -> wikiService.search(t.text().toString())) // query wikipedia
        .map(Object::toString)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(s -> {
            mTextView.setText(s == null ? "Error" : s);
            mProgressBar.setVisibility(View.GONE);
        });

扩展批准的答案,我添加了一个. startsBy()到Switch的可观察,否则它将继续等待它被翻转,然后再发出一个值。

共有1个答案

步德宇
2023-03-14

使用CombineTest并执行以下操作:

Observable<SearchParams> uiChangeStream = Observable.combineLatest(
    textStream, 
    languageSwitchStream,
    (text, switch) -> /* extract info from each and return search params */)
.map(searchParams -> wikiService.search(searchParams);

组合Latest()将在一个可观察对象发出新值时重新发出另一个可观察对象的最后一个值。

 类似资料:
  • YzObservables 就是 Promise 的超集

  • Observable.of(1, 2, 3).subscribe( doSomething, reject, resolve); }); The forEach pattern is useful for a sequence of events you only expect to happen once. export class MyApp { private d

  • 我想我根本不理解角2的rxjs可观测物的概念。 我创建了一个可观察属性,其他人可以订阅该属性并获取http调用结果的数据。 如果我想发出另一个http请求,可能需要修改一些参数以提取一些新数据,该怎么办?我是否要用这个新的可观察物替换可观察物?这是否破坏了我的订阅组件与前一个可观察组件的订阅?

  • 另一方面,还有一个热,这更像是一个现场表演。 你从一开始就参加一个现场乐队表演,但有些人可能迟到25分钟的节目。 乐队不会从头开始播放,后来者必须从那里开始观看表演。 在RxJS API中的方法是publish方法。 此方法接受一个冷Observable作为其来源,并返回一个ConnectableObservable的实例。 在这种情况下,我们必须显式调用connect在我们的热Observabl