当前位置: 首页 > 知识库问答 >
问题:

RxJava:如何从可观察到的

呼延升
2023-03-14

我正在从事一个涉及Hystrix的项目,我决定使用RxJava。现在,忘记Hystrix的其余部分,因为我相信主要问题是我完全搞砸了正确编写可观察代码。

需要:我需要一种方法来返回一个代表多个可观察对象的可观察对象,每个可观察对象都运行一个用户任务。我希望该可观察对象能够返回任务的所有结果,甚至错误。

问题:可观测流会因错误而消亡。如果我有三个任务,而第二个任务引发了一个异常,那么即使第三个任务成功了,我也不会收到它。

我的代码:

public <T> Observable<T> observeManagedAsync(String groupName,List<EspTask<T>> tasks) {
    return Observable
            .from(tasks)
            .flatMap(task -> {
                try {
                    return new MyCommand(task.getTaskId(),groupName,task).toObservable().subscribeOn(this.schedulerFactory.get(groupName));
                } catch(Exception ex) {
                    return Observable.error(ex);
                }
            });
}

考虑到MyCommand是一个扩展HystrixObservableCommand的类,它返回一个可观察的,因此不应该涉及我所看到的问题。

尝试1:

使用Observable.flatMap如上

  • 好:每个命令都调度在自己的线程上,任务异步运行。
  • 坏:在第一个命令异常时,可观察完成发出先前的成功结果并发出异常。任何飞行中的命令都将被忽略。

尝试2:

使用<代码>可观察。concatMapDelayError,而不是flatMap

  • 坏:由于某种原因,任务同步运行。为什么??
  • 好:我得到了所有成功的结果。
  • ~好:OnError获取一个复合异常,其中包含抛出的异常列表。

任何帮助都会非常感激,而且可能会因为我自己没有想到而感到非常尴尬。

附加代码

此测试成功,可观察到<代码>。flatMap,但在使用Observable时失败。concatMapDelayError因为任务不是异步运行的:

java.lang.AssertionError:执行时间超过350ms限制:608

@Test
public void shouldRunManagedAsyncTasksConcurrently() throws Exception {
    Observable<String> testObserver = executor.observeManagedAsync("asyncThreadPool",getTimedTasks()); 
    TestSubscriber<String> testSubscriber = new TestSubscriber<>();
    long startTime = System.currentTimeMillis();
    testObserver.doOnError(throwable -> {
        System.out.println("error: " + throwable.getMessage());
    }).subscribe(testSubscriber);
    System.out.println("Test execution time: "+(System.currentTimeMillis()-startTime));
    testSubscriber.awaitTerminalEvent();
    long execTime = (System.currentTimeMillis()-startTime);
    System.out.println("Test execution time: "+execTime);
    testSubscriber.assertCompleted();
    System.out.println("Errors: "+testSubscriber.getOnErrorEvents());
    System.out.println("Results: "+testSubscriber.getOnNextEvents());
    testSubscriber.assertNoErrors();
    assertTrue("Execution time ran under the 300ms limit: "+execTime,execTime>=300);
    assertTrue("Execution time ran over the 350ms limit: "+execTime,execTime<=350);
    testSubscriber.assertValueCount(3);
    assertThat(testSubscriber.getOnNextEvents(),containsInAnyOrder("hello","wait","world"));
    verify(this.mockSchedulerFactory, times(3)).get("asyncThreadPool");
}

上述单元测试的任务:

protected List<EspTask<String>> getTimedTasks() {
    EspTask longTask = new EspTask("helloTask") {
        @Override
        public Object doCall() throws Exception {
            Thread.currentThread().sleep(100);
            return "hello";
        }
    };
    EspTask longerTask = new EspTask("waitTask") {
        @Override
        public Object doCall() throws Exception {
            Thread.currentThread().sleep(150);
            return "wait";
        }

    };
    EspTask longestTask = new EspTask("worldTask") {
        @Override
        public Object doCall() throws Exception {
            Thread.currentThread().sleep(300);
            return "world";
        }
    };
    return Arrays.asList(longTask, longerTask, longestTask);
}

共有3个答案

萧成文
2023-03-14

使用.料化()允许所有排放和错误作为包装通知通过,然后按照您的意愿处理它们:

     .flatMap(task -> {
            try {
                return new MyCommand(task.getTaskId(),groupName,task)
                    .toObservable()
                    .subscribeOn(this.schedulerFactory.get(groupName))
                    .materialize();
            } catch(Exception ex) {
                return Observable.error(ex).materialize();
            }
        });
魏凡
2023-03-14

您想使用mergeDelayError:

public <T> Observable<T> observeManagedAsync(String groupName,List<EspTask<T>> tasks) {
    return Observable.mergeDelayError(Observable
        .from(tasks)
        .map(task -> {
            try {
                return new MyCommand(task.getTaskId(),groupName,task).toObservable().subscribeOn(this.schedulerFactory.get(groupName));
            } catch(Exception ex) {
                return Observable.error(ex);
            }
        }));
}

请注意,MyCommand构造函数不应引发任何异常;这样可以更简洁地编写代码:

public <T> Observable<T> observeManagedAsync(String groupName,List<EspTask<T>> tasks) {
    return from(tasks)
           .map(task -> new MyCommand(task.getTaskId(), groupName, task)
                        .toObservable()
                        .subscribeOn(this.schedulerFactory.get(groupName)))
           .compose(Observable::mergeDelayError);

}

请记住,这最多只能调用一次onError;如果需要显式处理所有错误,请使用类似于

连文栋
2023-03-14

您可以使用Observable.on错误返回(),并返回特殊值(例如null),然后过滤下游的非特殊值。请记住,源可观察将在错误时完成。也取决于用例Observable.onErrorResumeNext()方法也很有用。如果您对错误通知感兴趣,请使用Observable.materialize(),这将将项目和onError()on完整()转换为通知,然后可以由Notification.getKind()过滤

编辑应在<代码>之后立即添加上述所有运算符。toObservable()。subscribeOn(this.schedulerFactory.get(groupName)) 假设没有尝试/捕获。

 类似资料:
  • 我正在尝试开发我的第一个RxJava例子 我有一个带有文本框和三个按钮的主要活动。第一个按钮初始化单独类中的整数。第二个按钮订阅一个可观察量,该可观察量假定正在观察整数。第三个按钮将整数的值减小 1。 这是我的密码 和班级 当我尝试使用 订阅时,它只是给了我 的值(即 6),然后它给了我完成! 然后我尝试使用,认为我需要使用,只是而不是,但后来我得到了一个返回的空的,然后再次完成! 有人能帮助我从

  • 问题内容: 给定汽车清单(),我可以这样做: 有没有办法我可以从一个到一个序列? 像没有参数的东西 问题答案: 您可以这样映射到: 请注意,flatMapping可能不会保留源可观察的顺序。如果订单对您很重要,请使用。

  • 我目前在Android和Kotlin上使用RxJava,但我有一个问题,如果不使用toBlocking(),我无法解决。 我在员工服务中有一个方法,它返回一个可观察的 这一切都很好,因为每当员工发生变化时,这个可观察对象就会发出新的员工列表。但是我想从员工那里生成一个PDF文件,这显然不需要每次员工更改时都运行。另外,我想从PDF生成器方法返回一个可完成的对象。我想在PDF中添加一个标题,然后遍历

  • 在android 6.0.1 Samsung s6 Edge+上的测试 当device screen脱机并从debug中拔出时,可观察到的只是停止发射项目。如果设备打开,则开始发射对象。另一个问题是,在停止接收项目之前,我会按照相同项目的顺序随机地得到2/3个重复调用 ____________________________edit_________________________________

  • 我有两个可观察到的。它们都是可观察的类型 一种是冷的,称为初始值可观察(initialValueObservable),它通过可观察(Observable)从项目列表中发出。from()。 另一个是名为“valueUpdateObservable”的热门主题,它是一个发布主题,在出现新项目时通知订阅者。 在客户端中,我想同时订阅这两个,因此我从和发布的更新中获取初始值。我最初的方法是合并它们,但我

  • 我正在尝试创建一个RxJava BlockingObservable,它将每隔X毫秒发出一个变量的值,直到(条件==true)或超时发生。 下面的代码似乎与我想要的很接近,但它总是发出一次,然后退出。奇怪的是,我在中有一个永远不会正确的条件——我希望这个可观察到的持续发出并最终超时,但事实并非如此。 我错过了什么/做错了什么?