当前位置: 首页 > 知识库问答 >
问题:

如何在Reactor 3中跨多个发布服务器对工作进行排队?

颛孙庆
2023-03-14

我正在创建一个库,用于使用Reactor3创建数据处理工作流。每个任务都有一个输入流量和一个输出流量。输入流量由用户提供。输出流量由库创建。任务可以链接以形成DAG。类似于这样:(它是静态编程语言)

val base64 = task<String, String>("base64") {
    input { Flux.just("a", "b", "c", "d", "e") }
    outputFn { ... get the output values ... }
    scriptFn { ... do some stuff ... }
}

val step2 = task<List<String>, String>("step2") {
    input { base64.output.buffer(3) }
    outputFn { ... }
    scriptFn { ... }
}

我需要限制整个工作流的并发性。一次只能处理配置数量的输入。在上面的示例中,对于3的限制,这意味着task base64将首先使用输入“a”、“b”和“c”运行,然后等待每个输入完成,然后再处理“d”、“e”和“step2”任务。

在从输入通量创建输出通量时,如何应用这些限制?TopicProcessor能否以某种方式应用?也许是某种定制的调度器或处理器?背压是如何工作的?我需要担心创建缓冲区吗?

共有1个答案

严修诚
2023-03-14

背压从最后一个Susbuiber向上传播,穿过整个链条。但是链中的操作员可以提前请求数据(预取),甚至“重写”请求。例如,在缓冲区(3)的情况下,如果该操作员收到请求(1),它将执行上游请求(3)(“1缓冲区==最多3个元素,以便我可以请求足够的源来填充请求的1缓冲区”)。

如果输入总是由用户提供,这将很难抽象出来。。。

没有简单的方法可以对跨多个管道的源进行评级,甚至对给定管道的多个订阅进行评级(流量)。

在多个发布中使用共享的调度程序将不起作用,因为发布会选择一个工作线程并将其粘住。

但是,如果您的问题更具体地说是关于受限制的base64任务,那么可能可以从flatMap的并发参数中获得效果?

input.flatMap(someString -> asyncProcess(someString), 3, 1);

这将允许最多3次出现的asyncProcess运行,并且每次终止时,它都会从输入的下一个值开始一个新值。

 类似资料:
  • 我使用JavaFX制作了一个GUI,有三个单选按钮,一旦用户单击提交并创建了另一个线程,并且根据检查了什么单选按钮,线程运行所需的输出并将结果输出到控制台。 但是当线程运行时(一个进程需要大约30秒才能完成),我可以检查任何放射性按钮。它创建另一个线程并与另一个正在进行的线程一起输出长时间。所以我的输出框只是一个乱七八糟的东西!我在看异步任务,但我不确定这是否与它有关。 以下是我需要的:如果一个任

  • 如果我们使用spring boot构建所有微服务,那么可以使用Eureka服务器(@EnableEurekaServer)发现所有微服务(@EnableEurekaClient)。如果某些微服务是使用其他技术构建的,那么在云(PCF、AWS等)中如何实现发现、负载平衡、反向代理(网关)? 我在网上读了很多关于微服务的博客,我没有得到适当的信息。

  • 我们有现有的Spring批处理应用程序,我们希望使其可扩展以在多个节点上运行。 例如,我希望在不同的工作服务器上运行作业。在这种情况下,我有一个作业a和一个作业B,在worker server的一个实例上运行作业a,在worker server的另一个实例中运行作业B。 在spring batch integration文档中,我找到了一个关于SETP远程分区的解释。然而,我不知道如何从主服务器启

  • 我对IntentService的使用有点困惑。 文档中说,IntentService将发送给它的所有意图排队,并一次处理一个意图 我很确定我在文档中读到过这样的情况:系统只调用onStartCommand()一次,如果您发出两次startService(),第二次调用不会导致调用onStartCommand()<我可能错了,因为我一直在寻找这篇文档,但似乎找不到 这与之前的概念相矛盾,即可以通过o

  • 问题内容: 我知道node.js是单线程,异步,无阻塞的I / O。我已经读了很多。例如,PHP每个请求使用一个线程,但是节点仅对所有线程使用一个线程。 假设有三个请求a,b,c同时到达node.js服务器。这些请求中的三个需要大型阻止操作,例如,它们都希望读取相同的大文件。 然后,如何将请求排队,将按什么顺序执行阻塞操作,以及按什么顺序分派响应?当然使用多少个线程? 请告诉我三个请求从请求到响应

  • 假设有两个微服务:订单和库存。order service中有一个API,它接受< code>ProductId 、< code>Qty等并下订单。 理想情况下,只有在库存服务中存在库存时才允许下订单。人们建议使用Saga模式或任何其他分布式事务。这很好,最终将利用一致性。 但是如果有人想滥用这个系统。他可以使用无效或缺货的产品(< code>ProductId)推送订单。系统将接受所有这些订单,并