今天在看apache chainsaw这个项目的源代码时,无意中发现了一个非常简单的Job Scheduler的实现,源代码可以看这里:http://svn.apache.org/repos/asf/logging/chainsaw/trunk/src/main/java/org/apache/log4j/scheduler/ ,其中一个是Scheduler,另一个是Job接口。
Scheduler介绍道:
测试一下这个Scheduler,写一个非常简单的SimpleJob来实现Job接口。
package cn.lettoo.scheduler;
import java.text.SimpleDateFormat;
import java.util.Date;
public class SimpleJob implements Job {
private String name;
public SimpleJob(String name) {
this.name = name;
}
public void execute() {
Date now = new Date(System.currentTimeMillis());
System.out.println(String.format("%s: %s executed by thread %s",
SimpleDateFormat.getDateTimeInstance().format(now), this.name,
Thread.currentThread().getName()));
}
}
再写一个测试类:
package cn.lettoo.scheduler;
public class JobTest {
public static void main(String[] args) {
Scheduler scheduler = new Scheduler();
Job job1 = new SimpleJob("job1");
scheduler.schedule(job1, System.currentTimeMillis(), 5000);
scheduler.start();
}
}
*这里的scheduler.schedule(job1, System.currentTimeMillis(), 5000);表示立即运行,且每5秒运行一次。
执行结果如下:
这样一个简单的Job Scheduler就实现了,但我发现这样只是一个单线程的Job Scheduler,假如我每个Job运行时间是10秒,而间隔是5秒,同时有多个Job运行的话,这个Scheduler的效率还是很差的。
改动一下SimpleJob,让Job运行时sleep 10秒钟,来模拟job运行10秒。
public void execute() {
Date now = new Date(System.currentTimeMillis());
System.out.println(String.format("%s: %s executed by thread %s",
SimpleDateFormat.getDateTimeInstance().format(now), this.name,
Thread.currentThread().getName()));
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
同时,在JobTest中创建多个job,并且让Scheduler去执行:
public static void main(String[] args) {
Scheduler scheduler = new Scheduler();
Job job1 = new SimpleJob("job1");
scheduler.schedule(job1, System.currentTimeMillis(), 5000);
Job job2 = new SimpleJob("job2");
scheduler.schedule(job2, System.currentTimeMillis() + 1000, 5000);
Job job3 = new SimpleJob("job3");
scheduler.schedule(job3, System.currentTimeMillis() + 2000, 5000);
Job job4 = new SimpleJob("job4");
scheduler.schedule(job4, System.currentTimeMillis() + 3000, 5000);
Job job5 = new SimpleJob("job5");
scheduler.schedule(job5, System.currentTimeMillis() + 4000, 5000);
scheduler.start();
}
再运行:
可以看到,虽然我设置的job运行间隔都是5秒,但由于job本身要执行10秒,同时有多个job在排队执行,实现上job1的间隔已经到了50秒才执行。这样肯定是不行的。
那么,使用多线程应该就可以解决这个问题了,加入线程池。让每个job都由线程池里的一个线程去执行。
Scheduler源代码里,执行Job的方法是这样的:
/**
* Run scheduler.
*/
public synchronized void run() {
while (!shutdown) {
if (jobList.isEmpty()) {
linger();
} else {
ScheduledJobEntry sje = (ScheduledJobEntry) jobList.get(0);
long now = System.currentTimeMillis();
if (now >= sje.desiredExecutionTime) {
executeInABox(sje.job);
jobList.remove(0);
if (sje.period > 0) {
sje.desiredExecutionTime = now + sje.period;
schedule(sje);
}
} else {
linger(sje.desiredExecutionTime - now);
}
}
}
// clear out the job list to facilitate garbage collection
jobList.clear();
jobList = null;
System.out.println("Leaving scheduler run method");
}
/**
* We do not want a single failure to affect the whole scheduler.
* @param job job to execute.
*/
void executeInABox(final Job job) {
try {
job.execute();
} catch (Exception e) {
System.err.println("The execution of the job threw an exception");
e.printStackTrace(System.err);
}
}
可以看到,只要在executeInABox的方法里,使用线程池的线程来执行job,就可以了。现在加一个Scheduler的子类,我加上一个ExecutorService来实现线程池。同时我重写了executeInABox的方法,使用一个Runnable的实现类JobThread来运行job的execute方法。
package cn.lettoo.scheduler;
import java.util.concurrent.ExecutorService;
public class ThreadPoolScheduler extends Scheduler {
private ExecutorService pool;
public ThreadPoolScheduler(ExecutorService pool) {
super();
this.pool = pool;
}
@Override
void executeInABox(final Job job) {
pool.execute(new JobThread(job));
}
class JobThread implements Runnable {
private Job job;
public JobThread(Job job) {
this.job = job;
}
public void run() {
try {
this.job.execute();
} catch (Exception e) {
System.err.println("The execution of the job threw an exception");
e.printStackTrace(System.err);
}
}
}
}
再修改JobTest:
// 创建一个可缓存的线程池
ExecutorService pool = Executors.newCachedThreadPool();
// 构造带线程池的Scheduler
ThreadPoolScheduler scheduler = new ThreadPoolScheduler(pool);
.......
scheduler.start();
再运行,结果如下:
可以看到,这时,job已经按我的要求,每5秒运行一次了。
但再仔细一想,如果job是有状态的,我的job运行要10秒,而5秒就要再运行一次,有时我们是需要一个job完全执行完才能下一次再执行的,比如上面的job1,第一次运行完,才可以执行第二次。
怎么解决这个问题?我目前的做法是在ThreadPoolScheduler里增加一个Set,存储正在执行的Job,当Job执行完成后,从这个Set中删除。在下次执行的时候,判断是否在Set中,如果在,则不执行。
private Set<Job> runningJobList = new HashSet<Job>();
@Override
void executeInABox(final Job job) {
if (!runningJobList.contains(job)) {
runningJobList.add(job);
pool.execute(new JobThread(job));
}
}
class JobThread implements Runnable {
private Job job;
public JobThread(Job job) {
this.job = job;
}
public void run() {
try {
this.job.execute();
synchronized (this) {
runningJobList.remove(job);
}
} catch (Exception e) {
System.err
.println("The execution of the job threw an exception");
e.printStackTrace(System.err);
}
}
}
再执行:
可以看到,这里已经避免了job在执行的时候,再次被执行。当然,也发生了其他的问题,如job1,第一次执行在23:29:27,执行过程是10秒,那应该在23:29:37执行完,而我们要求是每5秒执行一次的话,则应该立即执行才对,可是实际上是在23:29:42才执行的。为什么会这样呢?原来,在Scheduler中的run()方法中,只要执行了executeInABox方法之后,都会在jobList.remove(0),也就是在job1被scheduler并且到了时间之后,即使没有被执行,但是也被从jobList里remove掉了,然后再重新加5秒再次scheduler上,也就是在23:29:37秒job1真正执行完成时,才再次重新scheduler上,也就是在42秒执行了。这是一个问题,如果要实现这个问题,需要重新对Scheduler的代码进行重构,即在run()方法加上对runningJobList的检查功能。我这里就没有实现,如果您有更好的方法,欢迎指出。