反应器具有SwitchOnNext
运算符,它镜像一系列发布者,每当新的发布者可用时取消先前的订阅:
对于我的用例,我需要一个关于此主题的变体,在订阅下一个发布服务器之前,我不会取消第一个发布服务器,而是继续镜像发布服务器1的序列,直到发布服务器2发出其第一个项目,然后才进行切换,正如这张大理石图(对于后来发现这个问题的人来说,这不是Reactor文件中现有操作员的图,这是我自己刚刚画的):
我理解,在一般情况下,这可能涉及运营商维护无限数量的订阅,等待其中任何一个发出,然后取消其他订阅,但对于我的用例,我知道通量的初始流量是有限的(因此我不一定需要完全通用的发布者解决方案,对于N个发布者的有限列表就足够了)。
有人能看到现有运营商的巧妙组合来实现这种行为吗?或者我需要从第一原理开始写吗?
有趣的问题!我认为这样做可能会奏效:
@Test
void switchOnNextEmit() {
Duration gracePeriod = Duration.ofSeconds(2);
Flux.concat(
Mono.just(sequence("a", 1)),
Mono.just(sequence("b", 3)).delaySubscription(Duration.ofSeconds(5)),
Mono.just(sequence("c", 10)).delaySubscription(Duration.ofSeconds(10)))
.map(seq -> seq.publish().refCount(1, gracePeriod))
.scan(
Tuples.of(Flux.<String>never(), Flux.<String>never()),
(acc, next) -> Tuples.of(acc.getT2().takeUntilOther(next), next))
.switchMap(t -> Flux.merge(t.getT1(), t.getT2()))
.doOnNext(it -> System.out.println("Result: " + it))
.then()
.block();
}
private static Flux<String> sequence(String name, int interval) {
return Flux.interval(Duration.ofSeconds(interval))
.map(i -> name + i)
.doOnSubscribe(__ -> System.out.println("Subscribe: " + name))
.doOnCancel(() -> System.out.println("Cancel: " + name));
}
重要的是,我们将序列转换为热门发布服务器(这意味着重新订阅它们不会导致另一个订阅,而不是共享初始订阅)。然后,我们使用扫描来发出一个包含前一个和下一个的元组,最后我们只需使用常规的开关映射来观察这两个元组(请注意,由于takeUntilOther的原因,第一个元组在第二个元组发出时将如何停止)。
请注意,宽限期很重要,因为switchMap将首先取消,然后订阅下一个,因此如果没有任何宽限期,将导致当前热门发布服务器完全停止并从头开始,这不是我们想要的。
我已经在系统上安装并配置了ATG。我对发布服务器和生产服务器使用以下配置: 出版 HTTP端口:8180 HTTPS端口:8543 站点HTTP端口:8180 RMI端口:8861 DRP端口:8851 文件部署:8811 生产 HTTP端口:8080 HTTPS端口:8443 站点HTTP端口:8080 RMI端口:8860 DRP端口:8850 文件部署:8810 在运行我的生产服务器的之后,
我正在使用windows,当socket.io发出事件时,命令提示符显示以下内容: 我想不出问题出在哪里,我做错了什么?
你好,我从我的工作站上释放了一个项目,没有任何问题。我最近尝试从1.0.0-Snapshot发布另一个项目。然而,当它实际发布时,它将1.0.1-快照发布到快照存储库,而不是1.1.0发布到快照存储库。我做了dryrun=真的,看起来很好。事实上,它将我默认为以下版本: 我接受所有的默认值。然而,当我执行“MVN Release:Perform”时,它是这样做的: 我使用的是最新的maven,我手
问题内容: 当我在Node服务器上打印请求的内容时,在任何地方都看不到用户数据。 这是我的节点服务器: 这是Angular2代码: 任何人都可以帮我或解释如何处理角度的http请求。 问题答案: 那是你的服务器: 那是您的有角度的客户: 回购https://github.com/kuncevic/angular-httpclient- examples
hprose 为发布服务提供了多个方法,这些方法可以随意组合,通过这种组合,你所发布的服务将不会局限于某一个函数,某一个方法,某一个对象,而是可以将不同的函数和方法随意重新组合成一个服务。 AddFunction 方法 AddFunction(name string, function interface{}, option ...Options) Service 该方法的用于发布一个函数(命名函
在 Azure 门户中将包部署到云服务时,我发现了以下问题: 所有实例都在等待角色启动,出现以下异常: 未处理的异常:Microsoft.ApplicationServer.Caching.DataCacheException web角色在Compute Emulator上运行良好。 我最近确实向我的系统添加了共址缓存,但我不知道如何查明问题所在。 当我检查该角色的事件日志时,我发现了以下两个错误