我有以下案例要模仿
程序首先查询每个接收参数的数据库并了解要运行的任务量。
定义了具有固定最大线程数的线程队列来执行任务。每个任务启动一个流程,该流程可能具有不同的配置,并且可能需要不同的时间。任务完成后,每个任务都有一个可配置的睡眠时间。
一旦任务Hibernate,它就不能阻塞执行队列中的某个位置。执行队列必须继续执行准备好执行的任务
我发现由于某种原因很难编码(主要是由于最后的要求)任何帮助都将不胜感激
谢谢
这是一段很长但很简单的代码,用于说明预定的重新提交,我还没有测试过:)
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.*;
interface Repeatable {
boolean shouldBeRepeated();
/**
* @return how long to sleep
*/
long delayBeforeRepeat();
/**
* @return "initial" state of this task instance, so this state can be resubmitted for repeated execution
*/
BusinessTask reset();
}
/**
* Whatever suits your business logic
*/
interface BusinessTask extends Callable<Repeatable> {
}
class BusinessTaskCompletionData {
final BusinessTask businessTask;
/**
* Timestamp when this task should be resubmitted
*/
final long nextSubmitTime;
BusinessTaskCompletionData(BusinessTask businessTask, long nextSubmitTime) {
this.businessTask = businessTask;
this.nextSubmitTime = nextSubmitTime;
}
}
class TaskResusltsConsumer implements Runnable {
private final CompletionService<Repeatable> completionService;
private final Deque<BusinessTaskCompletionData> completedTasks;
TaskResusltsConsumer(ExecutorService executor, Deque<BusinessTaskCompletionData> completedTasks) {
this.completedTasks = completedTasks;
completionService = new ExecutorCompletionService<>(executor);
}
@Override
public void run() {
while (true) {
try {
Future<Repeatable> completedBusinessTask = completionService.take();
Repeatable repeatable = completedBusinessTask.get();
if (repeatable.shouldBeRepeated()) {
completedTasks.add(new BusinessTaskCompletionData(repeatable.reset(),
System.currentTimeMillis() + repeatable.delayBeforeRepeat()));
}
} catch (ExecutionException | InterruptedException ie) {
// handle somehow
}
}
}
}
class TasksSupplier implements Runnable {
private final Deque<BusinessTaskCompletionData> completedTasks;
private final ExecutorService executor;
TasksSupplier(Deque<BusinessTaskCompletionData> completedTasks, ExecutorService executor) {
this.completedTasks = completedTasks;
this.executor = executor;
}
@Override
public void run() {
while (true) {
BusinessTask t = getTaskSomehow();
executor.submit(getTaskSomehow());
}
}
private BusinessTask getTaskSomehow() {
// implement
return null;
}
}
/**
* Actual implementation of logic to obtain 'initial state' of task to repeat and repeat schedule
*/
class BusinessData implements Repeatable {
// whatever
}
public class SOTest {
public static void main(String[] args) {
final LinkedList<BusinessTaskCompletionData> tasksToRepeat = new LinkedList<>();
// workers pool
final ExecutorService workersPool = Executors.newFixedThreadPool(10);
// controllers pool: 1 thread for supplier, the other for results consumer
final ExecutorService controllersPool = Executors.newFixedThreadPool(2);
controllersPool.submit(new TasksSupplier(tasksToRepeat, workersPool));
controllersPool.submit(new TaskResusltsConsumer(workersPool, tasksToRepeat));
// resubmitter scheduled pool
ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
scheduledExecutor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
long now = System.currentTimeMillis();
Iterator<BusinessTaskCompletionData> it = tasksToRepeat.iterator();
while (it.hasNext()) {
BusinessTaskCompletionData data = it.next();
if (data.nextSubmitTime >= now) {
workersPool.submit(data.businessTask);
it.remove();
}
}
}
},
// initial delay of 1 sec
1000,
// periodic delay of 1 sec
1000,
TimeUnit.MILLISECONDS
);
}
}
我创建了一个带有Spout的Storm拓扑,该Spout会发出许多元组用于基准测试。一旦所有的元组都从spout发出或者拓扑中不再有任何元组流动,我就想停止/终止我的拓扑。
问题内容: 我是python和线程的新手。我已经编写了充当网络爬虫的python代码,并在网站中搜索特定的关键字。我的问题是,如何使用线程同时运行类的三个不同实例。当实例之一找到关键字时,所有三个实例都必须关闭并停止爬网。这是一些代码。 如何使用线程让Crawler同时执行三个不同的爬网? 问题答案: 似乎没有一种(简单的)方法可以终止Python中的线程。 这是一个并行运行多个HTTP请求的简单
我试过这样的事情: create(new observable.onsubscribe>(){@override public void call(subscriber>subscriber){subscriber.onnext(getList());subscriber.oncompleted();}}).subscribe... 但是这个订阅在一个迭代上,onNext只传递完整的列表,而不是列
我需要一个库或我们的软件工具,可以: 1)将线程/作业/任务(任何东西--如果需要,我们可以重写代码,我们在mintue有线程对象)放入像system这样的队列中2)我们可以定义同时最多运行多少线程3)线程完成后,线程从队列中移除,这样GC就可以移除所有涉及的实体。 我正在进行大量阅读,发现ExecutorService(Executors.newFixedThreadPool(5);)但问题可能
我想了解在fork-连接池中处理任务Java顺序。 到目前为止,我在文档中找到的唯一相关信息是关于一个名为“asyncMode”的参数,该参数“如果此池对从未加入的分叉任务使用本地先进先出调度模式,则为真”。 我对这句话的解释是,每个工人都有自己的任务队列;工人从他们自己队列的前面接受任务,或者如果他们自己的队列是空的,从其他工人队列的后面偷走任务;如果asyncMode为真(分别为假),工作人员
问题内容: 我要实现的混合,并且一旦所有要素都成功完成,返回的未来就成功完成,或者一旦任何要素都异常完成,它就异常地完成(相同的例外)。如果多个元素失败,则返回其中任何一个的异常就足够了。 用例 我有一个任务需要汇总由s 列表返回的子结果,但是一旦其中任何一个失败,该任务就应该停止等待。我知道子任务将继续运行,这没关系。 相关问题 我发现在“ 等待”列表上等待,最初看起来像是一个重复的问题,但被接
本文向大家介绍Python实现简单多线程任务队列,包括了Python实现简单多线程任务队列的使用技巧和注意事项,需要的朋友参考一下 最近我在用梯度下降算法绘制神经网络的数据时,遇到了一些算法性能的问题。梯度下降算法的代码如下(伪代码): 一般来说,当网络请求 plot.ly 绘图时会阻塞等待返回,于是也会影响到其他的梯度下降函数的执行速度。 一种解决办法是每调用一次 plotly.write 函数
我们在spring boot应用程序中实现了一个计划任务,用于从MQ读取消息。在IntelliJ中调试时,我看到任务的线程被启动,然后在任务完成后进入等待状态。 这是正常的还是应该在任务完成后停止/销毁线程?此外,我们必须手动执行还是Spring会处理它(任务代码中的某些内容正在阻止它)