当前位置: 首页 > 知识库问答 >
问题:

CompletableFuture并行执行几个线程,串行执行几个线程

朱浩大
2023-03-14

我需要执行一些任务。有些任务是独立的,有些任务依赖于其他任务的成功执行。独立任务可以并行运行以获得更好的性能。我把这些任务称为服务。列链接说明哪些服务将以串联方式执行,哪些服务将以并联方式执行。列order描述了一组定义的服务所遵循的执行顺序。例如,服务A和B应该并行运行。如果它们已成功执行,则将执行服务C。请注意,服务C并不直接依赖于其先前服务的输出,但它必须在成功执行其先前服务后运行,因为服务C在执行期间需要由其先前服务生成的一些数据。成功执行服务C后,将执行下一个服务D,依此类推。此循环将继续,直到列表中的所有服务都已使用。

Tasks       service     link      order
Service A   01          03        1
Service B   02          03        2
Service C   03          04        3
Service D   04          05        4
Service E   05          07        5
Service F   06          07        6
Service G   07          (null)    7

下面是我的代码。

    public void executeTransactionFlow(DataVo dataVo) throws Exception {

    List<Callable<Boolean>> threadList = new ArrayList<>();
    List<String> serviceIds = new ArrayList<>();
    List<Future<Boolean>> futureList;
    String validatedRespCode = null, joinTo, prevJoinTo = null, serviceId;

    // Iterating through service flows map
    for (Map<String, String> map : serviceFlowsMap) {
        joinTo = map.get("link");
        serviceId = map.get("service");

        // A simple flag to differentiate which services should execute parallel and which in serial.
        if (null == prevJoinTo) {
            prevJoinTo = joinTo;
        }

        // Check for join condition. If join condition is same as previous then do not execute the thread list yet add current service in list
        if (null != joinTo && joinTo.equals(prevJoinTo)) {
            threadList.add(new Callable<String, DataVo>(serviceId, dataVo));
        }

        /*
         * 1. Run the threads in the list
         * 2. Empty the thread list
         * 3. Empty serviceIds list
         * 4. Set prevJoinTo
         */
        else {
            if (threadList.size() > 0) {
                prevJoinTo = joinTo;

                try {

                    // If list contain only 1 service then call, otherwise invokeAll
                    futureList = MyExecutor.executeServices(threadList, dataVo);

                    // During execution we cannot interrupt services, so we check here after they get back to here and interrupt if it has been timedout.
                    if (dataVo.isTimedout()) {
                        throw new Exception("Transaction thread is Interrupted or Timed-out");
                    }

                    // Validate service response codes and get decision in case of any failure
                    validatedRespCode = validateResponseOfExecutedServices(dataVo, futureList, serviceIds);

                    // If validationRespCode is non 00 then do not process further
                    if (null != validatedRespCode && !"200".equals(validatedRespCode)) {
                        break;
                    }
                }
                catch (Exception e) {
                    throw new Exception(e.getMessage(), e);
                }
                finally {
                    // clear thread list and serviceIds list. It will be populated for next parallel set of threads
                    threadList.clear();
                    serviceIds.clear();
                }
            }

            // Start preparing new thread list
            // Adding current service_id into threadList after executing previous services in parallel.
            threadList.add(new Callable<String, DataVo>(serviceId, dataVo));
        }
    }

    // Run remaining services
    if (!threadList.isEmpty()) {

        try {
            futureList = MyExecutor.executeServices(threadList, dataVo);
            validatedRespCode = validateResponseOfExecutedServices(dataVo, futureList, serviceIds);
        }
        catch (Throwable e) {
            throw new Exception(e.getMessage(), e);
        }
    }

    // Check validation response code
    if (null != validatedRespCode && !"200".equals(validatedRespCode)) {
        MyExecutor.callDeclineFlow(dataVo, validatedRespCode, null);
    }

}


/**
 * This method iterates through the thread list and checks for exceptions and service responses.
 * If service response is not success or if any exception has occurred then exception is thrown
 */
public String validateResponseOfExecutedServices(DataVo dataVo, List<Future<Boolean>> futureList, List<String> serviceIds) throws Exception {
    String finalResponse = "200", serviceResponse = null;

    /*
     * future list will be null if single service is executed (no other parallel transactions). The reason is that we do
     * not use invokeAll() on single service.
     */

    if (null != futureList && futureList.size() > 0) {
        for (Future<Boolean> future : futureList) {
            try {
                future.get();
            }
            catch (Exception e) {
                throw new Exception(e.getMessage(), e);
            }
        }
    }

    // Iterate through serviceIds and check responses.
    for (String serviceId : serviceIds) {
        serviceResponse = dataVo.getServiceResponse(serviceId);

        /*
         * if one of following response is found then consider it exception
         */
        if (null != serviceResponse && "400,401,402,403,404,500,501".contains(serviceResponse)) {
            throw new Exception("One of the service has been declined");
        }
    }

    return finalResponse;
}

如果CompletableFuture在这里是有益的,那么我如何有效地使用它?

future.get()是一个阻塞调用。如果我有10个并行执行的服务,那么这个future.get()将阻止其他服务,即使它们在我们等待的当前之前执行。如何避免这种阻塞?

我添加了问题陈述的更多细节,即添加订单列。服务需要遵循定义的顺序。服务A和B的顺序分别为1和2,但它们仍将并行执行,因为它们在链接中都有03值。我认为@Thomas在评论中建议,现在不需要依赖关系图方法。


共有3个答案

卢元龙
2023-03-14

只是无法将我的注意力从使用纯Java的基本问题上移开。这是我先前回答的一个修改版本。此答案包含两种样式—RxJavaExecutorService。它包含3个类:

  1. DependentSeriesOfActionsBase:包含一些可重用方法和公共字段的基类。这只是为了方便和容易理解的代码。
  2. DependentSeriesOfActionsCoreJava:这是基于ExecutorService的实现。我使用Future.get()等待操作的结果,不同的是等待本身是异步发生的。看看start Action().
  3. DependentSeriesOfActionsRxJava:较早的基于RxJava的实现。

代码:依赖序列OfActionsBase

abstract class DependentSeriesOfActionsBase{
    protected List<Action> allActions;
    protected ExecutorService SVC = Executors.newCachedThreadPool();

    protected boolean allActionsCompleted(){
        for( Action a : this.allActions ) if( !a.isCompleted() ) return false;
        return true;
    }

    protected static boolean allDepsCompleted( Action a, List<Action> allActions ){
        for( Action dep : allActions ) {
            if( a.getDependencies().contains( dep ) && !dep.isCompleted() ) return false;
        }

        return true;
    }

    /* Returns the actions that are dependent on Action <code>a</code>. */
    protected List<Action> getDeps( Action a, List<Action> list ){
        List<Action> deps = new ArrayList<>();
        for( Action dep : list ) if( dep.getDependencies().contains( a ) ) deps.add( dep );
        return deps;
    }

    /* Creates the action list with respective dependencies. */
    protected List<Action> createActions(){
        List<Action> actions = new ArrayList<>();

        Action a = createAction( 5000, "ServiceA", null );
        Action b = createAction( 5000, "ServiceB", null );
        Action c = createAction( 2000, "ServiceC", a, b );
        Action d = createAction( 2000, "ServiceD", c );
        Action e = createAction( 2000, "ServiceE", d );

        actions.add( a ); actions.add( b ); actions.add( c ); actions.add( d ); actions.add( e );
        return actions;
    }

    protected Action createAction( final long sleepMillis, final String name, Action... dependencies ) {
        List<Action> deps = null;
        if( dependencies != null ) {
            deps = new ArrayList<>();
            for( Action a : dependencies ) deps.add( a );
        }
        return Action.of( () -> {
            System.out.println( "Service (" + name + ") started" );
            try {
                Thread.sleep( sleepMillis );
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            System.out.println( "Service (" + name + ") completed" );
            return true;
        }, name, deps );
    }

    /* Attempts to start all actions that are present in the passed list. */
    protected void startAllActions( List<Action> actions ){
        for( Action action : actions ) {
            startAction( action, actions );
        }
    }

    protected abstract void startAction( Action action, List<Action> actions );


    protected void shutdown(){
        SVC.shutdown();
        try {
            SVC.awaitTermination( 2, TimeUnit.SECONDS );
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

代码:DependentSeriesOfActionsCoreJava

public class DependentSeriesOfActionsCoreJava extends DependentSeriesOfActionsBase{
    public static void main( String[] args ){
        new DependentSeriesOfActionsCoreJava().start();
    }

    private void start() {
        this.allActions = createActions();
        startAllActions( this.allActions );
    }

    protected void startAction( Action a, List<Action> list ){
        if( !a.isPending() ) return;
        if( !allDepsCompleted( a, allActions ) ) return;

        if( a.isPending() ) {
            synchronized (a.LOCK ) {
                if( a.isPending() ) {
                    a.setStatus( 1 ); //Set to running, so that it is not picked up twice. 

                    /* Submit the action to the ExecutorService and get the handle to the Future. */
                    final Future<?> fut = SVC.submit( () -> a.action.call() );

                    /* Submit the Future.get() action to the ExecutorService and execute the dependencies when it returns. */
                    SVC.submit( () -> {
                        Object returnVal = null;
                        /* Wait */
                        try {
                            fut.get(); 
                            a.setStatus( 2 );

                            /* If all actions are completed, shut down the ExecutorService. */
                            if( allActionsCompleted() ) shutdown();
                        } catch (InterruptedException | ExecutionException e) {
                            e.printStackTrace();
                        }

                        startAllActions( getDeps( a, this.allActions ) );
                    } );

                }
            }
        }
    }
}

代码:dependentsriesofactionsrxjava

public class DependentSeriesOfActionsRxJava extends DependentSeriesOfActionsBase{
    /* To listen to the completion of a task, so that the dependent tasks may be scheduled. */
    private Subject<Action> completionSub = io.reactivex.subjects.BehaviorSubject.create();

    /* To listen to the completion of all tasks, so that ExecutorService may shut down. */
    private Subject<Boolean> allActionCompletedSub = io.reactivex.subjects.BehaviorSubject.create();

    public static void main( String[] args ){
        new DependentSeriesOfActionsRxJava().start();
    }

    private void start() {
        this.allActions = createActions();
        subscribeToActionCompletions();
        subscribeToSvcShutdown();

        startAllActions( this.allActions );
    }

    private void subscribeToSvcShutdown(){
        /* If all actions have been completed, shut down the ExecutorService. */
        this.allActionCompletedSub.subscribe( allScheduled -> {
            if( allScheduled ) shutdown();
        });
    }

    private void subscribeToActionCompletions(){
        this.completionSub.subscribe( complAction -> {
            /* Get the actions that are dependent on this recently completed action and "attempt" to start them. */
            List<Action> deps = getDeps( complAction, this.allActions );
            startAllActions( deps );

            /* If all actions have got completed, raise the flag. */
            if( allActionsCompleted() ) this.allActionCompletedSub.onNext( true );
        });
    }

    /* Attempts to start an action. Only if it is still pending and all of its dependencies are completed. */
    protected void startAction( Action a, List<Action> list ){
        if( !a.isPending() ) return;
        if( !allDepsCompleted( a, allActions ) ) return;

        if( a.isPending() ) {
            synchronized (a.LOCK ) {
                if( a.isPending() ) {
                    a.setStatus( 1 ); //Set to running, so that it is not picked up twice. 
                    SVC.submit( () -> {
                        try {
                            a.getAction().call();
                        } catch (Exception e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }

                        a.setStatus( 2 ); //Set to completed. (We may have to synchronize this.)
                        this.completionSub.onNext( a );
                    } );
                }
            }
        }
    }

}
焦同
2023-03-14

ThenComine可以用来表示CompletionStages之间的依赖关系,允许您在两者都完成后执行任务。然后,您可以使用thenApply预制后续操作:

CompletionStage<ServiceAResponse> serviceAResponse = callServiceA();
CompletionStage<ServiceBResponse> serviceBResponse = callServiceB();


CompletionStage<ServiceEResponse> result = serviceA.thenCombine(serviceBResponse, (aResponse, bResponse) -> serviceC.call(aResponse, bResponse))                                                     
                                         .thenApply(cResponse -> serviceD.call(cResponse))                                                    
                                         .thenApply(dResponse -> serviceE.call(eResponse))



public CompletionStage<ServiceAResponse> callServiceA() {
    return CompletableFuture.supplyAsync(() -> serviceA.call());
}

public CompletionStage<ServiceBResponse> callServiceB() {
    return CompletableFuture.supplyAsync(() -> serviceB.call());
}
施飞雨
2023-03-14

真棒的问题。虽然从技术上讲,完全可以使用ExecutorServiceFuture来实现这一点,但根据我的理解,更好的方法是使用反应式编程,而不是单纯依赖FutureCompletableFutureCompletionService等。主要原因是它可能很快成为一个难以阅读的代码。

下面是我使用RxJava 2.2.16和ExecutorService的方法:

  1. 使用ExecutorServicesubmit()操作,执行不依赖于其他人或其所有依赖项的操作
  2. 要知道操作已完成,请使用RxJava的BehaviorSubject。当一个操作完成时,为其每个依赖项触发步骤(1)
  3. 完成所有操作后,关闭执行器服务。为此,请使用另一个行为主题

很抱歉,由于采用了新的方法,我以自己的方式编写了整个逻辑。但它仍然围绕着你给出的主要要求。最好先看看AppRxjava中的Action模型类和createActions()方法。从那里,您应该能够遵循代码。为了模拟一些时间消耗,我使用了著名的线程。sleep()技术。

public class AppRxJava{
    /* To listen to the completion of a task, so that the dependent tasks may be scheduled. */
    private Subject<Action> completionSub = io.reactivex.subjects.BehaviorSubject.create();

    /* To listen to the completion of all tasks, so that ExecutorService may shut down. */
    private Subject<Boolean> allActionCompletedSub = io.reactivex.subjects.BehaviorSubject.create();

    private ExecutorService SVC = Executors.newCachedThreadPool();
    private List<Action> allActions;

    public static void main( String[] args ){
        new AppRxJava().start();
    }

    private void start() {
        this.allActions = createActions();
        subscribeToActionCompletions();
        subscribeToSvcShutdown();

        startAllActions( this.allActions );
    }

    private void subscribeToSvcShutdown(){
        /* If all actions have been completed, shut down the ExecutorService. */
        this.allActionCompletedSub.subscribe( allScheduled -> {
            if( allScheduled ) {
                SVC.shutdown();
                try {
                    SVC.awaitTermination( 2, TimeUnit.SECONDS );
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
    }

    private void subscribeToActionCompletions(){
        this.completionSub.subscribe( complAction -> {
            /* Get the actions that are dependent on this recently completed action and "attempt" to start them. */
            List<Action> deps = getDeps( complAction, this.allActions );
            startAllActions( deps );

            /* If all actions have got completed, raise the flag. */
            if( allActionsCompleted() ) this.allActionCompletedSub.onNext( true );
        });
    }

    /* Attempts to start all actions that are present in the passed list. */
    private void startAllActions( List<Action> actions ){
        for( Action action : actions ) {
            startAction( action, actions );
        }
    }

    /* Attempts to start an action. Only if it is still pending and all of its dependencies are completed. */
    private void startAction( Action a, List<Action> list ){
        if( !a.isPending() ) return;
        if( !allDepsCompleted( a, allActions ) ) return;

        if( a.isPending() ) {
            synchronized (a.LOCK ) {
                if( a.isPending() ) {
                    a.setStatus( 1 ); //Set to running, so that it is not picked up twice. 
                    SVC.submit( () -> {
                        try {
                            a.getAction().call();
                        } catch (Exception e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }

                        a.setStatus( 2 ); //Set to completed. (We may have to synchronize this.)
                        this.completionSub.onNext( a );
                    } );
                }
            }
        }
    }

    private boolean allActionsCompleted(){
        for( Action a : this.allActions ) if( !a.isCompleted() ) return false;
        return true;
    }

    private static boolean allDepsCompleted( Action a, List<Action> allActions ){
        for( Action dep : allActions ) {
            if( a.getDependencies().contains( dep ) && !dep.isCompleted() ) return false;
        }

        return true;
    }

    /* Returns the actions that are dependent on Action <code>a</code>. */
    private List<Action> getDeps( Action a, List<Action> list ){
        List<Action> deps = new ArrayList<>();
        for( Action dep : list ) if( dep.getDependencies().contains( a ) ) deps.add( dep );
        return deps;
    }

    /* Creates the action list with respective dependencies. */
    private List<Action> createActions(){
        List<Action> actions = new ArrayList<>();

        Action a = createAction( 5000, "ServiceA", null );
        Action b = createAction( 5000, "ServiceB", null );
        Action c = createAction( 2000, "ServiceC", a, b );
        Action d = createAction( 2000, "ServiceD", c );
        Action e = createAction( 2000, "ServiceE", d );

        actions.add( a ); actions.add( b ); actions.add( c ); actions.add( d ); actions.add( e );
        return actions;
    }

    private Action createAction( final long sleepMillis, final String name, Action... dependencies ) {
        List<Action> deps = null;
        if( dependencies != null ) {
            deps = new ArrayList<>();
            for( Action a : dependencies ) deps.add( a );
        }
        return Action.of( () -> {
            System.out.println( "Service (" + name + ") started" );
            try {
                Thread.sleep( sleepMillis );
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            System.out.println( "Service (" + name + ") completed" );
            return true;
        }, name, deps );
    }


}

还有动作模型课。这表示一个操作及其依赖的操作列表。(与您最初的表述略有不同。但我认为,如果您能相应地处理它,任何一种方式都可以。)

public class Action{
    Callable<Boolean> action;
    String name;
    List<Action> dependencies = new ArrayList<>();
    AtomicInteger status = new AtomicInteger( 0 ); //0 = Pending, 1 = Scheduled, 2 = Completed
    public static final Object LOCK = new Object();

    private Action(Callable<Boolean> action, String name, List<Action> dependencies) {
        super();
        this.action = action;
        this.name = name;
        if( dependencies != null ) this.dependencies = dependencies;
    }

    public static Action of( Callable<Boolean> action, String name, List<Action> dependencies ){
        return new Action( action, name, dependencies );
    }

    public Callable<Boolean> getAction(){
        return action;
    }

    public String getName(){
        return name;
    }

    public List<Action> getDependencies(){
        return dependencies;
    }

    public boolean isCompleted(){
        return this.status.get() == 2;
    }

    public boolean isPending(){
        return this.status.get() == 0;
    }

    public boolean isScheduled(){
        return this.status.get() == 1;
    }

    public void setStatus( int status ){
        this.status.getAndSet( status );
    }

    @Override
    public int hashCode(){
        final int prime = 31;
        int result = 1;
        result = prime * result + ((name == null) ? 0 : name.hashCode());
        return result;
    }

    @Override
    public boolean equals( Object obj ){
        if (this == obj) return true;
        if (obj == null) return false;
        if (getClass() != obj.getClass()) return false;
        Action other = (Action) obj;
        if (name == null) {
            if (other.name != null)
                return false;
        } else if (!name.equalsIgnoreCase( other.name )) return false;
        return true;
    }

}
 类似资料:
  • 问题内容: 我们有一个基于石英的调度程序应用程序,该应用程序每分钟运行约1000个作业,每分钟的秒数均匀分布,即每秒约16-17个作业。理想情况下,这16-17个作业应同时触发,但是该作业的execute方法的第一个语句(仅记录执行时间)非常晚。例如,假设我们从05:00到05:04每分钟安排1000个作业。因此,理想情况下,计划在05:03:50进行的作业应该在05:03:50记录了execut

  • 执行流程 PHP的生命周期: 模块初始化阶段 请求初始化阶段 执行PHP脚本阶段 请求结束阶段 模块关闭阶段

  • 我确信这两个列表都不是空的,并且正在调用,但是没有调用order execution run方法....

  • 主要内容:1 如何使用多个线程执行一个任务,2 如何使用多个线程执行多个任务1 如何使用多个线程执行一个任务 如果需要由多个线程执行单个任务,则只有一个run()方法,例如: 1.1 多个线程执行一个任务示例1 输出结果为: 1.2 多个线程执行一个任务示例2 输出结果为: 注意:每个线程在单独的堆栈中运行。 2 如何使用多个线程执行多个任务 如果必须通过多个线程执行多个任务,请使用多个run() 方法: 2.1 多个线程执行多个任务示例1 输出结果为: 2.2 多个线程

  • 问题内容: 我对CompletableFuture方法有疑问: 事情是JavaDoc这么说的: 返回一个新的CompletionStage,当此阶段正常完成时,将使用该阶段的结果作为所提供函数的参数来执行该阶段。有关涵盖异常完成的规则​​,请参见CompletionStage文档。 那线程呢?这将在哪个线程中执行?如果将来由线程池完成怎么办? 问题答案: 文档中指定的策略可以帮助您更好地理解: 对

  • 我有一个关于CompletableFuture方法的问题: 问题是JavaDoc只说了这么一句话: 返回一个新的CompletionStage,当此阶段正常完成时,将以此阶段的结果作为所提供函数的参数执行该CompletionStage。有关例外完成的规则,请参阅CompletionStage文档。 穿线呢?这将在哪个线程中执行?如果未来是由一个线程池来完成的呢?