当前位置: 首页 > 知识库问答 >
问题:

在Mono对象上执行块()时获得异常,我从ReactiveMongoRepository对象返回

宗政燕七
2023-03-14

我有一个服务,它将数据流到第二个服务,该服务接收对象流并将其保存到我的MongoDB中。在我从流服务中获得的Flux对象上的订阅函数中,我使用ReactiveMongoRepository接口中的保存方法要使用块函数并获得数据,我得到以下错误:

2019-10-11 13:30:38.559  INFO 19584 --- [localhost:27017] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:1, serverValue:25}] to localhost:27017
2019-10-11 13:30:38.566  INFO 19584 --- [localhost:27017] org.mongodb.driver.cluster               : Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[4, 0, 1]}, minWireVersion=0, maxWireVersion=7, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=6218300}
2019-10-11 13:30:39.158  INFO 19584 --- [ctor-http-nio-4] quote-monitor-service                    : onNext(Quote(id=null, ticker=AAPL, price=164.8, instant=2019-10-11T10:30:38.800Z))
2019-10-11 13:30:39.411  INFO 19584 --- [ctor-http-nio-4] quote-monitor-service                    : cancel()
2019-10-11 13:30:39.429  INFO 19584 --- [ntLoopGroup-2-2] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:3, serverValue:26}] to localhost:27017
2019-10-11 13:30:39.437  WARN 19584 --- [ctor-http-nio-4] io.netty.util.ReferenceCountUtil         : Failed to release a message: DefaultHttpContent(data: PooledSlicedByteBuf(freed), decoderResult: success)

io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
    at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74) ~[netty-common-4.1.39.Final.jar:4.1.39.Final]
    at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138) ~[netty-common-4.1.39.Final.jar:4.1.39.Final]
    at 
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-4
Caused by: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-4
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:77) ~[reactor-core-3.2.12.RELEASE.jar:3.2.12.RELEASE]
    at reactor.core.publisher.Mono.block(Mono.java:1494) ~[reactor-core-3.2.12.RELEASE.jar:3.2.12.RELEASE]
    at

我的代码:

stockQuoteClient.getQuoteStream()
                .log("quote-monitor-service")
                .subscribe(quote -> {
                    Mono<Quote> savedQuote = quoteRepository.save(quote);
                    System.out.println("I saved a quote! Id: " +savedQuote.block().getId());
                });

经过一些挖掘,我设法让它工作,但我不明白为什么它现在工作。新守则:

stockQuoteClient.getQuoteStream()
                .log("quote-monitor-service")
                .subscribe(quote -> {
                       Mono<Quote> savedQuote = quoteRepository.insert(quote);
                       savedQuote.subscribe(result ->
                                 System.out.println("I saved a quote! Id :: " + result.getId()));
    });

block()的定义:无限期订阅此Mono和block,直到收到下一个信号。

subscribe()的定义:订阅此Mono并请求无限需求。

有人能帮我理解为什么区块不起作用,订阅起作用了吗?我错过了什么?

共有1个答案

浦德义
2023-03-14

阻塞是不好的,因为它会捆绑等待响应的线程。这在反应性框架中是非常糟糕的,它几乎没有线程可供使用,并且被设计成不应该不必要地阻塞它们。

这正是反应性框架设计要避免的事情,所以在这种情况下,它只会阻止你这样做:

block()/blockFirst()/blockLast()正在阻塞,这在线程reactor-http-nio-4中不受支持

相反,您的新代码是异步工作的。线程不会被阻塞,因为在存储库返回一个值之前不会发生任何实际情况(然后执行传递给SavedQuote.subscribe()的lambda,将结果打印到控制台。)

但是,从反应流的角度来看,新代码仍然不是最佳/正常的,因为您在subscribe方法中执行所有逻辑。通常要做的事情是对我们进行一系列flatMap/map调用,以转换流中的项目,并使用doOnNext()获得副作用(例如打印值):

stockQuoteClient.getQuoteStream()
            .log("quote-monitor-service")
            .flatMap(quoteRepository::insert)
            .doOnNext(result -> System.out.println("I saved a quote! Id :: " + result.getId())))
            .subscribe();

如果你对反应器/反应流做了大量的工作,一般来说值得一读。它们对于非阻塞工作非常强大,但它们确实需要一种与更“标准”的Java不同的思维方式(和编码)。

 类似资料:
  • 好吧,所以我有下面的方法,但我试图摆脱使用。块(),因此尝试实现Mono,但得到错误< code >所需的类型:List 原始代码 我尝试以非阻塞方式重构使用 Mono

  • 我是新的Spring WebFlux。我正在编写一个简单的api,它调用另一个api,并返回响应。我遇到的问题是我的api接受不同于外部api的请求类型。我必须将传入的请求转换为外部api。我使用Mono来接收对我的api的请求,但在没有block()的情况下转换为另一个对象时遇到了麻烦。 输入 有没有一种方法可以把单声道转换成Person对象而不阻塞?

  • 我想从Flux/Mono中获取对象。我使用 我会这样做: 我有错误: 为什么?有什么不同的方法来获取对象? 在反应式编程中,如何做到:在RequestBody中,您有UserDto。 如果不创建用户,请检查数据库中是否存在电子邮件。

  • 我使用了这个config允许将嵌套对象解析为json: 它似乎确实起到了作用,因为带有嵌套对象的工作正常。然后我尝试从json创建一个类实例https://pub.dev/packages/json_serializable: 我得到一个例外: 抛出的异常:类型'_InternalLinkedHashMap 这是的值: 红色覆盖的值是字符串。当和为时,没有错误。 我认为这是因为是嵌套的JsonSe

  • 原始关闭原因未解决 我的脚本正在打印作为的结果。 有人能解释一下如何使用打印from?

  • 我正在上传一个图像到Firebase存储,这个错误不断出现 E/StorageException:StorageException已发生。对象的位置不存在。代码:-13010 httpresult:404 E/StorageException:{“错误”:{“代码”:404,“消息”:“未找到。无法获取对象”, “状态”:“get_object”}}java.io.IoException:{“错误