当前位置: 首页 > 知识库问答 >
问题:

单声道反应器-执行并行任务

邢杰
2023-03-14

我不熟悉Reactor框架,并尝试在现有的实现中使用它。LocationProfileService和InventoryService都返回一个Mono,并并行执行,彼此之间没有依赖关系(来自MainService)。在LocationProfileService中-发出了4个查询,最后2个查询依赖于第一个查询。

有什么更好的方法来写这个?我看到调用按顺序执行,而其中一些应该并行执行。正确的做法是什么?

public class LocationProfileService {
        static final Cache<String, String> customerIdCache //define Cache

        @Override
        public Mono<LocationProfileInfo> getProfileInfoByLocationAndCustomer(String customerId, String location) {
            //These 2 are not interdependent and can be executed immediately
            Mono<String> customerAccountMono = getCustomerArNumber(customerId,location) LocationNumber).subscribeOn(Schedulers.parallel()).switchIfEmpty(Mono.error(new CustomerNotFoundException(location, customerId))).log();
            Mono<LocationProfile> locationProfileMono = Mono.fromFuture(//location query).subscribeOn(Schedulers.parallel()).log();

    //Should block be called, or is there a better way to do ?
            String custAccount = customerAccountMono.block(); // This is needed to execute and the value from this is needed for the next 2 calls

            Mono<Customer> customerMono = Mono.fromFuture(//query uses custAccount from earlier step).subscribeOn(Schedulers.parallel()).log();
            Mono<Result<LocationPricing>> locationPricingMono = Mono.fromFuture(//query uses custAccount from earlier step).subscribeOn(Schedulers.parallel()).log();

            return Mono.zip(locationProfileMono,customerMono,locationPricingMono).flatMap(tuple -> {
                LocationProfileInfo locationProfileInfo = new LocationProfileInfo();
                //populate values from tuple
                return Mono.just(locationProfileInfo);
            });


        }

        private Mono<String> getCustomerAccount(String conversationId, String customerId, String location) {
            return CacheMono.lookup((Map)customerIdCache.asMap(),customerId).onCacheMissResume(Mono.fromFuture(//query).subscribeOn(Schedulers.parallel()).map(x -> x.getAccountNumber()));
        }

}


public class InventoryService {

    @Override
    public Mono<InventoryInfo> getInventoryInfo(String inventoryId) {
        Mono<Inventory> inventoryMono = Mono.fromFuture(//inventory query).subscribeOn(Schedulers.parallel()).log();
        Mono<List<InventorySale>> isMono = Mono.fromFuture(//inventory sale query).subscribeOn(Schedulers.parallel()).log();

        return Mono.zip(inventoryMono,isMono).flatMap(tuple -> {
            InventoryInfo inventoryInfo = new InventoryInfo();
            //populate value from tuple

            return Mono.just(inventoryInfo);

        });
    }

}

public class MainService {

        @Autowired
        LocationProfileService locationProfileService;
        @Autowired
        InventoryService inventoryService

        public void mainService(String customerId, String location, String inventoryId) {
            Mono<LocationProfileInfo> locationProfileMono = locationProfileService.getProfileInfoByLocationAndCustomer(....);
            Mono<InventoryInfo> inventoryMono = inventoryService.getInventoryInfo(....);

            //is using block fine or is there a better way to do?
            Mono.zip(locationProfileMono,inventoryMono).subscribeOn(Schedulers.parallel()).block();
        }

}

共有1个答案

吕树
2023-03-14

您不需要为了获得pass参数而阻塞代码,因为您的代码非常接近解决方案。我使用您提供的类名编写了代码。只要更换所有的单声道。只是(……)通过呼叫正确的服务。

    public Mono<LocationProfileInfo> getProfileInfoByLocationAndCustomer(String customerId, String location) {
    Mono<String> customerAccountMono = Mono.just("customerAccount");
    Mono<LocationProfile> locationProfileMono = Mono.just(new LocationProfile());

    return Mono.zip(customerAccountMono, locationProfileMono)
            .flatMap(tuple -> {
                Mono<Customer> customerMono = Mono.just(new Customer(tuple.getT1()));
                Mono<Result<LocationPricing>> result = Mono.just(new Result<LocationPricing>());
                Mono<LocationProfile> locationProfile = Mono.just(tuple.getT2());
                return Mono.zip(customerMono, result, locationProfile);
            })
            .map(LocationProfileInfo::new)
    ;
}

public static class LocationProfileInfo {
    public LocationProfileInfo(Tuple3<Customer, Result<LocationPricing>, LocationProfile> tuple){
        //do wathever
    }
}


public static class LocationProfile {}

private static class Customer {
    public Customer(String cutomerAccount) {
    }
}

private static class Result<T> {}

private static class LocationPricing {}

请记住,第一个拉链不是必需的。我重写它是为了得到你的解决方案。但我会用不同的方式来解决这个问题。这会更清楚。

public Mono<LocationProfileInfo> getProfileInfoByLocationAndCustomer(String customerId, String location) {
return Mono.just("customerAccount") //call the service                                                
        .flatMap(customerAccount -> {                                                                 
            //declare the call to get the customer                                                    
            Mono<Customer> customerMono = Mono.just(new Customer(customerAccount));                   

            //declare the call to get the location pricing                                            
            Mono<Result<LocationPricing>> result = Mono.just(new Result<LocationPricing>());          

            //declare the call to get the location profile                                            
            Mono<LocationProfile> locationProfileMono = Mono.just(new LocationProfile());             

            //in the zip call all the services actually are executed                                  
            return Mono.zip(customerMono, result, locationProfileMono);                               
        })                                                                                            
        .map(LocationProfileInfo::new)                                                                
;                                                                                                     

}

 类似资料:
  • 我找不到关于我们被要求进行的调查的具体答案 我看到并行流在使用少量线程时性能可能不是那么好,而且当DB在处理当前请求的同时阻止下一个请求时,它的表现显然也不是那么好 然而,我发现实现任务执行器与并行流的开销是巨大的,我们实现了一个POC,它只需要这一行代码就能满足并发需求: 而在Task Executor中,我们需要重写Runnable接口并编写一些繁琐的代码,以使Runnable不是空的,并返回

  • 我在我的项目中使用了spring-boot-starter-webflux、ractor-test和spring-boot-starter-test 2.0.0.m7。在我的中有一个,您可以通过方法添加字符串值。还可以询问通过方法添加到列表中的所有值。问题是如何测试?我有,但它似乎不起作用,因为返回的总是0。我知道问题是中的方法,但我不知道原因和如何解决它。有人知道我应该如何创建一个有效的Juni

  • 失败:生成失败,出现异常。 > 错误:任务“:app:CompiledEbugJavaWithJavac”执行失败。 编译失败;有关详细信息,请参阅编译器错误输出。

  • null 这工作得很好,但对于下面的命令,我得到一个错误 错误: > 错误:任务“:app:InstallRelease”执行失败。 com.android.builder.testing.api.deviceException:com.android.ddmlib.installexception:install_failed_update_incompatible:包io.nativebase

  • 问题内容: 我正在使用python 2.7,我有一些看起来像这样的代码: 此处唯一的依赖项如下:dependent1需要等待任务1-3,Dependent2需要等待任务4-6,而dependent3需要等待依赖项1-2 …以下是可以的:首先运行全部6个任务并行,然后是前两个从属。 我希望尽可能多的任务并行运行,我已经在Google上搜索了一些模块,但是我希望避免使用外部库,并且不确定队列线程技术如

  • 我正在将阻塞顺序编排框架转换为反应式。现在,这些任务是动态的,并通过JSON输入输入到引擎中。引擎提取类并执行方法,并使用每个任务的响应保存状态 如何在Reactor中实现相同的链接?如果这是一个静态DAG,我会将它与或操作符链接起来,但由于它是动态的,我如何继续执行反应性任务并收集每个任务的输出? 示例: 非反应性接口: 核心发动机 按照上面的示例,如果我将任务转换为返回Mono 更新:: 反应