我已经阅读了整个反应堆的文件,但我无法找到正确的模式,以解决以下问题。我有一个方法可以异步地执行某些操作。我以Flux的形式返回结果响应,消费者可以订阅它。
该方法有以下定义:
Flux<ResultMessage> sendRequest(RequestMessage message);
返回的通量是一个热通量,结果可以在任何给定的时间异步来。
sendRequest(message).subscribe(response->doSomethinWithResponse(response);
Flux<ResultMessage> sendRequest(RequestMessage message) {
Flux<ResultMessage> result = incomingMessageStream
.filter( resultMessage -> Objects.equals( resultMessage.getId(), message.getId() ) )
.take( 2 );
// The message sending is done here...
return result;
}
其中,incomingMessageStream
是通过此通道的所有消息的flux
。这个实现的一个问题是,消费者是在结果消息到来之后订阅的,它可能会错过其中的一些消息。
所以,我正在寻找的是一个解决方案,将允许消费者不依赖于订阅的时间。潜在的消费者可能根本不需要订阅产生的flux
。我正在寻找一个通用的解决方案,但如果这是不可能的,您可以假设结果消息的数量不大于2。
过了一段时间,我创建了一个似乎有效的解决方案:
Flux<ResultMessage> sendRequest(RequestMessage message) {
final int maxResponsesCount = 2;
final Duration responseTimeout = Duration.ofSeconds( 10 );
final Duration subscriptionTimeout = Duration.ofSeconds( 5 );
// (1)
ConnectableFlux<ResultMessage> result = incomingMessageStream
.ofType( ResultMessage.class )
.filter( resultMessage ->Objects.equals(resultMessage.getId(), message.getId() ) )
.take( maxResponsesCount )
.timeout( responseTimeout )
.replay( maxResponsesCount );
Disposable connectionDisposable = result.connect();
// (2)
AtomicReference<Subscription> subscriptionForCancelSubscription = new AtomicReference<>();
Mono.delay( subscriptionTimeout )
.doOnSubscribe( subscriptionForCancelSubscription::set )
.subscribe( x -> connectionDisposable.dispose() );
// The message sending is done here...
// (3)
return result
.doOnSubscribe(s ->subscriptionForCancelSubscription.get().cancel())
.doFinally( signalType -> connectionDisposable.dispose() );
}
我正在使用一个ConnectableFlux立即连接到流,而不需要订阅,它被设置为使用reply()方法存储所有消息,因此后面的任何订阅者都不会错过响应消息(1)。
可以执行的路径很少:
2.1.未返回任何消息
.timeout(responseTimeout)
)。在此之后,.dofinally(..)
清理资源(1)(3)。2.2.已返回一些响应消息
问题内容: 我有servlet- api版本2.5,作为pom.xml中提供的范围。这是我的项目的dependency:tree输出的一部分。“从2.3版本管理;从编译范围管理”是什么意思? 这是否意味着我的类路径上对版本2.3有一些传递依赖?我的WAR文件根本没有servlet- api jar,但是我使用的是Spring 2.5.4的旧版本。我怀疑Spring框架取决于servlet-api
我有一个现有的接口链,我想作为一个反应器运行,而不是管理我自己的线程和队列 我如何链接该结果,使其调用与批量的?
我只想访问reactor netty项目中的Http内容。但结果为空。 代码如下。 我无法在控制台中获得结果。 我可以像在代码中一样访问请求吗?有人能帮忙吗?谢谢。
我使用publishOn和subscribeOn的流量相同,如下所示: 虽然,当我使用两者时,日志中不会打印任何内容。但是当我只使用publishOn时,我得到了以下信息日志: publishOn比Subscribeon更受推荐吗?或者它比subscribeon有更多的偏好?两者之间的区别是什么,什么时候使用哪个?
mysql会员订阅数据表的设计应该如何设计?产品有订阅商品和非订阅的,每次都只能购买一个。 订阅有1个月 3个月的 每次到期自动扣费。如果在一个月类购买了几个订阅商品 则扣费按照最新的一个 然后延长到期时间。其实是不是每次订阅都不需要生成新订单的 翻阅了其他资料都找不到很好的设计
例: 注意:是非Android 运行环境, 使用的是RxJava2.x