我想知道如何转换类似于SwitchMap的可观察,但不是限制为单个活动流有多个(有限的)流。
其目的是让多个任务同时工作,达到某些任务计数限制,并允许新任务使用FIFO队列策略启动,这意味着任何新任务到达时都将立即启动,队列中最旧的任务将被取消。
switchMap将为源的每个发射创建可观察流,并将取消以前运行的可观察流。一旦创建了新的可观察流,我想实现类似的功能,但允许在某个级别(如flatMap)上进行并发,这意味着允许为每个发射创建可观察流的数量,并在达到并发限制时并发运行到某个并发限制,最旧的可观察对象将被取消,新的将开始。
实际上,这也类似于使用maxConcurrent的flatMap,但是当到达maxConcurrent时,不是新的观察值在队列中等待,而是取消旧的观察值并立即输入新的观察值。
虽然现成的解决方案不可用,但下面这样的东西应该会有所帮助。
public static void main(String[] args) {
Observable.create(subscriber -> {
for (int i = 0; i < 5; i++) {
Observable.timer(i, TimeUnit.SECONDS).toBlocking().subscribe();
subscriber.onNext(i);
}
})
.switchMap(
n -> {
System.out.println("Main task emitted event - " + n);
return Observable.interval(1, TimeUnit.SECONDS).take((int) n * 3)
.doOnUnsubscribe(() -> System.out.println("Unsubscribed for main task event - "+ n));
}).subscribe(n2 -> System.out.println("\t" + n2));
Observable.timer(20, TimeUnit.SECONDS).toBlocking().subscribe();
}
Observable.create
部分创建了一个缓慢的生产者,它以发射0、睡眠1s和发射1、睡眠2s和发射2等方式发射项目。
为每秒发出数字的每个元素创建可观察的对象。您还可以注意到,每当一个元素由main发出时,以及当它被取消订阅时,它都会打印一行。
因此,在您的情况下,您可能有兴趣使用dounsubscribe关闭最旧的任务。希望有帮助。
下面的伪代码可能有助于更好地理解。
getTaskObservable()
.switchMap(
task -> {
System.out.println("Main task emitted event - " + task);
return Observable.create(subscriber -> {
initiateTaskAndNotify(task, subscriber);
}).doOnUnsubscribe(() -> checkAndKillIfMaxConcurrentTasksReached(task));
}).subscribe(value -> System.out.println("Done with task and got output" + value));
您可以尝试使用此变压器:
public static <T, R> Observable.Transformer<T, R> switchFlatMap(
int n, Func1<T, Observable<R>> mapper) {
return f ->
Observable.defer(() -> {
final AtomicInteger ingress = new AtomicInteger();
final Subject<Integer, Integer> cancel =
PublishSubject.<Integer>create().toSerialized();
return f.flatMap(v -> {
int id = ingress.getAndIncrement();
Observable<R> o = mapper.call(v)
.takeUntil(cancel.filter(e -> e == id + n));
cancel.onNext(id);
return o;
});
})
;
}
演示:
public static void main(String[] args) {
PublishSubject<Integer> ps = PublishSubject.create();
@SuppressWarnings("unchecked")
PublishSubject<Integer>[] pss = new PublishSubject[3];
for (int i = 0; i < pss.length; i++) {
pss[i] = PublishSubject.create();
}
AssertableSubscriber<Integer> ts = ps
.compose(switchFlatMap(2, v -> pss[v]))
.test();
ps.onNext(0);
ps.onNext(1);
pss[0].onNext(1);
pss[0].onNext(2);
pss[0].onNext(3);
pss[1].onNext(10);
pss[1].onNext(11);
pss[1].onNext(12);
ps.onNext(2);
pss[0].onNext(4);
pss[2].onNext(20);
pss[2].onNext(21);
pss[2].onNext(22);
pss[1].onCompleted();
pss[2].onCompleted();
ps.onCompleted();
ts.assertResult(1, 2, 3, 10, 11, 12, 20, 21, 22);
}
我想在laravel中创建一个RBAC系统,其中一个用户可以属于多个角色,每个角色可以有多个权限。中间件应在继续请求之前检查用户是否具有特定权限(在其任何角色内)。 我能够实施一个案例,其中 用户属于一个具有多个权限的角色 我需要实现具有多个权限的多个角色的用户。有什么指点吗?
switchmap的rxjava文档定义相当模糊,它链接到与FlatMap相同的页面。这两个操作员有什么不同?
问题内容: 我的Android应用有很多按钮。 我的main.xml布局具有三个按钮。 我知道如何使用按钮从一个活动转到另一个活动,但是我不知道如何在一个活动上具有多个按钮,每个按钮启动的活动与另一个活动不同。 例 Main.xml Button1 Button2 Main2.xml 由button1启动 About.xml 由Button2启动 我如何使main.java文件做到这一点? 问题答
我已经创建了一个类似上面的类,我希望能够使用相同的类使用'colour'作为这个类的替代,如下面。 有没有一种方法可以简单地创造出来?
我正在寻找一种将多个订阅者附加到RxJava可观察流的方法,每个订阅者异步处理发出的事件。 我第一次尝试使用。flatMap(),但这似乎对任何后续订阅服务器都不起作用。所有订阅服务器都在同一线程上处理事件。 最终工作的是通过每次创建一个新的可观察的来消耗新线程中的每个事件: 输出: 以及多个订阅者的最终结果: 输出: 然而,这似乎有点笨拙。有没有更优雅的解决方案,或者RxJava不是一个很好的用