当前位置: 首页 > 知识库问答 >
问题:

ScheduledExecutorService的每个周期后调用方法

鲜于河
2023-03-14

我需要以下场景:

在一个周期内运行所有schduledFutures,并在所有任务完成执行后每次调用方法tasksComplted()时调用。在实际调度周期之后调用tasksComplted()时,下一个调度周期不得等待。

简而言之:在实际调度周期完成后调用方法,并且不停止下一个调度周期

下面的代码创建任务和调度工作。但是,当一个周期内的所有任务都完成时,我无法调用tasksCompleted()。

import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class Scheduler {

    public static void main(String[] args) {
        final ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
        System.out.println("- [" + LocalTime.now() + "] run parent-task...");

        // create 3 tasks: each task needs 7 seconds.
        var tasks = createTasks("child", 3, 7);
        List<ScheduledFuture<?>> futures = new ArrayList<>();
        tasks.forEach(t ->
        {
            ScheduledFuture<?> future = ses.scheduleWithFixedDelay(t, 0, 2, TimeUnit.SECONDS);
            futures.add(future);
        });
        // this does not work..
        var scheduleCycleCompleted = futures.stream().allMatch(f -> f.isDone());
        System.out.println("scheduleCycleCompleted: " + scheduleCycleCompleted);

        // maybe a solution with CompletableFuture?
        CompletableFuture[] cfs = futures.toArray(new CompletableFuture[futures.size()]);

    }

    static void tasksCompleted() {
        System.out.println("schedule cycle completed");
    }

    static List<Runnable> createTasks(String group, int numbersOfTasks, long taskDuration) {
        var tasks = new ArrayList<Runnable>();
        for (var i = 0; i < numbersOfTasks; i++) {
            int taskNr = i;
            Runnable task = () ->
            {
                System.out.println("- [" + LocalTime.now() + "] Running " + group + "-task" + taskNr + "...[needs "
                        + taskDuration + " seconds]");
                try {
                    TimeUnit.SECONDS.sleep(taskDuration);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    Thread.currentThread().interrupt();
                }
            };
            tasks.add(task);
        }
        return tasks;
    }

}

共有2个答案

胡弘毅
2023-03-14

要处理调度周期内不同运行时的情况,您需要的是一种识别哪个任务属于哪个组的方法。您可以通过给他们一个识别计数器来做到这一点,这样每次执行任务时,计数器都用于命名它正在运行的组。

interface GroupedRunnable extends Runnable {
    String getGroup();
}

class CountingRunnable implements GroupedRunnable {
    private AtomicInteger counter = new AtomicInteger();
    private final Runnable delegate;
    private final String taskName;

    CountingRunnable(Runnable delegate, String taskName) {
        this.delegate = delegate;
        this.taskName = taskName;
    }
    public void run() {
        System.out.printf("[%s] - Running task %s in group %s%n", LocalTime.now(), taskName, getGroup());
        delegate.run();
        counter.incrementAndGet();
        System.out.printf("[%s] - Running task %s in group %s finished%n", LocalTime.now(), taskName, getGroup());
    }

    @Override
    public String getGroup() {
        return counter.toString();
    }
}

现在,您可以有一个watchdog类来跟踪哪个组的哪些成员已经执行了。因为您知道一个组有多少成员,所以一个简单的计数器就足够了。

class GroupMonitoringService {
    // key: group, value: tasks
    Map<String, AtomicInteger> finishedTasks = new HashMap<>();
    private final Runnable finisher;
    private final int groupSize;

    GroupMonitoringService(Runnable finisher, int groupSize) {
        this.finisher = finisher;
        this.groupSize = groupSize;
    }

    public synchronized void taskFinished(String group) {
        var finishedInGroup = finishedTasks.computeIfAbsent(group, k -> new AtomicInteger());
        if (finishedInGroup.incrementAndGet() >= groupSize) {
            // scheduling group complete
            System.out.printf("Group %s finished executing%n", group);
            finisher.run();
            finishedTasks.remove(group);
        }
    }
}

现在,您所要做的就是将原始任务包装到一个分组任务中,并确保监控服务在完成后得到通知,所以请再次包装。

private static List<Runnable> createTasks() {
    List<Runnable> result = new ArrayList<>();
    for (int i = 0; i < GROUP_SIZE; i++) {
        RandomWaitTask originalTask = new RandomWaitTask();
        CountingRunnable groupedTask = new CountingRunnable(originalTask, "Task " + i);
        Runnable notifyingRunnable = () -> {
            groupedTask.run();
            MONITORING_SERVICE.taskFinished(groupedTask.getGroup());
        };
        result.add(notifyingRunnable);
    }
    return result;
}

现在你可以安排这些了。

您可以在这里看到整个代码(虽然由于资源限制,它实际上无法在该站点上正常运行,但如果您将其复制到IDE中,它就会正常工作)。

姬烨磊
2023-03-14

已更新

我希望它能起作用。

倒计时闩锁将在此处解决问题。

import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class Scheduler {

    public static void main(String[] args) throws InterruptedException {
        final ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);

        System.out.println("- [" + LocalTime.now() + "] run parent-task...");

        int noOfTask=3;
        CountDownLatch countDownLatch = new CountDownLatch(noOfTask);

        TaskComplete taskComplete=new TaskCompleteImpl(noOfTask,countDownLatch);

        // create 3 tasks: each task needs 7 seconds.
        List<Runnable> tasks = createTasks("child", noOfTask, 7,countDownLatch,taskComplete);

        List<ScheduledFuture<?>> futures = new ArrayList<>();
        tasks.forEach(t ->
        {
            ScheduledFuture<?> future = ses.scheduleWithFixedDelay(t, 0, 2, TimeUnit.SECONDS);
            futures.add(future);
        });
        // this does not work..





    }

    interface TaskComplete{
        void tasksCompleted();
    }

    static class TaskCompleteImpl implements TaskComplete {

        int totalTask=0;
        int index=0;
        CountDownLatch countDownLatch;
        public TaskCompleteImpl(int totalTask){

        }

        public TaskCompleteImpl(int noOfTask, CountDownLatch countDownLatch) {
            this.totalTask=noOfTask;
            this.countDownLatch=countDownLatch;
        }

        @Override
        public synchronized void tasksCompleted() {
            index=index+1;
            if(index==totalTask){
                System.out.println("schedule cycle completed");
                index=0;
                countDownLatch=new CountDownLatch(totalTask);
            }

        }
    }


    static List<Runnable> createTasks(String group, int numbersOfTasks, long taskDuration, CountDownLatch countDownLatch, TaskComplete taskComplete) {
        List tasks = new ArrayList<Runnable>();
        for (int i = 0; i < numbersOfTasks; i++) {
            int taskNr = i;
            Runnable task = () ->
            {
                System.out.println("- [" + LocalTime.now() + "] Running " + group + "-task" + taskNr + "...[needs "
                        + taskDuration + " seconds]");
                try {
                    TimeUnit.SECONDS.sleep(taskDuration);
                    countDownLatch.countDown();
                    countDownLatch.await();
                    taskComplete.tasksCompleted();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    Thread.currentThread().interrupt();
                }
            };
            tasks.add(task);
        }
        return tasks;
    }

}
 类似资料:
  • 我已经反汇编了一个用MSVC v140编译的小型C程序,并试图估计每条指令的周期,以便更好地理解代码设计如何影响性能。我一直在关注迈克·阿克顿(MikeActon)关于“面向数据的设计和C”的CppCon 2014演讲,特别是我链接到的部分。 他在信中指出了以下几行: 然后他声称,这些2 x 32位读取可能位于同一缓存线上,因此大约需要200个周期。 《英特尔64和IA-32体系结构优化参考手册》

  • 问题内容: 在哪里进行调用将使我的状态失水的API调用的最佳位置是哪里?构造函数或生命周期方法之一,例如ComponentWillMount? 问题答案: 最好从生命周期方法进行api调用,反应文档也建议相同。 根据DOC: componentDidMount: 挂载组件后立即调用componentDidMount()。需要DOM节点的初始化应该在这里进行。 如果需要从远程端点加载数据,这是实例化

  • 方向从一个片段改变到另一个片段。 方位1(景观至肖像): 片段1的OnSaveInstanceState()。 片段2的OnSaveInstanceState()。 片段2的 onstop(). 片段1的 ondestroy(). 片段1的 ondetach()。 片段2的onattach(). 片段2的oncreateview()。 片段2的 onstart(). null 我是这样添加片段的-

  • 本文向大家介绍C#获取每个年,月,周的起始日期和结束日期的方法,包括了C#获取每个年,月,周的起始日期和结束日期的方法的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了C#获取每个年,月,周的起始日期和结束日期的方法。分享给大家供大家参考,具体如下: 我们在写程序的时候往往要计算出年,月,周的开始日期和结束日期,在这里给出统一求解方法 PS:这里再为大家推荐几款日期与时间相关工具供大家参考使

  • 我正在使用以下方法通过显示/隐藏片段(在我的NavigationDrawer中)在片段之间切换。 我不清楚的是,当我显示或隐藏碎片生命周期的哪个方法被调用?(由于没有onShow()或onHide()这样的方法,我不太确定该使用什么)。我想在显示和隐藏某个片段时执行特定的操作。

  • 本文向大家介绍前台切换到后台,然后再回到前台,Activity生命周期回调方法。弹出Dialog,生命值周期回调方法?相关面试题,主要包含被问及前台切换到后台,然后再回到前台,Activity生命周期回调方法。弹出Dialog,生命值周期回调方法?时的应答技巧和注意事项,需要的朋友参考一下 首先定义两个Activity,分别为A和B。 完整顺序为:A调用onCreate()方法 ---> onSt