当前位置: 首页 > 知识库问答 >
问题:

rxjava:我可以使用retry()吗?

于高雅
2023-03-14

我所做的是:在Observable.onSubscribe的call()方法中,在调用Subscribers的onError()方法之前,我只是让线程Hibernate所需的时间。因此,为了每1000毫秒重试一次,我执行如下操作:

@Override
public void call(Subscriber<? super List<ProductNode>> subscriber) {
    try {
        Log.d(TAG, "trying to load all products with pid: " + pid);
        subscriber.onNext(productClient.getProductNodesForParentId(pid));
        subscriber.onCompleted();
    } catch (Exception e) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e.printStackTrace();
        }
        subscriber.onError(e);
    }
}

因为这个方法是在IO线程上运行的,所以它不会阻塞UI。我看到的唯一问题是,即使第一个错误也是用延迟报告的,所以即使没有retry(),延迟也是存在的。如果延迟不是在错误之后应用,而是在重试之前应用(但显然不是在第一次尝试之前),我会更好。

共有1个答案

陆畅
2023-03-14

您可以使用retrywhen()运算符将重试逻辑添加到任何可观察到的内容。

下面的类包含重试逻辑:

public class RetryWithDelay implements Function<Observable<? extends Throwable>, Observable<?>> {
    private final int maxRetries;
    private final int retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Observable<?> apply(final Observable<? extends Throwable> attempts) {
        return attempts
                .flatMap(new Function<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> apply(final Throwable throwable) {
                        if (++retryCount < maxRetries) {
                            // When this Observable calls onNext, the original
                            // Observable will be retried (i.e. re-subscribed).
                            return Observable.timer(retryDelayMillis,
                                    TimeUnit.MILLISECONDS);
                        }

                        // Max retries hit. Just pass the error along.
                        return Observable.error(throwable);
                    }
                });
    }
}
public class RetryWithDelay implements
        Func1<Observable<? extends Throwable>, Observable<?>> {

    private final int maxRetries;
    private final int retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Observable<?> call(Observable<? extends Throwable> attempts) {
        return attempts
                .flatMap(new Func1<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> call(Throwable throwable) {
                        if (++retryCount < maxRetries) {
                            // When this Observable calls onNext, the original
                            // Observable will be retried (i.e. re-subscribed).
                            return Observable.timer(retryDelayMillis,
                                    TimeUnit.MILLISECONDS);
                        }

                        // Max retries hit. Just pass the error along.
                        return Observable.error(throwable);
                    }
                });
    }
}

用法:

// Add retry logic to existing observable.
// Retry max of 3 times with a delay of 2 seconds.
observable
    .retryWhen(new RetryWithDelay(3, 2000));
 类似资料:
  • 根据这个问题,将创建无限数量的线程。在我的应用程序中,这是一个问题,因为我有数百个异步任务要完成。 注释中的建议是使用,这是合理的,但使用模式与不同: 使用,我可以在应用程序中重用相同的线程池,而Rx将为我正确调用。 使用,我必须使在应用程序中可用,并记住调用。 问题: 我可以调整的行为来使用有界线程池吗? 将贯穿到Rx应用程序,并确保正确执行的推荐方法是什么?

  • 我有一个遗留项目,在多个上下文中有很多bean。bean之间似乎存在循环依赖关系,这就是为什么大多数上下文都将默认的lazy init设置为true。 我需要通过RMI导出一些bean,因此我在上下文中有以下声明: 这两个bean都声明为不懒惰。我认为,至少有一个声明为非懒惰足以具有相同的行为。在这种配置中有一个很大的缺点。它不起作用。我在上下文创建过程中有异常 组织。springframewor

  • 问题内容: 为了在Web上实时传输数据,我计划使用Redis作为我的Cache数据层,其中数据是瞬时的。Celery是队列管理器,RabbitMQ是从Redis排队进入Tornado层的代理。然后,该层通过websockets流到前端。 我从未在网上找到Redis + RabbitMQ组合。有人可以为它提供可靠的解决方案指南。问题是这样的整合是否可能和明智的? 问题答案: 我现在非常成功地一起使用

  • 问题内容: 有什么方法可以将Socket.IO http://socket.io/与Django 一起使用? 问题答案: 你当然可以! Django本身并不异步,因此你必须与普通的django服务器并行使用Socket.IO服务器,node.js始终不是一个不错的选择,但也存在使用纯Python编写的其他服务器。

  • 我将使用PWA和React开发一个跨平台的移动应用程序,由于我对这些技术不熟悉,我想知道什么是使其具有响应性的最佳和最简单的方法。 我知道可以使用媒体查询。 https://developers.google.com/web/fundamentals/design-and-ux/responsive/ 但与Bootstrap相比,似乎还有更多的工作要做,但不幸的是,我找不到任何指导我在同一项目中使

  • 根据Mozilla的说法,iOS上没有任何浏览器支持跨源开放策略,这是启用ShareDarrayBuffer的安全要求之一。这是否意味着我不能将SharedArrayBuffer用于iOS平台上的任何Web应用程序?

  • 我有一个实体,它有一个包含更多字段的可选子实体。子实体同样有一个元素集合。 这在Hibernate中通常是不可能的,还是我遗漏了什么? 错误消息为:

  • 我正在使用BeautifulSoup刮取一个URL,并使用以下代码查找标记,其类为: 现在,在上面的代码中,我们可以使用获取标记和与它们相关的信息,但我想使用XPath。是否可以将XPath与BeautifulSoup一起使用?如果可能,请给我提供示例代码。