当前位置: 首页 > 知识库问答 >
问题:

通量switchOnNext变体,在下一个发布服务器发出而不是发出时进行切换_

拓拔弘亮
2023-03-14

反应器具有SwitchOnNext运算符,它镜像一系列发布者,每当新的发布者可用时取消先前的订阅:

对于我的用例,我需要一个关于此主题的变体,在订阅下一个发布服务器之前,我不会取消第一个发布服务器,而是继续镜像发布服务器1的序列,直到发布服务器2发出其第一个项目,然后才进行切换,正如这张大理石图(对于后来发现这个问题的人来说,这不是Reactor文件中现有操作员的图,这是我自己刚刚画的):

我理解,在一般情况下,这可能涉及运营商维护无限数量的订阅,等待其中任何一个发出,然后取消其他订阅,但对于我的用例,我知道通量的初始流量是有限的(因此我不一定需要完全通用的发布者解决方案,对于N个发布者的有限列表就足够了)。

有人能看到现有运营商的巧妙组合来实现这种行为吗?或者我需要从第一原理开始写吗?

共有1个答案

尉迟正奇
2023-03-14

有趣的问题!我认为这样做可能会奏效:

@Test
void switchOnNextEmit() {
  Duration gracePeriod = Duration.ofSeconds(2);

  Flux.concat(
          Mono.just(sequence("a", 1)),
          Mono.just(sequence("b", 3)).delaySubscription(Duration.ofSeconds(5)),
          Mono.just(sequence("c", 10)).delaySubscription(Duration.ofSeconds(10)))
      .map(seq -> seq.publish().refCount(1, gracePeriod))
      .scan(
          Tuples.of(Flux.<String>never(), Flux.<String>never()),
          (acc, next) -> Tuples.of(acc.getT2().takeUntilOther(next), next))
      .switchMap(t -> Flux.merge(t.getT1(), t.getT2()))
      .doOnNext(it -> System.out.println("Result: " + it))
      .then()
      .block();
}

private static Flux<String> sequence(String name, int interval) {
  return Flux.interval(Duration.ofSeconds(interval))
      .map(i -> name + i)
      .doOnSubscribe(__ -> System.out.println("Subscribe: " + name))
      .doOnCancel(() -> System.out.println("Cancel: " + name));
}

重要的是,我们将序列转换为热门发布服务器(这意味着重新订阅它们不会导致另一个订阅,而不是共享初始订阅)。然后,我们使用扫描来发出一个包含前一个和下一个的元组,最后我们只需使用常规的开关映射来观察这两个元组(请注意,由于takeUntilOther的原因,第一个元组在第二个元组发出时将如何停止)。

请注意,宽限期很重要,因为switchMap将首先取消,然后订阅下一个,因此如果没有任何宽限期,将导致当前热门发布服务器完全停止并从头开始,这不是我们想要的。

 类似资料:
  • 我已经在系统上安装并配置了ATG。我对发布服务器和生产服务器使用以下配置: 出版 HTTP端口:8180 HTTPS端口:8543 站点HTTP端口:8180 RMI端口:8861 DRP端口:8851 文件部署:8811 生产 HTTP端口:8080 HTTPS端口:8443 站点HTTP端口:8080 RMI端口:8860 DRP端口:8850 文件部署:8810 在运行我的生产服务器的之后,

  • 我正在使用windows,当socket.io发出事件时,命令提示符显示以下内容: 我想不出问题出在哪里,我做错了什么?

  • 你好,我从我的工作站上释放了一个项目,没有任何问题。我最近尝试从1.0.0-Snapshot发布另一个项目。然而,当它实际发布时,它将1.0.1-快照发布到快照存储库,而不是1.1.0发布到快照存储库。我做了dryrun=真的,看起来很好。事实上,它将我默认为以下版本: 我接受所有的默认值。然而,当我执行“MVN Release:Perform”时,它是这样做的: 我使用的是最新的maven,我手

  • 问题内容: 当我在Node服务器上打印请求的内容时,在任何地方都看不到用户数据。 这是我的节点服务器: 这是Angular2代码: 任何人都可以帮我或解释如何处理角度的http请求。 问题答案: 那是你的服务器: 那是您的有角度的客户: 回购https://github.com/kuncevic/angular-httpclient- examples

  • hprose 为发布服务提供了多个方法,这些方法可以随意组合,通过这种组合,你所发布的服务将不会局限于某一个函数,某一个方法,某一个对象,而是可以将不同的函数和方法随意重新组合成一个服务。 AddFunction 方法 AddFunction(name string, function interface{}, option ...Options) Service 该方法的用于发布一个函数(命名函

  • 在 Azure 门户中将包部署到云服务时,我发现了以下问题: 所有实例都在等待角色启动,出现以下异常: 未处理的异常:Microsoft.ApplicationServer.Caching.DataCacheException web角色在Compute Emulator上运行良好。 我最近确实向我的系统添加了共址缓存,但我不知道如何查明问题所在。 当我检查该角色的事件日志时,我发现了以下两个错误