你好,我有一个简单的问题,假设我有一个类如下:
import lombok.Value;
import java.nio.file.Path;
@Value
class ImageResizeRequest {
private DownloadedImage downloadedImage;
private ImageSize imageSize;
private Path destinationLocation;
}
上面的类表示负责将图像大小调整为给定大小的单个任务。我有很多要求将此图像的大小调整为许多不同的大小。
@RequiredArgsConstructor
class ImageResizeService {
private final Executor executor;
Mono<List<ImageResizeResult>> resize(List<ImageResizeRequest> requests) {
return Flux.fromIterable(requests)
.flatMap(this::resize)
.collectList()
.subscribeOn(Schedulers.fromExecutor(executor));
}
private Mono<ImageResizeResult> resize(ImageResizeRequest request) {
return Mono.fromFuture(CompletableFuture.supplyAsync(resizeTask(request), executor));
}
private Supplier<ImageResizeResult> resizeTask(ImageResizeRequest request) {
return () -> {
//TODO add image resize logic for example ImageMagick by Im4Java...
/** code below call ImageMagick library
ConvertCmd cmd = new ConvertCmd();
IMOperation op = new IMOperation();
op.quality(100d);
op.addImage(request.getDestinationLocation().toString());
cmd.run(op);
*/
//TODO add logic!!!
return new ImageResizeResult(null, null, null, null);
};
}
}
我的问题是:如何在项目反应器中实现并行独立任务负责调整图像大小?如果没有项目Reactor,我将使用CompletableFuture列表:
private static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
CompletableFuture<Void> allDoneFuture =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
return allDoneFuture.thenApply(v ->
futures.stream().
map(future -> future.join()).
collect(Collectors.<T>toList())
);
}
具有指定的执行者服务。此外,在我的例子中,我在subscribeOn方法和supplyAsync中使用了同一个执行器——这是个好主意吗?
因此,我的all过程如下所示:
@RequiredArgsConstructor
class ImageCommandProcessingService {
private final DownloadRequestFactory downloadRequestFactory;
private final ImageClientDownloader imageClientDownloader;
private final ImageResizeRequestFactory imageResizeRequestFactory;
private final ImageResizeService imageResizeService;
Mono<List<ImageResizeResult>> process(ResizeImageCommand resizeImageCommand) {
return Mono.just(resizeImageCommand)
.map(command -> downloadRequestFactory.create(command.getImageUrl().getUrl()))
.flatMap(imageClientDownloader::downloadImage)
.map(downloadedImage -> imageResizeRequestFactory.createRequests(downloadedImage, resizeImageCommand.getSizes().toJavaList()))
.flatMap(imageResizeService::resize);
}
}
我有一个带有图像URL和大小集的命令:
@Value
class ResizeImageCommand {
private ImageUrl imageUrl;
private Set<ImageSize> sizes;
}
首先,我需要将映像下载到磁盘上,因此我按工厂创建了一个下载请求:
@RequiredArgsConstructor
class DownloadRequestFactory {
private final ImageLocationPathResolver resolver;
DownloadRequest create(String url) {
return new DownloadRequest(url, resolver.resolveDownloadedLocation(url));
}
}
Resolver是一个类,负责创建临时文件的路径,并为调整大小的图像创建路径:
class ImageLocationPathResolver {
private String temporaryImagesFolder;
private String destinationImagesFolder;
Path resolveDownloadedLocation(String imageUrl) {
LocalDateTime now = LocalDateTime.now();
String fileName = now.toString() + "_" + getFileNameExtensionFromUrl(imageUrl);
return Paths.get(temporaryImagesFolder,getDatePaths(now.toLocalDate()), fileName);
}
Path resolveDestinationLocation(ImageSize imageSize, String url) {
String fileName = getFileNameExtensionFromUrl(url);
return Paths.get(destinationImagesFolder, imageSize.getName(), getDatePaths(LocalDate.now()), fileName);
}
private String getFileNameExtensionFromUrl(String url) {
return StringUtils.getFilenameExtension(url);
}
private String getDatePaths(LocalDate now) {
return now.getYear() + File.pathSeparator + now.getMonth() + File.pathSeparator + now.getDayOfMonth();
}
}
更远我有一个客户端负责下载操作:
public interface ImageClientDownloader {
Mono<DownloadedImage> downloadImage(DownloadRequest downloadRequest);
}
和实施:
@Slf4j
class HttpImageClientDownloader implements ImageClientDownloader {
private final WebClient webClient;
HttpImageClientDownloader() {
this.webClient = WebClient.create();
}
@Override
public Mono<DownloadedImage> downloadImage(DownloadRequest downloadRequest) {
try {
Flux<DataBuffer> dataBuffer = webClient.get()
.uri(downloadRequest.getUrl())
.retrieve()
.bodyToFlux(DataBuffer.class);
Path resultFilePath = Files.createFile(downloadRequest.getLocation());
WritableByteChannel channel = Files.newByteChannel(resultFilePath, StandardOpenOption.WRITE);
return DataBufferUtils.write(dataBuffer, channel)
.map(DataBufferUtils::release)
.then(Mono.just(new DownloadedImage(downloadRequest.getUrl(), resultFilePath, LocalDateTime.now())));
} catch (Exception e) {
log.error(e.getMessage(), e);
return Mono.error(e);
}
}
}
这是IO操作。我应该使用专用的调度程序吗?最后我有调整大小的操作,请求是在map操作-ImageResizeRequest estFactory中创建的。
不要不断地从< code>ExecutorService中重新创建< code>Scheduler,而是努力将它直接包装在构造函数中。
您根本不需要CompletableFuture
,而subscribeOn
应应用于平面图的内部,以在每个调整大小任务中潜在地选择单独的线程(它应用于每个通量时从池中选择一个线程):
class ImageResizeService {
private final Executor executor; //TODO prefer an ExecutorService if possible
private final Scheduler scheduler; //FIXME Schedulers.fromExecutor(executor)
Mono<List<ImageResizeResult>> resize(List<ImageResizeRequest> requests) {
//we get the requests on IO thread
return Flux.fromIterable(requests)
//for each request, perform asynchronous resize...
.flatMap(r -> Mono
//... by converting the resizeTask Callable to a Mono
.fromCallable(r -> resizeTask(r).get())
//... and making sure it executes on the executor
.subscribeOn(scheduler)
)
.collectList();
}
}
为了实现真正的并行化,您还有另一个选择:
并行(). runOn()
:
Mono<List<ImageResizeResult>> resize(List<ImageResizeRequest> requests) {
//we get the requests on IO thread
return Flux.fromIterable(requests)
//divide into N workloads
//the executor _should_ be capable of this degree of parallelisation:
.parallel(NUMBER_OF_DESIRED_THREADS)
//actually tell to run each workload on a thread picked from executor
.runOn(scheduler)
//here the workload are already running on their dedicated thread,
//we can afford to block it and thus apply resize in a simpler `map`
.map(r -> resizeTask(r).get()) //NB: the Supplier aspect can probably be removed
//go back to a `Flux` sequence for collection into list
.sequential()
.collectList();
}
本文向大家介绍C#异步执行任务的方法,包括了C#异步执行任务的方法的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了C#异步执行任务的方法。分享给大家供大家参考。具体如下: 希望本文所述对大家的C#程序设计有所帮助。
问题内容: 在过去的几个小时中,我一直在努力解决这个问题,但无法解决。我想我仍然必须习惯于函数式编程风格;) 我写了一个递归函数,它遍历目录结构并对某些文件进行处理。此功能使用异步IO方法。现在,我要在完成整个遍历后执行一些操作。 如何确保在执行完所有调用但仍使用异步IO功能后执行此操作? 问题答案: 查找“ 步骤”模块。它可以链接异步函数调用,并将结果从一个传递到另一个。
是否可以调用一个异步方法,以便它从一个同步的方法异步运行?我不关心它挂起同步调用程序直到它返回,而是希望该方法被异步调用。
这是在一次Android采访中被问到的。有人问我是否可以从异步任务 1 的 doInBackground() 方法(让它成为 Task1)启动另一个异步任务(让它成为 Task2)。我浏览了文档,其中说了以下内容: 必须在UI线程上创建任务实例。 必须在 UI 线程上调用 execute(Params...)。 根据这些陈述,我认为从另一个任务的后台方法启动一个任务是不可能的。此外,async任务
在Server程序中如果需要执行很耗时的操作,比如一个聊天服务器发送广播,Web服务器中发送邮件。如果直接去执行这些函数就会阻塞当前进程,导致服务器响应变慢。 Swoole提供了异步任务处理的功能,可以投递一个异步任务到TaskWorker进程池中执行,不影响当前请求的处理速度。 程序代码 基于第一个TCP服务器,只需要增加onTask和onFinish 2个事件回调函数即可。另外需要设置task
本文向大家介绍SpringBoot异步任务使用方法详解,包括了SpringBoot异步任务使用方法详解的使用技巧和注意事项,需要的朋友参考一下 步骤,如图所示: 1.添加异步任务业务类 2.添加测试控制器 3.添加启动类 4.右键项目Run As启动,访问url 结果: 以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持呐喊教程。