我正在寻找一种使用Flux实现以下目标的方法:
我知道#4的答案是为计划程序使用一个执行器。然而,我不确定如何实现#6。
下面是一个示例场景:
>
发出响应3、4、5,并开始对其进行处理。
那么——我应该如何修改下面的代码来实现#6?
public class Example {
private final Scheduler scheduler = Schedulers.fromExecutor(
Executors.newFixedThreadPool( 10 ) );
public void start() {
Flux<Request> requestFlux = getFluxOfOneMillionRequests(); // Never mind how this is achieved
Flux<Response> responseFlux = flux.flatMap(request -> doInWorkerThread(request));
flux.doOnNext(response -> processResponse(response)).subscribe()
}
private Mono<Response> doInWorkerThread(Request request) {
return Mono.fromCallable(() -> {
// Do something
return new Response( request.getSerial(), someResult );
}).subscribeOn(scheduler);
}
private void processResponse(Response response) {
// Do something
}
}
我想出了一个确定的答案,尽管我求助于使用低级线程同步代码。这个想法是有3个Flux步骤:
对于多线程调度器,我使用一个执行器,以便按照任务插入的顺序执行任务。这将减少缓存机制必须保存的平均响应量。
以下代码片段演示了这一点:
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
public class StreamSort {
private final Scheduler produceScheduler = Schedulers.fromExecutor(Executors.newFixedThreadPool(3, new ThreadFactoryBuilder().setNameFormat("--> Producer-%d").build()));
private final Scheduler consumeScheduler = Schedulers.fromExecutor(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("<-- Consumer-%d").build()));
private final ResponseReadySignal responseReadySignal = new ResponseReadySignal();
private int nextExpected = 1;
private List<Response> arrivals = Lists.newArrayList();
private static class ResponseReadySignal {}
private Random rand = new Random();
public static void main(String[] args) throws InterruptedException {
StreamSort m = new StreamSort();
m.start();
}
private void start() throws InterruptedException {
Flux<Request> requests = generateRequestFlux();
requests
.flatMap(request -> doInWorkerThread(request))
.flatMap(signal -> jumpToCollectingThread(signal))
.flatMap(signal -> departInOrder(signal))
.doOnNext(response -> log("Got " + response ))
.doOnComplete(() -> stopWaiting())
.subscribe();
waitUntilFinished();
log("Disposing schedulers");
produceScheduler.dispose();
consumeScheduler.dispose();
log("FINISH");
}
private synchronized void waitUntilFinished() throws InterruptedException {
this.wait();
}
private synchronized void stopWaiting() {
this.notifyAll();
}
private Flux<Request> generateRequestFlux() {
List<Integer> serials = Lists.newArrayList(1,2,3,4,5,6,7,8,9,10);
return Flux.fromIterable(serials).map(serial -> new Request(serial));
}
private Mono<ResponseReadySignal> doInWorkerThread(Request request) {
return Mono.fromCallable(() -> {
Response response = handleRequest(request);
synchronized (this) {
arrivals.add(response);
}
return responseReadySignal;
}).subscribeOn(produceScheduler);
}
private Response handleRequest(Request request) throws InterruptedException {
int milli = rand.nextInt(1000);
log( request + " start, sleeping " + milli );
Thread.currentThread().sleep( milli ); // Simulate some task that takes time
log( request + " Finished" );
return new Response(request.getSerial()); // Simulate response creation.
}
private Mono<ResponseReadySignal> jumpToCollectingThread(ResponseReadySignal signal) {
log( "Delivering signal to collecting thread" );
return Mono.fromCallable(() -> signal).subscribeOn(consumeScheduler);
}
private Flux<Response> departInOrder(ResponseReadySignal signal) {
List<Response> readyToDepart = Lists.newLinkedList();
synchronized (this) {
Collections.sort(arrivals);
while (arrivals.size() > 0 && arrivals.get(0).getSerial() == nextExpected) {
readyToDepart.add(arrivals.remove(0));
++nextExpected;
}
}
log("Departing " + readyToDepart.size() + " items: " + readyToDepart);
return Flux.fromIterable(readyToDepart);
}
private static void log( String message ) {
Thread t = Thread.currentThread();
System.out.println( t.getName() + ": " + message );
}
private static class WithSerial implements Comparable<WithSerial> {
private final int serial;
public WithSerial(int serial) {
this.serial = serial;
}
public int getSerial() {
return serial;
}
@Override
public int compareTo(WithSerial o) {
return this.serial - o.serial;
}
@Override
public String toString() {
return "" + getSerial();
}
}
private static class Request extends WithSerial {
public Request(int serial) {
super(serial);
}
}
private static class Response extends WithSerial {
public Response(int serial) {
super(serial);
}
}
}
问题内容: 我有一个Android应用程序,当前正在使用库来发出网络请求并使用来显示下载的图像。 我想测试的功能,由于我需要运行大量请求(数千个),因此我有点担心并行执行。处理的并行请求,将并行运行的请求限制为四个,而其他请求排队等待执行。在文档中,我找不到任何方法来处理并发请求的数量,而且我怀疑此类详细信息留给了此库中的开发人员。 这样对吗?如果是这样,是否有任何面向Android的实现/库可用
问题内容: 我正在使用Node.js运行服务器,并且需要从正在运行的另一台服务器()请求数据。我需要向数据服务器发出许多请求(〜200)并收集数据(响应大小从〜20Kb到〜20Mb不等)。每个请求都是独立的,我想将响应保存为以下形式的一个巨大数组: 请注意,项目的顺序并不重要,理想情况下,它们应该以数据可用的顺序填充数组。 现在,当运行该程序时,它将显示: 现在,由于文件的大小如此可变,我期望它可
我试图避免使用阻塞线程模型,而使用反应式模型来实现高吞吐量。我的用例是这样的:有大量的传入消息。对于每条消息,我需要在不阻塞该线程的情况下执行一些I/O操作。在这里,我在一个单独的线程中处理每条消息。如果应用程序被终止,我需要完成正在进行的任务并优雅地关闭。我用的是线。在下面睡眠以模拟密集的I/O操作。代码示例如下: 当我运行这个时,通量似乎忽略了执行者。shutdown()和错误以及中断的异常。
问题内容: 问题 我有两个Apis。Api 1为我提供了一个项目列表,Api 2为我提供了我从Api 1获得的每个项目的更详细信息。到目前为止,我解决它的方式导致性能下降。 问题 借助Retrofit和RxJava,可以快速有效地解决此问题。 我的方法 当下,我的解决方案如下所示: 步骤1:从Api 1 执行改造。 第2步:我遍历此项目,并向Api 2请求每个项目。 步骤3:对每个项目依次执行改造
问题内容: 我正在创建具有flux体系结构的react.js应用程序,并且试图弄清楚应该何时何地从服务器请求数据。有这个例子吗?(不是TODO应用!) 问题答案: 我强烈支持将异步写入操作放在动作创建者中,而将异步读取操作放在商店中。目标是将商店状态修改代码保留在完全同步的动作处理程序中;这使他们易于推理,并且易于进行单元测试。为了防止对同一端点的多个同时请求(例如,重复读取),我将把实际的请求处
我已经参考了这些链接,但仍然怀疑对singleton bean的并发请求是另一个不太相关的链接 我的问题/疑问:并行请求是否会由一个Spring单例bean[因为这只是一个对象/实例]并行/顺序处理?@Controller,@服务甚至在多核处理器上(并行线程执行能力) 希望不是,但它是如何工作的。 从第一个链接开始,我了解了一些东西——一个singleton bean对象,这个singleton