当前位置: 首页 > 知识库问答 >
问题:

Retor onError继续运算符是否让原始序列继续?

曹铭晨
2023-03-14

反应器错误处理留档(https://projectreactor.io/docs/core/3.4.10/reference/index.html#error.handling)声明错误处理运算符不允许原始序列继续。

在学习错误处理操作符之前,必须记住,反应序列中的任何错误都是终端事件。即使使用了错误处理操作符,也不会让原始序列继续。相反,它将onError信号转换为新序列(回退序列)的开始。换言之,它将替换其上游终止的序列。

但是onError继续的javadoc声明如下(https://projectreactor.io/docs/core/3.4.10/api/index.html)-

通过从序列中删除有罪的元素并继续后续元素,让上游的兼容操作符从错误中恢复。

onErrorContinue是否不被视为“错误处理操作员”?

它似乎允许原始序列继续-

        Flux.range(1, 5)
                .map(i -> {
                    if (i == 3) {
                        throw new RuntimeException("Forcing exception for " + i);
                    }
                    return i;
                })
                .doOnNext(i -> System.out.println(i))
                .onErrorContinue((throwable, o) -> System.err.println("Error while processing " + o + " - " + throwable.getMessage()))
                .subscribe();

结果(删除了3个,但继续使用后续元素)

1
2
4
5
Error while processing 3 - Forcing exception for 3

Process finished with exit code 0

文档中确实指出OneErrorContinue依赖于操作员支持。有没有其他方法可以让原始序列(源通量)继续对所有操作符都有效?我不希望在出现错误的情况下使用备用通量来替换源通量(OneErrorResume行为)-我只想忽略问题元素

编辑1(我的用例)

我有一个ReactorKafka源通量

        KafkaReceiver.create(receiverOptions)
                .receive()
                .flatMap(record -> processRequest(record.value())
                        .doOnNext(e -> record.receiverOffset().acknowledge())
                        .doOnError(e -> {
                            System.err.println("Error occurred for msg: " + record.value() + ", Error " + e);
                            record.receiverOffset().acknowledge();
                        })
                        .onErrorResume(e -> Mono.empty()))
                .repeat(() -> true)
                .retryWhen(Retry.indefinitely())
                .doFinally(signalType -> {
                    //dont expect control to ever reach here
                    System.err.println("KafkaReceiverFlux terminating with Signal type: " + signalType);
                })
                .subscribe();

共有1个答案

费凯康
2023-03-14

反应器遵循的反应流规范规定流中的所有错误都是终端事件——这就是反应器错误处理留档所建立的。为了处理错误,必须发生错误,并且根据规范,该错误必须是终端。在所有符合规范的情况下(几乎是所有情况下),这是正确的。

然而,onErrorContinue()是一种非常特殊的运算符。它是一个错误处理操作符,但它允许删除错误并继续流,从而打破了被动规范。在您希望使用错误端通道进行连续处理、永不停止的情况下,它可能非常有用。

也就是说,它有一系列的问题——不仅需要特定的操作员支持(因为完全符合反应流规范的操作员可能会完全忽略onErrorContinue(),同时仍然保持兼容),还存在一系列其他问题。如果你对背景知识感兴趣,我们中的一些人会在这里讨论这些。将来可能会转移到不安全的分组或类似的分组,但这是一个很难解决的问题。

话虽如此,核心建议是目前Javadoc中的建议,除了非常具体的情况外,不要在所有情况下都使用OneRorContinue(),而是在每个单独的发布者上使用OneRorResume():

//Stream
.flatMap(id -> repository.retrieveById(id)
      .doOnError(System.err::println)
      .onErrorResume(e -> Mono.empty()))

这引入了更详细的内容,可能会带来较小的性能损失(我没有验证过),但其优点是其行为更加清晰,不会违反反应流规范,也不需要特定的操作员支持才能工作。这是我在几乎所有情况下的建议-我个人觉得在大多数情况下,onErrorContinue()的微妙之处太复杂了,无法推理。

 类似资料:
  • 我知道我过去解决过这个问题,但今天似乎做不到。我写了一个非常简单的程序,它产生20只海龟,让它们随机移动。我点击我的设置,它们就出现了。然后我点击去,我得到一个勾。我必须一遍又一遍地点击去才能移动。 当我单击一次go时,如何让它继续运行? 在“界面”选项卡上,我设置了“查看更新”选项。

  • 跳过当前循环的剩余部分并继续下一次循环。在各种循环中都是有效的。 Continue [, LoopLabel] [AHK_L 59+]:如果指定了,则 LoopLabel 表示此语句所应用的循环;通过标签名或嵌套层级的数值。如果省略或为 1,此语句应用于它所在的最内层循环。LoopLabel 必须为常量,不支持变量和表达式。如果指定标签,则它必须直接指向循环命令。 Continue 的行为如同直接

  • 简介 本章介绍的是Scheme中特有的数据类型——继续(Continuation)。由于其他程序设计语言并没有这种数据类型,因此它难于理解。当下,你并不需要彻底理解清楚,只需要大致了解。 我会讲解广义的继续和简短地介绍Continuation-Passing-Style(CPS),然后再讲解Scheme中的继续。我认为通过这种方式理解继续会比较容易。 广义继续 继续是在返回到顶层(Top leve

  • 我制作了一个程序,要求用户输入5位数字,然后程序将查找这些数字的总和。我想知道我怎样才能使程序在计算一次之后一遍又一遍地要求一个数字。我希望用户再试一次,直到他自己想退出。

  • 当我试图在从Panel1类的下一个JButton触发action事件时调用panel2类的JPanel2时,我得到了NullPointerException。对此如何化解?plzz帮助。 我的最后一个问题JPanel的Nullpointerexception已经被你们成功解决了。请在这方面提供帮助。这个例外正在吞噬我的脑袋。

  • 问题内容: 我知道上面的脚本不起作用。因此,如果需要将带有break的函数或继续放入循环,该如何写? 问题答案: 一个函数不能导致中断或继续调用它的代码。中断/继续实际上必须出现在循环内。您的选择是: 从funcA返回一个值并使用它来决定是否中断 在funcA中引发异常并将其捕获在调用代码中(或调用链中更高的位置) 写一个生成器来封装中断逻辑,然后在生成器上进行迭代 通过#3我的意思是这样的: 这