当前位置: 首页 > 知识库问答 >
问题:

如何使用Mono和state实现重复

笪涛
2023-03-14

我有一个方法,它获取一个项目列表,发出一个web请求,然后返回处理失败的项目(有些项目可能已经正确处理,有些项目可能已经失败,只返回失败的项目或一个空列表):

// returns list of of items that failed processing or empty list.
private List<String> sendsItems(List<String> items);

我想使用Mono并尝试发送失败的项目最多N次。使用阻塞代码,它看起来像这样:

public void sendWithRetries() {
    List<String> items = List.of("one", "two", "three");
    int retryNo = 0;
    while (!items.isEmpty() && retryNo < 3) {
        items = sendsItems(items);
        retryNo++;
    }
}

我真的很难用反应器将其转换为代码。我一直在考虑使用Mono.repeat或Mono.retry系列函数,但我没有看到任何将失败的项目传递回sendItems的好方法,除了做一些丑陋的事情,比如:

    var items = new AtomicReference<>(List.of("one", "two", "three"));
    Mono.fromCallable(() -> {
                List<String> failedItems = sendsItems(items.get());
                items.set(failedItems);
                if (!failedItems.isEmpty()) {
                    throw new RuntimeException("retry me");
                }
                return Collections.emptyList();
            })
            .retry(3)

有更好的方法吗?

注意:我需要它的真正用例是kinesis put records api call。它返回有关哪些记录未得到处理的信息,我想重试仅发送它们。

共有1个答案

明星剑
2023-03-14

您可以使用扩展运算符来维护状态:

Mono.fromCallable(() -> sendsItems(new Attempt(List.of("one", "two", "three"), 0)))
    .expand(attempt -> {
        if (attempt.getItems().isEmpty() || attempt.getRetry() > 3) {
            return Mono.empty();
        } else {
            return Mono.fromCallable(() -> sendsItems(attempt));
        }
    });

private Attempt sendsItems(Attempt previousAttempt) {
    System.out.println(previousAttempt);
    // implement actual sending logic here, below is just some dummy
    List<String> failedItems = previousAttempt.getItems().subList(0, previousAttempt.getItems().size() - 1);
    return new Attempt(failedItems, previousAttempt.retry + 1);
}

@Value
static class Attempt {
    List<String> items;
    int retry;
}
 类似资料:
  • State 使用的流程 在JustAuth中state参数的使用流程如下: 获取authorizeUrl时创建state(开发者创建,如果不创建则系统默认生成) 缓存state(JustAuth执行) 内置的缓存调度器自动清除已过期的state(JustAuth执行) 创建state(开发者) state在OAuth授权流程中是一个非必要但很重要的参数,就如名词解释中描述的:state是用来保持授

  • 问题内容: 我很快将使用Java的log4j类来创建项目。但是我不认为我对此有任何了解。希望有人能启发我这个小问题。 问题答案: Log4j基本上接受您要输出的任何语句,让您为其分配“严重性”级别(警告,错误,严重等),并以多种方式将其注销。(对于文件,字节流等,有很多附加选项可用。) 这是对log4j的简短介绍。 http://www.developer.com/open/article.php

  • 问题内容: 每当抛出javascript异常时,我们还想做一些额外的事情。 从以下文档: 角度表达式中任何未捕获的异常都委托给此服务。默认的实现只是将$ log.error委托给浏览器控制台。 它说“默认实现”的事实使我认为有一种方法可以为服务提供我们自己的实现,并在引发异常时做我们想要的事情。我的问题是,你如何做到这一点?我们如何使所有异常都保留给该服务,然后提供我们希望发生的功能? 问题答案:

  • 问题内容: 在Hibernate中应如何实现模型类的equals和hashcode?有哪些常见的陷阱?默认实现在大多数情况下是否足够好?使用商务钥匙有什么意义吗? 在我看来,要考虑到延迟获取,ID生成,代理等,在每种情况下都无法正确工作。 问题答案: Hibernate有何时/如何重写一个很好的和长期的描述/ 在文档 要点是,如果您的实体将成为的一部分,或者您要分离/附加其实例,则只需担心它。后者

  • 我有一个无功/异步调用的场景。我正在使用并使用进行外部HTTP调用。我的场景是,我必须打电话给call A(),然后检查它的响应响应。如果它的响应是确定比退出并返回响应。否则,创建第二个请求请求B使用响应A和打一个调用到calB()。然后检查其响应响应。如果它是确定的,然后返回响应,否则doRetry on call A()。

  • 我想尝试使用SIMD指令编写一个atoi实现,以包含在RapidJSON(一个C JSON阅读器/写入器库)中。它目前在其他地方进行了一些SSE2和SSE4.2优化。 如果是速度增益,可以并行完成多个结果。这些字符串最初来自JSON数据的缓冲区,因此多原子函数必须执行任何所需的滑动。 我提出的算法如下: 我可以用以下方式初始化长度为N的向量:[10^N..10^1] 我将缓冲区中的每个字符转换为一