JobRunner.Class
@Scheduled
public void pipelineRunner() {
List<Job> jobs = getJobs(); //fetches tasks from database
for(Job job: jobs){
try{
heartBeatSender.startHeartBeat(job); // updates last_update_time column for the current job in database on background thread
runJob(job); // contains logic of processing a job, how to abort current job it is processing if heartbeat sender throws a runtime exception or how to notify or make scheduler thread throw exception so that it can move to next job in the list?
}catch(Exception e){}
finally{
heartBeatSender.stopHeartBeat(); //stops updating last_update_time col.
}
}
}
heartbeatsender.class
public void startHeartBeat(Job job) {
Runnable heartBeat = () -> {
if (job != null) {
job.setLastUpdateTime(System.currentTimeMillis());
jobRepository.save(job);//runtime exception can be thrown here.
}
}
executor = Executors.newSingleThreadScheduledExecutor();
executor.scheduleAtFixedRate(heartBeat, 0L, heartBeatInterval, TimeUnit.MILLISECONDS); //heartBeatInterval every 5mins
}
您发布的代码有点不一致,因为heartbeatsender
似乎可以向多个作业发送心跳,但客户端代码总是只触发一个。但是,如果您有另一个类也使用这个heartbeatsender
,您将已经安排了两个heartbeat运行表,那么它如何决定将取消哪个stopheartbeat()
?更不用说,我一开始就无法知道它是如何取消它的,因为它不跟踪startheartbeat()
调用之外的计划任务。
所以您可能想要做的是让heartbeatsender
按作业跟踪它的心跳,这样它就可以单独取消它们。
class HeartbeatSender {
private final Map<Job, ScheduledFuture<?>> heartbeats = new HashMap<>();
// you can create a new executor in start, but I don't see why
private final ScheduledExecutorService heartbeatExecutor = newSingleThreadScheduledExecutor();
public void startHeartbeat(Job job) {
Runnable heartBeat= ...;
ScheduledFuture<?> schedule = executor.scheduleAtFixedRate(heartBeat, 0L, heartBeatInterval, TimeUnit.MILLISECONDS);
heartbeats.put(job, schedule); // remember so you can cancel later
}
public void stopHeartbeat(Job job) {
ScheduledFuture<?> schedule = heartbeats.remove(job);
if (schedule != null) {
schedule.cancel(false);
}
}
}
现在,您可以按作业停止心跳,所以您只需要在客户端代码中这样做。
for(Job job: jobs){
try {
heartBeatSender.startHeartBeat(job);
runJob(job);
} catch(Exception e) {
} finally{
heartBeatSender.stopHeartBeat(job); // stops heartbeat for this job
}
}
我目前有以下示例代码,我试图将OutputStream转换为InputStream,这是我从http://blog.ostermiller.org/convert-java-outputStream-InputStream中的获得的想法 但这里我的问题是,save方法可能会抛出IOException,我希望捕获它,并将其作为getInputStream方法的一部分重新抛出。 我读过如何从线程捕捉异
我应该实现一个原始的多线程服务器,为每个连接启动一个新线程。 其中一个线程可能会收到关闭服务器的消息。 是否可以从其中一个子线程通知父线程停止接受新连接并关闭服务器?
我正在为以下Java代码编写单元测试: 我试图模拟myService抛出错误的过程。myMethod(),以便在上面代码的catch块中提取它,并抛出MyException实例。 在我的JUnit测试中(使用org.springframework.test.context.junit4. SpringJUnit4ClassRunner和Mockito),我嘲笑了MyService并将其作为Spri
任何帮助都将不胜感激。
我有一系列的生产者和消费者线程。在消费者线程中,我有锁。wait()函数,如果队列中没有数据,则停止执行。在生产者线程中生成数据时,锁定。调用notify(),使用者线程脱离等待状态并移动到线程调度程序以获取锁。当notifier生产者线程释放锁时,线程调度程序会选择一个随机线程进行处理(它也可以是另一个生产者线程,但我希望它给这个等待的线程加锁)。我的问题是,是否有一种方法可以让这些等待和通知的
问题内容: 根据我一直在阅读的定义: 线程基本上是并发(同时)运行的代码段 。 但是,如何在存在线程调度程序的情况下同时运行它们? 我读到,线程调度程序基本上是从线程池中随机选择一个线程在某个时刻运行。从中我得到一个确切的时间点,只有一个可运行线程真正处于运行状态(运行)。( 所有这些均来自SCJP Sun认证程序员学习指南 )有人可以澄清吗? 这些线程是否真正同时运行? 问题答案: 但是,如何在