当前位置: 首页 > 知识库问答 >
问题:

如何不依赖Reactor订阅时间

琴宾鸿
2023-03-14

我已经阅读了整个反应堆的文件,但我无法找到正确的模式,以解决以下问题。我有一个方法可以异步地执行某些操作。我以Flux的形式返回结果响应,消费者可以订阅它。

该方法有以下定义:

Flux<ResultMessage> sendRequest(RequestMessage message);

返回的通量是一个热通量,结果可以在任何给定的时间异步来。

sendRequest(message).subscribe(response->doSomethinWithResponse(response);
Flux<ResultMessage> sendRequest(RequestMessage message) {
   Flux<ResultMessage> result = incomingMessageStream
            .filter( resultMessage -> Objects.equals( resultMessage.getId(), message.getId() ) )
            .take( 2 );
// The message sending is done here...

    return result;
}

其中,incomingMessageStream是通过此通道的所有消息的flux。这个实现的一个问题是,消费者是在结果消息到来之后订阅的,它可能会错过其中的一些消息。

所以,我正在寻找的是一个解决方案,将允许消费者不依赖于订阅的时间。潜在的消费者可能根本不需要订阅产生的flux。我正在寻找一个通用的解决方案,但如果这是不可能的,您可以假设结果消息的数量不大于2。

共有1个答案

凌宏大
2023-03-14

过了一段时间,我创建了一个似乎有效的解决方案:

Flux<ResultMessage> sendRequest(RequestMessage message) {
  final int maxResponsesCount = 2;
  final Duration responseTimeout = Duration.ofSeconds( 10 );
  final Duration subscriptionTimeout = Duration.ofSeconds( 5 );

  // (1) 
  ConnectableFlux<ResultMessage> result = incomingMessageStream
      .ofType( ResultMessage.class )
      .filter( resultMessage ->Objects.equals(resultMessage.getId(), message.getId() ) )
      .take( maxResponsesCount )
      .timeout( responseTimeout )
      .replay( maxResponsesCount );
  Disposable connectionDisposable = result.connect();

  // (2)
  AtomicReference<Subscription> subscriptionForCancelSubscription = new AtomicReference<>();
  Mono.delay( subscriptionTimeout )
    .doOnSubscribe( subscriptionForCancelSubscription::set )
    .subscribe( x -> connectionDisposable.dispose() );

  // The message sending is done here...

  // (3)
  return result
    .doOnSubscribe(s ->subscriptionForCancelSubscription.get().cancel())
    .doFinally( signalType -> connectionDisposable.dispose() );
}

我正在使用一个ConnectableFlux立即连接到流,而不需要订阅,它被设置为使用reply()方法存储所有消息,因此后面的任何订阅者都不会错过响应消息(1)。

可以执行的路径很少:

    null

2.1.未返回任何消息

  • 解决方案-为获取响应设置了超时(.timeout(responseTimeout))。在此之后,.dofinally(..)清理资源(1)(3)。

2.2.已返回一些响应消息

  • 解决方案-与2.1相同。
    null
 类似资料:
  • 问题内容: 我有servlet- api版本2.5,作为pom.xml中提供的范围。这是我的项目的dependency:tree输出的一部分。“从2.3版本管理;从编译范围管理”是什么意思? 这是否意味着我的类路径上对版本2.3有一些传递依赖?我的WAR文件根本没有servlet- api jar,但是我使用的是Spring 2.5.4的旧版本。我怀疑Spring框架取决于servlet-api

  • 我有一个现有的接口链,我想作为一个反应器运行,而不是管理我自己的线程和队列 我如何链接该结果,使其调用与批量的?

  • 我只想访问reactor netty项目中的Http内容。但结果为空。 代码如下。 我无法在控制台中获得结果。 我可以像在代码中一样访问请求吗?有人能帮忙吗?谢谢。

  • 我使用publishOn和subscribeOn的流量相同,如下所示: 虽然,当我使用两者时,日志中不会打印任何内容。但是当我只使用publishOn时,我得到了以下信息日志: publishOn比Subscribeon更受推荐吗?或者它比subscribeon有更多的偏好?两者之间的区别是什么,什么时候使用哪个?

  • mysql会员订阅数据表的设计应该如何设计?产品有订阅商品和非订阅的,每次都只能购买一个。 订阅有1个月 3个月的 每次到期自动扣费。如果在一个月类购买了几个订阅商品 则扣费按照最新的一个 然后延长到期时间。其实是不是每次订阅都不需要生成新订单的 翻阅了其他资料都找不到很好的设计

  • 例: 注意:是非Android 运行环境, 使用的是RxJava2.x