当前位置: 首页 > 知识库问答 >
问题:

Flux中并行请求的有序流

舒永嘉
2023-03-14

我正在寻找一种使用Flux实现以下目标的方法:

  1. 处理1,000,000(或更多)请求流。
  2. 请求被编号(从1到1,000,000)。
  3. 请求应该使用10个线程并行启动。
  4. 请求的启动顺序由其序列号决定。
  5. 结果通量应该以与请求相同的顺序返回响应。
  6. 结果通量应该尽快发出每个响应,因为它的所有前身都是可用的。

我知道#4的答案是为计划程序使用一个执行器。然而,我不确定如何实现#6。

下面是一个示例场景:

>

发出响应3、4、5,并开始对其进行处理。

那么——我应该如何修改下面的代码来实现#6?

public class Example {

private final Scheduler scheduler = Schedulers.fromExecutor(
	Executors.newFixedThreadPool( 10 ) );

public void start() {
	Flux<Request> requestFlux = getFluxOfOneMillionRequests(); // Never mind how this is achieved
	
	Flux<Response> responseFlux = flux.flatMap(request -> doInWorkerThread(request));
	
	flux.doOnNext(response -> processResponse(response)).subscribe()
}

private Mono<Response> doInWorkerThread(Request request) {

	return Mono.fromCallable(() -> {

		// Do something
		return new Response( request.getSerial(), someResult );
	}).subscribeOn(scheduler);
}

private void processResponse(Response response) {
	// Do something
}
}

共有1个答案

梁才
2023-03-14

我想出了一个确定的答案,尽管我求助于使用低级线程同步代码。这个想法是有3个Flux步骤:

  • 第一步是在多个线程中处理请求,方法是在调度器上订阅Mono,该调度器使用一个执行器和多个工作线程。当工作线程完成时,它将其响应放在一个公共列表中,并返回一些任意对象,以便激活第二步

对于多线程调度器,我使用一个执行器,以便按照任务插入的顺序执行任务。这将减少缓存机制必须保存的平均响应量。

以下代码片段演示了这一点:

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;

public class StreamSort {

    private final Scheduler produceScheduler = Schedulers.fromExecutor(Executors.newFixedThreadPool(3, new ThreadFactoryBuilder().setNameFormat("--> Producer-%d").build()));
    private final Scheduler consumeScheduler = Schedulers.fromExecutor(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("<-- Consumer-%d").build()));
    private final ResponseReadySignal responseReadySignal = new ResponseReadySignal();
    private int nextExpected = 1;
    private List<Response> arrivals = Lists.newArrayList();

    private static class ResponseReadySignal {}
    private Random rand = new Random();

    public static void main(String[] args) throws InterruptedException {

        StreamSort m = new StreamSort();

        m.start();
    }

    private void start() throws InterruptedException {

        Flux<Request> requests = generateRequestFlux();

        requests
                .flatMap(request -> doInWorkerThread(request))
                .flatMap(signal -> jumpToCollectingThread(signal))
                .flatMap(signal -> departInOrder(signal))
                .doOnNext(response -> log("Got " + response ))
                .doOnComplete(() -> stopWaiting())
                .subscribe();

        waitUntilFinished();

        log("Disposing schedulers");
        produceScheduler.dispose();
        consumeScheduler.dispose();
        log("FINISH");
    }

    private synchronized void waitUntilFinished() throws InterruptedException {
        this.wait();
    }

    private synchronized void stopWaiting() {
        this.notifyAll();
    }

    private Flux<Request> generateRequestFlux() {
        List<Integer> serials = Lists.newArrayList(1,2,3,4,5,6,7,8,9,10);

        return Flux.fromIterable(serials).map(serial -> new Request(serial));
    }

    private Mono<ResponseReadySignal> doInWorkerThread(Request request) {

        return Mono.fromCallable(() -> {

            Response response = handleRequest(request);

            synchronized (this) {
                arrivals.add(response);
            }

            return responseReadySignal;
        }).subscribeOn(produceScheduler);
    }

    private Response handleRequest(Request request) throws InterruptedException {
        int milli = rand.nextInt(1000);

        log( request + " start, sleeping " + milli );

        Thread.currentThread().sleep( milli ); // Simulate some task that takes time

        log( request + " Finished" );

        return new Response(request.getSerial()); // Simulate response creation.

    }

    private Mono<ResponseReadySignal> jumpToCollectingThread(ResponseReadySignal signal) {

        log( "Delivering signal to collecting thread" );

        return Mono.fromCallable(() -> signal).subscribeOn(consumeScheduler);
    }

    private Flux<Response> departInOrder(ResponseReadySignal signal) {

        List<Response> readyToDepart = Lists.newLinkedList();

        synchronized (this) {
            Collections.sort(arrivals);
            while (arrivals.size() > 0 && arrivals.get(0).getSerial() == nextExpected) {
                readyToDepart.add(arrivals.remove(0));
                ++nextExpected;
            }
        }

        log("Departing " + readyToDepart.size() + " items: " + readyToDepart);
        return Flux.fromIterable(readyToDepart);
    }

    private static void log( String message ) {
        Thread t = Thread.currentThread();
        System.out.println( t.getName() + ": " + message );

    }

    private static class WithSerial implements Comparable<WithSerial> {
        private final int serial;

        public WithSerial(int serial) {
            this.serial = serial;
        }

        public int getSerial() {
            return serial;
        }

        @Override
        public int compareTo(WithSerial o) {
            return this.serial - o.serial;
        }

        @Override
        public String toString() {
            return "" + getSerial();
        }
    }

    private static class Request extends WithSerial {

        public Request(int serial) {
            super(serial);
        }
    }

    private static class Response extends WithSerial {
        public Response(int serial) {
            super(serial);
        }
    }
}
 类似资料:
  • 问题内容: 我有一个Android应用程序,当前正在使用库来发出网络请求并使用来显示下载的图像。 我想测试的功能,由于我需要运行大量请求(数千个),因此我有点担心并行执行。处理的并行请求,将并行运行的请求限制为四个,而其他请求排队等待执行。在文档中,我找不到任何方法来处理并发请求的数量,而且我怀疑此类详细信息留给了此库中的开发人员。 这样对吗?如果是这样,是否有任何面向Android的实现/库可用

  • 问题内容: 我正在使用Node.js运行服务器,并且需要从正在运行的另一台服务器()请求数据。我需要向数据服务器发出许多请求(〜200)并收集数据(响应大小从〜20Kb到〜20Mb不等)。每个请求都是独立的,我想将响应保存为以下形式的一个巨大数组: 请注意,项目的顺序并不重要,理想情况下,它们应该以数据可用的顺序填充数组。 现在,当运行该程序时,它将显示: 现在,由于文件的大小如此可变,我期望它可

  • 我试图避免使用阻塞线程模型,而使用反应式模型来实现高吞吐量。我的用例是这样的:有大量的传入消息。对于每条消息,我需要在不阻塞该线程的情况下执行一些I/O操作。在这里,我在一个单独的线程中处理每条消息。如果应用程序被终止,我需要完成正在进行的任务并优雅地关闭。我用的是线。在下面睡眠以模拟密集的I/O操作。代码示例如下: 当我运行这个时,通量似乎忽略了执行者。shutdown()和错误以及中断的异常。

  • 问题内容: 问题 我有两个Apis。Api 1为我提供了一个项目列表,Api 2为我提供了我从Api 1获得的每个项目的更详细信息。到目前为止,我解决它的方式导致性能下降。 问题 借助Retrofit和RxJava,可以快速有效地解决此问题。 我的方法 当下,我的解决方案如下所示: 步骤1:从Api 1 执行改造。 第2步:我遍历此项目,并向Api 2请求每个项目。 步骤3:对每个项目依次执行改造

  • 问题内容: 我正在创建具有flux体系结构的react.js应用程序,并且试图弄清楚应该何时何地从服务器请求数据。有这个例子吗?(不是TODO应用!) 问题答案: 我强烈支持将异步写入操作放在动作创建者中,而将异步读取操作放在商店中。目标是将商店状态修改代码保留在完全同步的动作处理程序中;这使他们易于推理,并且易于进行单元测试。为了防止对同一端点的多个同时请求(例如,重复读取),我将把实际的请求处

  • 我已经参考了这些链接,但仍然怀疑对singleton bean的并发请求是另一个不太相关的链接 我的问题/疑问:并行请求是否会由一个Spring单例bean[因为这只是一个对象/实例]并行/顺序处理?@Controller,@服务甚至在多核处理器上(并行线程执行能力) 希望不是,但它是如何工作的。 从第一个链接开始,我了解了一些东西——一个singleton bean对象,这个singleton