当前位置: 首页 > 知识库问答 >
问题:

定期接收可观测发射值

吕冠宇
2023-03-14

我必须定期轮询一些RESTfulendpoint以刷新android应用程序的数据。我还必须根据连接情况暂停并恢复(如果手机处于脱机状态,甚至无需尝试)。我当前的解决方案正在运行,但它使用标准Java的ScheduledExecutorService来执行定期任务,但我想继续使用Rx范式。

这是我当前的代码,为了简洁起见,跳过了部分代码。

userProfileObservable = Observable.create(new Observable.OnSubscribe<UserProfile>() {
    @Override
    public void call(final Subscriber<? super UserProfile> subscriber) {
        final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        final Runnable runnable = new Runnable() {
            @Override
            public void run() {
                // making http request here
            }
        };
        final List<ScheduledFuture<?>> futures = new ArrayList<ScheduledFuture<?>>(1);
        networkStatusObservable.subscribe(new Action1<Boolean>() {
            @Override
            public void call(Boolean networkAvailable) {
                if (!networkAvailable) {
                    pause();
                } else {
                    pause();                        
                    futures.add(scheduledExecutorService.scheduleWithFixedDelay(runnable, 0, SECOND_IN_MILLIS * SECONDS_TO_EXPIRE, TimeUnit.MILLISECONDS));
                }
            }

            private void pause() {
                for (ScheduledFuture<?> future : futures) {
                    future.cancel(true);
                }
                futures.clear();
            }
        });

        final Subscription subscription = new Subscription() {
            private boolean isUnsubscribed = false;

            @Override
            public void unsubscribe() {
                scheduledExecutorService.shutdownNow();
                isUnsubscribed = true;
            }

            @Override
            public boolean isUnsubscribed() {
                return isUnsubscribed;
            }
        };
        subscriber.add(subscription);
    }
}).multicast(BehaviorSubject.create()).refCount();

网络状态可观察基本上是一个包裹在可观察中的广播接收器

正如我所说,这个解决方案是有效的,但我想使用Rx方法进行定期轮询并发出新的UserProfiles,因为手动调度有许多问题,我想避免。我知道Observable.timerObservable.interval,但不知道如何将它们应用于此任务(我不确定是否需要使用它们)。

共有3个答案

卫俊誉
2023-03-14

简短的回答。RxJava2:

Observable.interval(initialDelay, unitAmount, timeUnit)
            .subscribe(value -> {
                // code for periodic execution
            });

根据需要选择initialDelay、unitAmount和TimeUnit。

示例:0,1, TimeUnit. MINUTES。

钱言
2023-03-14

选择之一是使用Observable。间隔并在发出间隔时检查用户状态:

     Observable<Long> interval = Observable.interval(1, TimeUnit.SECONDS);

    //pulling the user data
    Observable<Observable<String>> userObservable = interval.map(new Func1<Long, Observable<String>>() {
        Random random = new Random();
        @Override
        public Observable<String> call(Long tick) {
            //here you are pulling user data; you should do it asynchronously - rx way - because the interval is using Schedulers.computation which is not best suited for doing io operations
            switch(random.nextInt(10)){
                case 0://suppose this is for cases when network in  not available or exception happens
                    return Observable.<String>just(null);
                case 1:
                case 2:
                    return Observable.just("Alice");
                default:
                    return Observable.just("Bob");
            }
        }
    });

    Observable<String> flatUsers = userObservable.flatMap(new Func1<Observable<String>, Observable<? extends String>>() {
        @Override
        public Observable<? extends String> call(Observable<String> stringObservable) {
            return stringObservable;
        }
    });

    //filter valid data
    Observable<String> usersWithoutErrors = flatUsers.filter(new Func1<String, Boolean>() {
        @Override
        public Boolean call(String s) {
            return s != null;
        }
    });

    //publish only changes
    Observable<String> uniqueUsers = usersWithoutErrors.distinctUntilChanged();

如果networkStatusObservable发出事件的频率至少与检查用户数据所需的频率相同,则可以更简单地执行此操作

 networkStatusObservable.sample(1,TimeUnit.Seconds).filter(/*the best is to filter only connected state */).map(/*now start pulling the user data*/)

最后,您可以创建使用调度程序定期发出用户状态的可观察-请参阅调度程序留档以了解哪个调度程序最适合您的需求:

public abstract class ScheduledOnSubscribe<T> implements Observable.OnSubscribe<T>{
    private final Scheduler scheduler;
    private final long initialDelay;
    private final long period;
    private final TimeUnit unit;

    public ScheduledOnSubscribe(Scheduler scheduler, long initialDelay, long period, TimeUnit unit) {
        this.scheduler = scheduler;
        this.initialDelay = initialDelay;
        this.period = period;
        this.unit = unit;
    }

    abstract T next() throws Exception;


    @Override
    public void call(final Subscriber<? super T> subscriber) {
        final Scheduler.Worker worker = scheduler.createWorker();
        subscriber.add(worker);
        worker.schedulePeriodically(new Action0() {
            @Override
            public void call() {
                try {
                    subscriber.onNext(next());
                } catch (Throwable e) {
                    try {
                        subscriber.onError(e);
                    } finally {
                        worker.unsubscribe();
                    }
                }
            }

        }, initialDelay, period, unit);
    }
}

//And here is the sample usage
 Observable<String> usersObservable = Observable.create(new ScheduledOnSubscribe(Schedulers.io(), 1, 1, TimeUnit.SECONDS ){
        Random random = new Random();
        @Override
        String next() throws Exception {
            //if you use Schedulers.io, you can call the remote service synchronously
            switch(random.nextInt(10)){
                case 0:
                    return null;
                case 1:
                case 2:
                    return "Alice";
                default:
                    return "Bob";
            }
        }
    });
齐才艺
2023-03-14

关于这个GitHub问题,您可能会发现一些有用的方法。

https://github.com/ReactiveX/RxJava/issues/448

这三个实现是:

Observable.interval

Observable.interval(delay, TimeUnit.SECONDS).timeInterval()
        .flatMap(new Func1<Long, Observable<Notification<AppState>>>() {
            public Observable<Notification<AppState>> call(Long seconds) {
                return lyftApi.updateAppState(params).materialize(); } });

调度程序。定期安排

Observable.create({ observer ->
        Schedulers.newThread().schedulePeriodically({
            observer.onNext("application-state-from-network");
        }, 0, 1000, TimeUnit.MILLISECONDS);
    }).take(10).subscribe({ v -> println(v) });

手动递归

Observable.create(new OnSubscribeFunc<String>() {
        @Override
        public Subscription onSubscribe(final Observer<? super String> o) {
            return Schedulers.newThread().schedule(0L, new Func2<Scheduler, Long, Subscription>() {
                @Override
                public Subscription call(Scheduler inner, Long t2) {
                    o.onNext("data-from-polling");
                    return inner.schedule(t2, this, 1000, TimeUnit.MILLISECONDS);
                }
            });
        }
    }).toBlockingObservable().forEach(new Action1<String>() {
        @Override
        public void call(String v) {
            System.out.println("output: " + v);
        }
    });

结论是手动递归是可行的,因为它会等到操作完成后再安排下一次执行。

 类似资料:
  • 我想计算单个可观测物体的排放量之间的差异。 如果一个可观测的物体发出一个新的值,有没有办法同时得到它当前和以前的发射?我想要这样的东西: 我该怎么做呢?我是否需要将每个发射存储在一个上限变量中,或者是否有一种奇特的RxJS方法来实现这一点?

  • 我有一个简单的api请求,它向一个对象返回一个可观察的对象,该对象中有一组项,每个项中都有一个链接。因此,我想在当前异步流中直接获取链接后面的数据,但我收到一个CORS错误,错误是: 这是否可能与我当前的函数?还是我误解了一些基本概念?

  • 我有两个源观测值,当一个源观测值发出时,我需要计算一些数据。我试图使用操作符,但它只在每个源观测值第一次发出时发出一个值。 是否有任何运算符类似于,一旦任何源可观测对象第一次发出,就立即发出?如果没有,最清晰的方法是什么? 我所尝试的:

  • 使用rxjsv6 所以我有多个我想要跟踪/观察的变量,当它们中的任何一个发生变化时,触发一个可观察对象来调用一个API,该API然后根据这些变量更新其他内容。 我想我可能会在上触发一个事件来强制可观察对象做一些事情,但它不会根据我想要的返回结果 例如。 然而,在可观测数据上没有订阅方法。我试图修改我的另一个观察值(这是可行的)

  • 我试图创建一个observate,它从firebase查询返回一个列表。问题是,当我调用onNext发出项目,然后调用onComplete时,它会停止发出第一个项目之后的项目,而根本不调用onComplete不会发出任何东西。有没有正确的方法来实现我想要实现的目标?我对RxJava还是很陌生,请原谅我的无知。感谢您的帮助:)

  • 问题内容: 我想同步地发出两个Observable对象(它们是异步的),一个接一个地返回第 一个 发出的Observable对象。如果第一个失败,则不应发出第二个。 假设我们有一个登录用户的Observable,还有一个登录 后 自动选择用户帐户的Observable 。 这是我尝试的: 不幸的是,这不适用于我的用例。它将以“ ob1”开始并行发出/调用两个可观察对象。 有人遇到过类似的用例吗?还