当前位置: 首页 > 知识库问答 >
问题:

平面映射中引发的异常被onErrorResume运算符忽略

蒙墨竹
2023-03-14

考虑以下代码

@Slf4j
@ExtendWith(MockitoExtension.class)
class ConnectionEventsConsumerTest {

    @Test
    public void testOnErrorResume() {
        Flux.range(0, 5)
                .doOnNext(event -> log.info("Processing -  {}", event))
                .flatMap(event -> processEvent(event)
                        .doOnSuccess(result -> log.info("Processed - {}", event))
                        .onErrorResume(t -> handleError(t, event))
                )
                .doOnError(t -> log.error("Exception propagated", t))
                //.log()
                .then()
                .subscribe();
    }

    private Mono<Void> processEvent(Object object) {
        return Mono.error(() -> new RuntimeException("test"));
        //throw new RuntimeException("test");
    }
    
    private Mono<Void> handleError(Throwable throwable, Object object) {
        log.error("Processing Failed - {}", object);
        
        return Mono.empty();
    }
    
}

如果方法Process Event返回Mono.error则输出与抛出Exception完全不同。

代码是这样的(返回一个Mono.error),我看到了我所期望的,300次处理和过程迭代失败,我看到没有异常传播。

17: 33:19.853[主]信息通信。playtika。服务。卡特福斯。pvp。服务Kafka。联系连接事件消费测试

  • 处理-0 17:33:19.864[主]错误com。playtika。服务。卡特福斯。pvp。服务Kafka。联系连接事件消费测试
  • 处理失败-0 17:33:19.865[主]信息com。playtika。服务。卡特福斯。pvp。服务Kafka。联系连接事件消费测试
  • 处理-1 17:33:19.866[主]错误com。playtika。服务。卡特福斯。pvp。服务Kafka。联系连接事件消费测试
  • 处理失败-1 17:33:19.866[主]信息com。playtika。服务。卡特福斯。pvp。服务Kafka。联系连接事件消费测试
  • 处理-2 17:33:19.866[主]错误com。playtika。服务。卡特福斯。pvp。服务Kafka。联系连接事件消费测试
  • 处理失败-2 17:33:19.866[主]信息com。playtika。服务。卡特福斯。pvp。服务Kafka。联系连接事件消费测试
  • 处理-3 17:33:19.866[主]错误com。playtika。服务。卡特福斯。pvp。服务Kafka。联系连接事件消费测试
  • 处理失败-3 17:33:19.866[主]信息com。playtika。服务。卡特福斯。pvp。服务Kafka。联系连接事件消费测试
  • 处理-4 17:33:19.866[主]错误com。playtika。服务。卡特福斯。pvp。服务Kafka。联系连接事件消费测试
  • 处理失败-4

另一方面,如果我取消对抛出的注释,我会看到正在处理的通量中的单个项,我看不到来自handleError的消息,我会看到“传播的异常”

17:35:53.950[main]信息com.playtika.services.catforce.pvp.service.kafka.connection.连接事件消费者测试

  • 处理- 0 17:35:53.968[main]ERRORcom.playtika.services.catforce.pvp.service.kafka.connection.ConnectionEvents消费者测试
  • 异常传播java.lang.RuntimeException: test

如果这是设计的,那么平面图的最佳实践是什么?想到的一个简单的解决方案是用try-catch将平面图的th内容包围起来,将异常封装在Mono中。错误虽然它可以工作,但它不雅观,太手工,很可能会被遗忘。

共有1个答案

韦衡
2023-03-14

创建/返回Mono的方法不应该以这种方式抛出异常。由于异常是在组装(创建)Mono之前抛出的,因此平面图中的后续运算符不可能生效,因为它们需要Mono来操作。

如果您无法控制processEvent()方法来修复其行为,那么可以用Mono将其包装起来。延迟,这将确保即使在装配期间出现的错误也会通过平面图内的单声道传播:

Flux.range(0, 5)
    .doOnNext(event -> log.info("Processing -  {}", event))
    .flatMap(event -> Mono.defer(() -> processEvent(event))
                .doOnSuccess(result -> log.info("Processed - {}", event))
                .onErrorResume(t -> handleError(t, event)))
    .doOnError(t -> log.error("Exception propagated", t))


private Mono<Void> processEvent(Object object) {
    throw new RuntimeException("test");
}

请注意,在其他中间操作符(如map或doOnNext)中,您可以随意以丑陋的方式抛出异常,因为Reactor可以将它们转换为适当的错误信号,因为此时单声道已经在进行中。

 类似资料:
  • 我试图在map()中使用filter(),但我得到了这个火花异常: RDD转换和操作只能由驱动程序调用,不能在其他转换内部调用;例如,rdd1。地图(x)= 我知道火花不允许嵌套转换/动作/RDD,所以有人可以给我一个建议,如何替代它(没有嵌套转换或动作),我有一个RDD它的元组是这样的: 我试着映射它,给它一个列表作为参数,这个列表包含javaPairRDD这样的: 这些行指的是修改RDD()函

  • 首先,我对这个问题太长表示最诚挚的歉意,但老实说,我不知道如何缩短它,因为每个部分都是一个特例。诚然,我可能对此视而不见,因为我已经把头撞到墙上好几天了,我开始绝望了。 我向所有通读这本书的人表示最大的尊重和感谢。 我希望能够通过使用Jersey ExceptionMapers将Shiro的AuthenticationException及其子类映射到JAX-RS响应,Jersey例外映射器是使用G

  • 问题内容: 如何在RxJava2中将一个发生的异常映射到另一个异常?例如: 在这种情况下,我最终收到包含和的内容,但我只想收到。救命! 问题答案: 您可以使用onErrorResumeNext并从中返回Observable.error(): 编辑 该测试对我来说是合格的:

  • 我正在努力使用Java Spring Hibernate,我正在尝试实现Oauth2,在通过@ManyToMany将表用户连接到角色时,我不断遇到错误。我已经阅读了所有关于我的问题的答案,无论我尝试什么,我仍然得到了一个组织。冬眠映射异常。 以下是我正在努力做的事情的全部细节。 数据库结构 角色。JAVA 使用者JAVA 依赖性 问题: org.springframework.beans.fact

  • 我在让很多操作员工作时遇到了一些麻烦 作业说明如下 本任务的目的是处理例外情况。您可能还记得,我为您提供了一个名为FlashDrive的示例类,如下图所示。您可以在此处获取FlashDrive类的源(.NET或.NET 2010)。我希望您增强该类,以便调用其方法或运算符可能引发异常,而不仅仅是将错误消息打印到cout。目前,我们最喜欢的异常类是std::logic\u error。您可以通过向其

  • java.util.concurrent.executionException:java.lang.ClassCastException:com.hazelCast.mapreduce.aggregation.impl.DistrictValuesAggregation$SimpleEntry不能在com.hazelCast.mapreduce.impl.task.trackableJobFutu