当前位置: 首页 > 知识库问答 >
问题:

我可以对 subscribeOn 方法和异步任务使用相同的执行器吗?

吴西岭
2023-03-14

你好,我有一个简单的问题,假设我有一个类如下:

import lombok.Value;

import java.nio.file.Path;

@Value
class ImageResizeRequest {

    private DownloadedImage downloadedImage;

    private ImageSize imageSize;

    private Path destinationLocation;
}

上面的类表示负责将图像大小调整为给定大小的单个任务。我有很多要求将此图像的大小调整为许多不同的大小。

@RequiredArgsConstructor
class ImageResizeService {

    private final Executor executor;

    Mono<List<ImageResizeResult>> resize(List<ImageResizeRequest> requests) {

        return Flux.fromIterable(requests)
                .flatMap(this::resize)
                .collectList()
                .subscribeOn(Schedulers.fromExecutor(executor));
    }

    private Mono<ImageResizeResult> resize(ImageResizeRequest request) {

        return Mono.fromFuture(CompletableFuture.supplyAsync(resizeTask(request), executor));

    }

    private Supplier<ImageResizeResult> resizeTask(ImageResizeRequest request) {
        return () -> {
            //TODO add image resize logic for example ImageMagick by Im4Java...
            /** code below call ImageMagick library
             ConvertCmd cmd = new ConvertCmd();
             IMOperation op = new IMOperation();
             op.quality(100d);
             op.addImage(request.getDestinationLocation().toString());
             cmd.run(op);

             */
            //TODO add logic!!!
            return new ImageResizeResult(null, null, null, null);
        };
    }
}

我的问题是:如何在项目反应器中实现并行独立任务负责调整图像大小?如果没有项目Reactor,我将使用CompletableFuture列表:

private static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
    CompletableFuture<Void> allDoneFuture =
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
    return allDoneFuture.thenApply(v ->
            futures.stream().
                    map(future -> future.join()).
                    collect(Collectors.<T>toList())
    );
}

具有指定的执行者服务。此外,在我的例子中,我在subscribeOn方法和supplyAsync中使用了同一个执行器——这是个好主意吗?

共有2个答案

贺佑运
2023-03-14

因此,我的all过程如下所示:

@RequiredArgsConstructor
class ImageCommandProcessingService {

    private final DownloadRequestFactory downloadRequestFactory;
    private final ImageClientDownloader imageClientDownloader;
    private final ImageResizeRequestFactory imageResizeRequestFactory;
    private final ImageResizeService imageResizeService;

    Mono<List<ImageResizeResult>> process(ResizeImageCommand resizeImageCommand) {
        return Mono.just(resizeImageCommand)
                .map(command -> downloadRequestFactory.create(command.getImageUrl().getUrl()))
                .flatMap(imageClientDownloader::downloadImage)
                .map(downloadedImage -> imageResizeRequestFactory.createRequests(downloadedImage, resizeImageCommand.getSizes().toJavaList()))
                .flatMap(imageResizeService::resize);

    }

}

我有一个带有图像URL和大小集的命令:

@Value
class ResizeImageCommand {

    private ImageUrl imageUrl;

    private Set<ImageSize> sizes;
}

首先,我需要将映像下载到磁盘上,因此我按工厂创建了一个下载请求:

@RequiredArgsConstructor
class DownloadRequestFactory {

    private final ImageLocationPathResolver resolver;

    DownloadRequest create(String url) {
        return new DownloadRequest(url, resolver.resolveDownloadedLocation(url));
    }
}

Resolver是一个类,负责创建临时文件的路径,并为调整大小的图像创建路径:

class ImageLocationPathResolver {

    private String temporaryImagesFolder;
    private String destinationImagesFolder;

    Path resolveDownloadedLocation(String imageUrl) {
        LocalDateTime now = LocalDateTime.now();
        String fileName = now.toString() + "_" + getFileNameExtensionFromUrl(imageUrl);
        return Paths.get(temporaryImagesFolder,getDatePaths(now.toLocalDate()), fileName);
    }

    Path resolveDestinationLocation(ImageSize imageSize, String url) {
        String fileName = getFileNameExtensionFromUrl(url);
        return Paths.get(destinationImagesFolder, imageSize.getName(), getDatePaths(LocalDate.now()), fileName);
    }

    private String getFileNameExtensionFromUrl(String url) {
        return StringUtils.getFilenameExtension(url);
    }

    private String getDatePaths(LocalDate now) {
        return now.getYear() + File.pathSeparator + now.getMonth() + File.pathSeparator + now.getDayOfMonth();
    }
}

更远我有一个客户端负责下载操作:

public interface ImageClientDownloader {

    Mono<DownloadedImage> downloadImage(DownloadRequest downloadRequest);
}

和实施:

@Slf4j
class HttpImageClientDownloader implements ImageClientDownloader {

    private final WebClient webClient;

    HttpImageClientDownloader() {
        this.webClient = WebClient.create();
    }

    @Override
    public Mono<DownloadedImage> downloadImage(DownloadRequest downloadRequest) {
        try {
            Flux<DataBuffer> dataBuffer = webClient.get()
                    .uri(downloadRequest.getUrl())
                    .retrieve()
                    .bodyToFlux(DataBuffer.class);


            Path resultFilePath = Files.createFile(downloadRequest.getLocation());
            WritableByteChannel channel = Files.newByteChannel(resultFilePath, StandardOpenOption.WRITE);
            return DataBufferUtils.write(dataBuffer, channel)
                    .map(DataBufferUtils::release)
                    .then(Mono.just(new DownloadedImage(downloadRequest.getUrl(), resultFilePath, LocalDateTime.now())));

        } catch (Exception e) {
            log.error(e.getMessage(), e);
            return Mono.error(e);
        }
    }
}

这是IO操作。我应该使用专用的调度程序吗?最后我有调整大小的操作,请求是在map操作-ImageResizeRequest estFactory中创建的。

越勇锐
2023-03-14

不要不断地从< code>ExecutorService中重新创建< code>Scheduler,而是努力将它直接包装在构造函数中。

您根本不需要CompletableFuture,而subscribeOn应应用于平面图的内部,以在每个调整大小任务中潜在地选择单独的线程(它应用于每个通量时从池中选择一个线程):

class ImageResizeService {

  private final Executor executor; //TODO prefer an ExecutorService if possible
  private final Scheduler scheduler; //FIXME Schedulers.fromExecutor(executor)

  Mono<List<ImageResizeResult>> resize(List<ImageResizeRequest> requests) {
    //we get the requests on IO thread
    return Flux.fromIterable(requests)
            //for each request, perform asynchronous resize...
            .flatMap(r -> Mono
                //... by converting the resizeTask Callable to a Mono
                .fromCallable(r -> resizeTask(r).get())
                //... and making sure it executes on the executor
                .subscribeOn(scheduler)
            )
            .collectList();
  }
}

为了实现真正的并行化,您还有另一个选择:并行(). runOn()

Mono<List<ImageResizeResult>> resize(List<ImageResizeRequest> requests) {
    //we get the requests on IO thread
    return Flux.fromIterable(requests)
            //divide into N workloads
            //the executor _should_ be capable of this degree of parallelisation:
            .parallel(NUMBER_OF_DESIRED_THREADS)
            //actually tell to run each workload on a thread picked from executor
            .runOn(scheduler) 
            //here the workload are already running on their dedicated thread,
            //we can afford to block it and thus apply resize in a simpler `map`
            .map(r -> resizeTask(r).get()) //NB: the Supplier aspect can probably be removed
            //go back to a `Flux` sequence for collection into list
            .sequential()
            .collectList();
}

 类似资料:
  • 本文向大家介绍C#异步执行任务的方法,包括了C#异步执行任务的方法的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了C#异步执行任务的方法。分享给大家供大家参考。具体如下: 希望本文所述对大家的C#程序设计有所帮助。

  • 问题内容: 在过去的几个小时中,我一直在努力解决这个问题,但无法解决。我想我仍然必须习惯于函数式编程风格;) 我写了一个递归函数,它遍历目录结构并对某些文件进行处理。此功能使用异步IO方法。现在,我要在完成整个遍历后执行一些操作。 如何确保在执行完所有调用但仍使用异步IO功能后执行此操作? 问题答案: 查找“ 步骤”模块。它可以链接异步函数调用,并将结果从一个传递到另一个。

  • 是否可以调用一个异步方法,以便它从一个同步的方法异步运行?我不关心它挂起同步调用程序直到它返回,而是希望该方法被异步调用。

  • 这是在一次Android采访中被问到的。有人问我是否可以从异步任务 1 的 doInBackground() 方法(让它成为 Task1)启动另一个异步任务(让它成为 Task2)。我浏览了文档,其中说了以下内容: 必须在UI线程上创建任务实例。 必须在 UI 线程上调用 execute(Params...)。 根据这些陈述,我认为从另一个任务的后台方法启动一个任务是不可能的。此外,async任务

  • 在Server程序中如果需要执行很耗时的操作,比如一个聊天服务器发送广播,Web服务器中发送邮件。如果直接去执行这些函数就会阻塞当前进程,导致服务器响应变慢。 Swoole提供了异步任务处理的功能,可以投递一个异步任务到TaskWorker进程池中执行,不影响当前请求的处理速度。 程序代码 基于第一个TCP服务器,只需要增加onTask和onFinish 2个事件回调函数即可。另外需要设置task

  • 本文向大家介绍SpringBoot异步任务使用方法详解,包括了SpringBoot异步任务使用方法详解的使用技巧和注意事项,需要的朋友参考一下 步骤,如图所示: 1.添加异步任务业务类 2.添加测试控制器 3.添加启动类 4.右键项目Run As启动,访问url 结果: 以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持呐喊教程。