当前位置: 首页 > 知识库问答 >
问题:

队列中的Java线程一旦完成任务,就需要Hibernate

韦锦程
2023-03-14

我有以下案例要模仿

程序首先查询每个接收参数的数据库并了解要运行的任务量。

定义了具有固定最大线程数的线程队列来执行任务。每个任务启动一个流程,该流程可能具有不同的配置,并且可能需要不同的时间。任务完成后,每个任务都有一个可配置的睡眠时间。

一旦任务Hibernate,它就不能阻塞执行队列中的某个位置。执行队列必须继续执行准备好执行的任务

我发现由于某种原因很难编码(主要是由于最后的要求)任何帮助都将不胜感激

谢谢

共有1个答案

赵涵亮
2023-03-14

这是一段很长但很简单的代码,用于说明预定的重新提交,我还没有测试过:)

import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.*;

interface Repeatable {
    boolean shouldBeRepeated();
    /**
     * @return how long to sleep
     */
    long delayBeforeRepeat();

    /**
     * @return "initial" state of this task instance, so this state can be resubmitted for repeated execution
     */
    BusinessTask reset();
}

/**
 * Whatever suits your business logic
 */
interface BusinessTask extends Callable<Repeatable> {
}

class BusinessTaskCompletionData {
    final BusinessTask businessTask;

    /**
     * Timestamp when this task should be resubmitted
     */
    final long nextSubmitTime;

    BusinessTaskCompletionData(BusinessTask businessTask, long nextSubmitTime) {
        this.businessTask = businessTask;
        this.nextSubmitTime = nextSubmitTime;
    }
}

class TaskResusltsConsumer implements Runnable {

    private final CompletionService<Repeatable> completionService;
    private final Deque<BusinessTaskCompletionData> completedTasks;

    TaskResusltsConsumer(ExecutorService executor, Deque<BusinessTaskCompletionData> completedTasks) {
        this.completedTasks = completedTasks;
        completionService = new ExecutorCompletionService<>(executor);
    }

    @Override
    public void run() {
        while (true) {
            try {
                Future<Repeatable> completedBusinessTask = completionService.take();
                Repeatable repeatable = completedBusinessTask.get();
                if (repeatable.shouldBeRepeated()) {
                    completedTasks.add(new BusinessTaskCompletionData(repeatable.reset(),
                            System.currentTimeMillis() + repeatable.delayBeforeRepeat()));
                }
            } catch (ExecutionException | InterruptedException ie) {
                // handle somehow
            }
        }
    }
}

class TasksSupplier implements Runnable {

    private final Deque<BusinessTaskCompletionData> completedTasks;
    private final ExecutorService executor;

    TasksSupplier(Deque<BusinessTaskCompletionData> completedTasks, ExecutorService executor) {
        this.completedTasks = completedTasks;
        this.executor = executor;
    }

    @Override
    public void run() {
        while (true) {
            BusinessTask t = getTaskSomehow();
            executor.submit(getTaskSomehow());
        }
    }

    private BusinessTask getTaskSomehow() {
        // implement
        return null;
    }
}

/**
 * Actual implementation of logic to obtain 'initial state' of task to repeat and repeat schedule
 */
class BusinessData implements Repeatable {
    // whatever
}

public class SOTest {

    public static void main(String[] args) {

        final LinkedList<BusinessTaskCompletionData> tasksToRepeat = new LinkedList<>();

        // workers pool
        final ExecutorService workersPool = Executors.newFixedThreadPool(10);

        // controllers pool: 1 thread for supplier, the other for results consumer
        final ExecutorService controllersPool = Executors.newFixedThreadPool(2);
        controllersPool.submit(new TasksSupplier(tasksToRepeat, workersPool));
        controllersPool.submit(new TaskResusltsConsumer(workersPool, tasksToRepeat));

        // resubmitter scheduled pool
        ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        scheduledExecutor.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    long now = System.currentTimeMillis();
                    Iterator<BusinessTaskCompletionData> it = tasksToRepeat.iterator();
                    while (it.hasNext()) {
                        BusinessTaskCompletionData data = it.next();
                        if (data.nextSubmitTime >= now) {
                            workersPool.submit(data.businessTask);
                            it.remove();
                        }
                    }
                }
            },
            // initial delay of 1 sec
            1000,
            // periodic delay of 1 sec
            1000,
            TimeUnit.MILLISECONDS
        );
    }

}
 类似资料:
  • 我创建了一个带有Spout的Storm拓扑,该Spout会发出许多元组用于基准测试。一旦所有的元组都从spout发出或者拓扑中不再有任何元组流动,我就想停止/终止我的拓扑。

  • 问题内容: 我是python和线程的新手。我已经编写了充当网络爬虫的python代码,并在网站中搜索特定的关键字。我的问题是,如何使用线程同时运行类的三个不同实例。当实例之一找到关键字时,所有三个实例都必须关闭并停止爬网。这是一些代码。 如何使用线程让Crawler同时执行三个不同的爬网? 问题答案: 似乎没有一种(简单的)方法可以终止Python中的线程。 这是一个并行运行多个HTTP请求的简单

  • 我试过这样的事情: create(new observable.onsubscribe>(){@override public void call(subscriber>subscriber){subscriber.onnext(getList());subscriber.oncompleted();}}).subscribe... 但是这个订阅在一个迭代上,onNext只传递完整的列表,而不是列

  • 我需要一个库或我们的软件工具,可以: 1)将线程/作业/任务(任何东西--如果需要,我们可以重写代码,我们在mintue有线程对象)放入像system这样的队列中2)我们可以定义同时最多运行多少线程3)线程完成后,线程从队列中移除,这样GC就可以移除所有涉及的实体。 我正在进行大量阅读,发现ExecutorService(Executors.newFixedThreadPool(5);)但问题可能

  • 我想了解在fork-连接池中处理任务Java顺序。 到目前为止,我在文档中找到的唯一相关信息是关于一个名为“asyncMode”的参数,该参数“如果此池对从未加入的分叉任务使用本地先进先出调度模式,则为真”。 我对这句话的解释是,每个工人都有自己的任务队列;工人从他们自己队列的前面接受任务,或者如果他们自己的队列是空的,从其他工人队列的后面偷走任务;如果asyncMode为真(分别为假),工作人员

  • 问题内容: 我要实现的混合,并且一旦所有要素都成功完成,返回的未来就成功完成,或者一旦任何要素都异常完成,它就异常地完成(相同的例外)。如果多个元素失败,则返回其中任何一个的异常就足够了。 用例 我有一个任务需要汇总由s 列表返回的子结果,但是一旦其中任何一个失败,该任务就应该停止等待。我知道子任务将继续运行,这没关系。 相关问题 我发现在“ 等待”列表上等待,最初看起来像是一个重复的问题,但被接

  • 本文向大家介绍Python实现简单多线程任务队列,包括了Python实现简单多线程任务队列的使用技巧和注意事项,需要的朋友参考一下 最近我在用梯度下降算法绘制神经网络的数据时,遇到了一些算法性能的问题。梯度下降算法的代码如下(伪代码): 一般来说,当网络请求 plot.ly 绘图时会阻塞等待返回,于是也会影响到其他的梯度下降函数的执行速度。 一种解决办法是每调用一次 plotly.write 函数

  • 我们在spring boot应用程序中实现了一个计划任务,用于从MQ读取消息。在IntelliJ中调试时,我看到任务的线程被启动,然后在任务完成后进入等待状态。 这是正常的还是应该在任务完成后停止/销毁线程?此外,我们必须手动执行还是Spring会处理它(任务代码中的某些内容正在阻止它)