While using Saturn in a recent project, we ran into several problems that the framework could not solve out of the box. Fortunately Saturn is open source, so by analyzing its source code we were able to work out solutions. Enough rambling; let's get straight to the point.
First, the Saturn version used throughout is 3.3.1. Download the source yourself from https://github.com/vipshop/Saturn/releases
The problems are summarized below:
No. | Topic | Findings | Solution | Outcome |
1 | Make the Saturn executor run failover shards immediately | 1. Failover execution flow today: a listener detects failed shards and flags them in ZooKeeper; only after the current job run on the executor finishes are the failed shards taken over | After a failed shard is detected and the failover handover is done, add the operation described below: use a distributed lock to execute the failover shard task asynchronously | 1. Failover execution flow: a listener detects failed shards and flags them in ZooKeeper; after the current job run on the executor finishes, the failed shards are taken over |
2 | When multiple failover shards land on one executor, how to run them in parallel | Remove the failover restriction (that the shards pre-assigned to the current executor must finish before failover shards may run) | Remove the failover restriction (that the shards pre-assigned to the current executor must finish before failover shards may run) | Remove the failover restriction (that the shards pre-assigned to the current executor must finish before failover shards may run) |
3 | For long-running jobs, how to gracefully stop a job's tasks on the executor after clicking "Terminate now" in the Saturn console | 1. (to investigate) When "Terminate now" is clicked in the console (analyze its internal logic), a termination flag for the job is created in ZooKeeper | In the console click "Disable" => "Terminate now"; beforehand, implement postForceStop on the application side to release business resources; the executor invokes postForceStop after it finishes killing the threads | |
4 | How to handle a Saturn job that blocks, i.e. appears hung | | | |
Contents
1. Making the Saturn executor run failover shards immediately
2. When multiple failover shards land on one executor, how to run them in parallel
3. For long-running jobs, how to gracefully stop a job's tasks on the executor after clicking "Terminate now" in the Saturn console
1. Making the Saturn executor run failover shards immediately
To solve this problem, we first need a clear picture of how the executor actually executes a job and how failed shards get run. The story begins with job configuration.
The job configuration code is as follows:
private void addOrCopyJob(String namespace, JobConfig jobConfig, String jobNameCopied, String createdBy)
throws SaturnJobConsoleException {
List<JobConfig> unSystemJobs = getUnSystemJobs(namespace);
Set<JobConfig> streamChangedJobs = new HashSet<>();
validateJobConfig(namespace, jobConfig, unSystemJobs, streamChangedJobs);
// If a job with the same name already exists in the database, throw an exception
// Query the database again directly instead of using unSystemJobs, because the name must not clash with system jobs either
String jobName = jobConfig.getJobName();
if (currentJobConfigService.findConfigByNamespaceAndJobName(namespace, jobName) != null) {
throw new SaturnJobConsoleException(ERROR_CODE_BAD_REQUEST, String.format("该作业(%s)已经存在", jobName));
}
// If the job already exists in ZooKeeper, try to remove it
CuratorRepository.CuratorFrameworkOp curatorFrameworkOp = registryCenterService
.getCuratorFrameworkOp(namespace);
if (curatorFrameworkOp.checkExists(JobNodePath.getJobNodePath(jobName))) {
if (!removeJobFromZk(jobName, curatorFrameworkOp)) {
throw new SaturnJobConsoleException(ERROR_CODE_BAD_REQUEST,
String.format("该作业(%s)正在删除中,请稍后再试", jobName));
}
}
// The total number of jobs in this namespace must not exceed the limit
int maxJobNum = getMaxJobNum();
if (jobIncExceeds(namespace, maxJobNum, 1)) {
throw new SaturnJobConsoleException(ERROR_CODE_BAD_REQUEST,
String.format("总作业数超过最大限制(%d),作业名%s创建失败", maxJobNum, jobName));
}
// For a copy operation, copy the source job's configuration from the database into the new job's configuration
JobConfig myJobConfig = jobConfig;
if (jobNameCopied != null) {
myJobConfig = currentJobConfigService.findConfigByNamespaceAndJobName(namespace, jobNameCopied);
SaturnBeanUtils.copyPropertiesIgnoreNull(jobConfig, myJobConfig);
}
// Fill in default values for the job configuration fields and force-correct certain fields
correctConfigValueWhenAddJob(myJobConfig);
// Save the job to the database
currentJobConfigService.create(constructJobConfig4DB(namespace, myJobConfig, createdBy, createdBy));
// Update the upstream/downstream links of related jobs
for (JobConfig streamChangedJob : streamChangedJobs) {
currentJobConfigService.updateStream(constructJobConfig4DB(namespace, streamChangedJob, null, createdBy));
}
// Write the job configuration to ZooKeeper and update the upstream/downstream of related jobs accordingly
createJobConfigToZk(myJobConfig, streamChangedJobs, curatorFrameworkOp);
}
createJobConfigToZk, called on the last line, creates the node $jobs/jobName/config in ZooKeeper.
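For reference, creating a node like that with the Curator client looks roughly as follows. This is a minimal standalone sketch, not Saturn's actual createJobConfigToZk; the connect string, job name, and the config/cron child node are illustrative.
import java.nio.charset.StandardCharsets;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CreateJobNodeSketch {
	public static void main(String[] args) throws Exception {
		CuratorFramework client = CuratorFrameworkFactory
				.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
		client.start();
		// $jobs/<jobName>/config holds the job configuration; individual fields
		// live under it in child nodes such as config/cron
		client.create().creatingParentsIfNeeded()
				.forPath("/$jobs/demoJob/config/cron",
						"0 0/5 * * * ?".getBytes(StandardCharsets.UTF_8));
		client.close();
	}
}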
When an executor starts up, com/vip/saturn/job/executor/SaturnExecutor.java runs. It does three main things: first, it watches the $jobs node for changes, i.e. it listens for changes to every job; second, it initializes the job scheduler and starts listening for state changes on all of this job's nodes (com/vip/saturn/job/internal/listener/ListenerManager.java); third, it creates and executes the job. SaturnExecutor.java does plenty more besides; interested readers can dig into it on their own.
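Conceptually, the $jobs watcher is a child-node listener. Here is a minimal sketch of the same idea using Curator's PathChildrenCache; this is illustrative only, not SaturnExecutor's actual wiring.
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class JobsWatcherSketch {
	public static void main(String[] args) throws Exception {
		CuratorFramework client = CuratorFrameworkFactory
				.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
		client.start();
		// watch the children of $jobs: a new child means a new job was configured
		PathChildrenCache cache = new PathChildrenCache(client, "/$jobs", false);
		cache.getListenable().addListener((cli, event) -> {
			switch (event.getType()) {
				case CHILD_ADDED:
					System.out.println("job added: " + event.getData().getPath());
					break;
				case CHILD_REMOVED:
					System.out.println("job removed: " + event.getData().getPath());
					break;
				default:
					break;
			}
		});
		cache.start();
		Thread.sleep(Long.MAX_VALUE); // keep the process alive to receive events
	}
}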
The entry point of job execution is in com/vip/saturn/job/trigger/SaturnWorker.java, shown below:
public void run() {
while (!halted.get()) {
try {
synchronized (sigLock) {
while (paused && !halted.get()) {
try {
sigLock.wait(1000L);
} catch (InterruptedException ignore) {
}
}
if (halted.get()) {
break;
}
}
boolean noFireTime = false; // whether there is no next fire time; initialized to false
long timeUntilTrigger = 1000;
if (triggerObj != null) {
triggerObj.updateAfterMisfire(null);
long now = System.currentTimeMillis();
Date nextFireTime = triggerObj.getNextFireTime();
if (nextFireTime != null) {
timeUntilTrigger = nextFireTime.getTime() - now;
} else {
noFireTime = true;
}
}
while (!noFireTime && timeUntilTrigger > 2) {
synchronized (sigLock) {
if (halted.get()) {
break;
}
if (triggered.isYes()) {
break;
}
try {
sigLock.wait(timeUntilTrigger);
} catch (InterruptedException ignore) {
}
if (triggerObj != null) {
long now = System.currentTimeMillis();
Date nextFireTime = triggerObj.getNextFireTime();
if (nextFireTime != null) {
timeUntilTrigger = nextFireTime.getTime() - now;
} else {
noFireTime = true;
}
}
}
}
boolean goAhead;
Triggered currentTriggered = notTriggered;
// Execution is triggered in exactly two cases: 1. the fire time arrived 2. "run now" was clicked
synchronized (sigLock) {
goAhead = !halted.get() && !paused;
// Reset the "run now" flag and capture the current trigger data
if (triggered.isYes()) {
currentTriggered = triggered;
triggered = notTriggered;
} else if (goAhead) { // not "run now": either the fire time arrived, or there is no next fire time
goAhead = goAhead && !noFireTime; // only run the job if there is a next fire time, i.e. the fire time arrived
if (goAhead) { // the fire time arrived; update the trigger's fire time
if (triggerObj != null) {
triggerObj.triggered(null);
}
} else { // no next fire time: sleep one second to keep the loop from spinning and driving up CPU (in case the cron is never changed back to periodic execution)
try {
sigLock.wait(1000L);
} catch (InterruptedException ignore) {
}
}
}
}
if (goAhead) {
job.execute(currentTriggered);
}
} catch (RuntimeException e) {
LogUtils.error(log, job.getJobName(), e.getMessage(), e);
}
}
}
The gist: a scheduled (cron) job fires when its time arrives and spends the rest of the time waiting, while a "run now" trigger skips the wait and fires immediately. Once a run starts, it proceeds in three steps. Step one: process the shards initially assigned to this executor, running them on multiple threads and updating each shard's state on completion. Step two: trigger the downstream jobs. Step three: use a ZooKeeper distributed lock to compete for failed shards, and mark the grabbed failed shard as "run now" (note: the failed shard is not executed at this point; it is merely marked). That ends the loop iteration, and only in the next iteration does the failed shard actually run. Since each grab under the ZooKeeper distributed lock takes exactly one failed shard, each iteration executes at most one failed shard.
The multi-threaded shard execution code is as follows:
protected Map<Integer, SaturnJobReturn> handleJob(final SaturnExecutionContext shardingContext) {
final Map<Integer, SaturnJobReturn> retMap = new HashMap<Integer, SaturnJobReturn>();
synchronized (futureTaskMap) {
futureTaskMap.clear();
final String jobName = shardingContext.getJobName();
final int timeoutSeconds = getTimeoutSeconds();
ExecutorService executorService = getExecutorService();
// handle the custom job parameter
String jobParameter = shardingContext.getJobParameter();
// shardingItemParameters holds the key/value pairs parsed from the sharding parameter table
Map<Integer, String> shardingItemParameters = shardingContext.getShardingItemParameters();
for (final Entry<Integer, String> shardingItem : shardingItemParameters.entrySet()) {
final Integer key = shardingItem.getKey();
try {
String jobValue = shardingItem.getValue();
final String itemVal = getRealItemValue(jobParameter, jobValue); // the value mapped to this shard item
ShardingItemFutureTask shardingItemFutureTask = new ShardingItemFutureTask(
createCallable(jobName, key, itemVal, timeoutSeconds, shardingContext, this), null);
Future<?> callFuture = executorService.submit(shardingItemFutureTask);
if (timeoutSeconds > 0) {
TimeoutSchedulerExecutor.scheduleTimeoutJob(shardingContext.getExecutorName(), timeoutSeconds,
shardingItemFutureTask);
}
shardingItemFutureTask.setCallFuture(callFuture);
futureTaskMap.put(key, shardingItemFutureTask);
} catch (Throwable t) {
LogUtils.error(log, jobName, t.getMessage(), t);
retMap.put(key, new SaturnJobReturn(SaturnSystemReturnCode.SYSTEM_FAIL, t.getMessage(),
SaturnSystemErrorGroup.FAIL));
}
}
}
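// ... the method then waits on each ShardingItemFutureTask and collects the results
// into retMap; the full method body is shown in the "Terminate now" analysis below
}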
The code that runs downstream jobs is as follows:
private void runDownStream(final JobExecutionMultipleShardingContext shardingContext) {
if (configService.isLocalMode()) {
return;
}
JobType jobType = configService.getJobType();
if (!(jobType.isCron() || jobType.isPassive())) {
return;
}
if (shardingContext.getShardingTotalCount() != 1) {
return;
}
List<String> downStream = configService.getDownStream();
if (downStream.isEmpty()) {
return;
}
if (!mayRunDownStream(shardingContext)) {
return;
}
String downStreamDataStr = scheduler.getTrigger().serializeDownStreamData(shardingContext.getTriggered());
String logMessagePrefix = "call runDownStream api";
int size = SystemEnvProperties.VIP_SATURN_CONSOLE_URI_LIST.size();
for (int i = 0; i < size; i++) {
String consoleUri = SystemEnvProperties.VIP_SATURN_CONSOLE_URI_LIST.get(i);
String targetUrl = consoleUri + "/rest/v1/" + namespace + "/jobs/" + jobName + "/runDownStream";
LogUtils.info(log, jobName, "{}, target url is {}", logMessagePrefix, targetUrl);
CloseableHttpClient httpClient = null;
try {
httpClient = HttpClientBuilder.create().build();
HttpPost request = new HttpPost(targetUrl);
final RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(5000)
.setSocketTimeout(10000).build();
request.setConfig(requestConfig);
request.addHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString());
request.setEntity(new StringEntity(downStreamDataStr));
CloseableHttpResponse httpResponse = httpClient.execute(request);
StatusLine statusLine = httpResponse.getStatusLine();
if (statusLine != null && statusLine.getStatusCode() == HttpStatus.SC_OK) {
HttpEntity entity = httpResponse.getEntity();
String result = entity != null ? EntityUtils.toString(entity, "UTF-8") : null;
LogUtils.info(log, jobName, "{}, result is {}", logMessagePrefix, result);
return;
} else {
LogUtils.info(log, jobName, "{} failed, StatusLine is {}", logMessagePrefix, statusLine);
}
} catch (Exception e) {
LogUtils.error(log, jobName, "{} error", logMessagePrefix, e);
} finally {
HttpClientUtils.closeQuietly(httpClient);
}
}
}
The code that uses a ZooKeeper distributed lock to compete for failed shards is as follows:
public void failoverIfNecessary() {
if (!needFailover()) {
return;
}
getJobNodeStorage()
.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback(), 1, TimeUnit.MINUTES,
new FailoverTimeoutLeaderExecutionCallback());
}
class FailoverLeaderExecutionCallback implements LeaderExecutionCallback {
@Override
public void execute() {
if (!needFailover()) {
return;
}
if (jobScheduler == null) {
return;
}
if (coordinatorRegistryCenter.isExisted(SaturnExecutorsNode.getExecutorNoTrafficNodePath(executorName))) {
return;
}
if (!jobScheduler.getConfigService().getPreferList().contains(executorName) && !jobScheduler
.getConfigService().isUseDispreferList()) {
return;
}
List<String> items = getJobNodeStorage().getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT);
if (items != null && !items.isEmpty()) {
int crashedItem = Integer
.parseInt(getJobNodeStorage().getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
LogUtils.debug(log, jobName, "Elastic job: failover job begin, crashed item:{}.", crashedItem);
getJobNodeStorage()
.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), executorName);
getJobNodeStorage().removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
jobScheduler.triggerJob(null);
}
}
}
To sum up: a failed shard is not executed immediately. It only starts after the current batch of normally assigned shards on the executor has finished, and multiple failed shards execute sequentially, one at a time.
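The mutual exclusion inside executeInLeader is built on ZooKeeper leader election. For background, here is a minimal standalone sketch of the same pattern using Curator's LeaderLatch; the connect string and latch path are illustrative, and this is not Saturn's executeInLeader itself.
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderLatchSketch {
	public static void main(String[] args) throws Exception {
		CuratorFramework client = CuratorFrameworkFactory
				.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
		client.start();
		// every executor competing for the same latch path runs this; only the
		// elected leader proceeds, so one failover item is claimed at a time
		LeaderLatch latch = new LeaderLatch(client, "/demoJob/leader/failover/latch");
		latch.start();
		try {
			if (latch.await(1, TimeUnit.MINUTES)) {
				// leader section: pick one crashed item and move it to execution/failover
				System.out.println("acquired leadership, claiming one failover item");
			}
		} finally {
			latch.close();
			client.close();
		}
	}
}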
The class com/vip/saturn/job/internal/failover/FailoverListenerManager.java listens to both running nodes and failed nodes. When a running node crashes, the first step is to create a flag:
public void createCrashedFailoverFlag(final int item) {
if (!isFailoverAssigned(item)) {
try {
getJobNodeStorage().getClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
.forPath(JobNodePath.getNodeFullPath(jobName, FailoverNode.getItemsNode(item)));
LogUtils.info(log, jobName, "{} - {} create failover flag of item {}", executorName, jobName, item);
} catch (KeeperException.NodeExistsException e) { // NOSONAR
LogUtils.debug(log, jobName,
"{} - {} create failover flag of item {} failed, because it is already existing", executorName,
jobName, item);
} catch (Exception e) {
LogUtils.error(log, jobName, e.getMessage(), e);
}
}
}
Then, once this executor has no more running items for the job, failover is carried out (and depending on the checks inside, it may end up doing nothing at all):
public void failoverIfNecessary() {
if (!needFailover()) {
return;
}
getJobNodeStorage()
.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback(), 1, TimeUnit.MINUTES,
new FailoverTimeoutLeaderExecutionCallback());
}
Putting 2.2 and 2.3 together, we can see that failed shards cannot execute immediately, which is especially noticeable for one-shot jobs with long execution times.
The overall plan: when a failed shard is detected and the failover handover is done, add an extra operation that uses a distributed lock to execute the failover shard task asynchronously. The code is as follows.
In com/vip/saturn/job/internal/failover/FailoverListenerManager.java, modify the following code:
private synchronized void failover(final Integer item) {
if (jobScheduler == null || jobScheduler.getJob() == null) {
return;
}
if (!jobScheduler.getJob().isFailoverSupported() || !configService.isFailover() || executionService
.isCompleted(item)) {
return;
}
failoverService.createCrashedFailoverFlag(item);
// if (!executionService.hasRunningItems(jobScheduler.getShardingService().getLocalHostShardingItems())) {
// failoverService.failoverIfNecessary();
// }
// execute the failed shard immediately
failoverService.failoverAndExecute();
}
In com/vip/saturn/job/internal/failover/FailoverService.java, add the following code:
/**
 * If failover is needed, set the job up for failover and execute the failed shard.
 */
public void failoverAndExecute() {
if (!needFailover()) {
return;
}
getJobNodeStorage()
.executeInLeader(FailoverNode.LATCH, new DirectFailoverLeaderExecutionCallback(), 1, TimeUnit.MINUTES,
new FailoverTimeoutLeaderExecutionCallback());
}
class DirectFailoverLeaderExecutionCallback implements LeaderExecutionCallback {
@Override
public void execute() {
if (!needFailover()) {
return;
}
if (jobScheduler == null) {
return;
}
if (coordinatorRegistryCenter.isExisted(SaturnExecutorsNode.getExecutorNoTrafficNodePath(executorName))) {
return;
}
if (!jobScheduler.getConfigService().getPreferList().contains(executorName) && !jobScheduler
.getConfigService().isUseDispreferList()) {
return;
}
List<String> items = getJobNodeStorage().getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT);
if (items != null && !items.isEmpty()) {
int crashedItem = Integer
.parseInt(getJobNodeStorage().getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
LogUtils.debug(log, jobName, "Elastic job: failover job begin, crashed item:{}.", crashedItem);
getJobNodeStorage()
.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), executorName);
getJobNodeStorage().removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
Triggered currentTriggered = new Triggered();
currentTriggered.setYes(true);
currentTriggered.setUpStreamData(null);
currentTriggered.setDownStreamData(null);
// execute the failed shard asynchronously on a single-thread pool
ExecutorService pool = Executors.newSingleThreadExecutor();
try{
pool.submit(new ExecuteJobItemRunner(jobScheduler.getJob(), currentTriggered));
} catch (Exception e){
LogUtils.error(log, jobName, e.getMessage(), e); // do not silently swallow submission failures
} finally {
pool.shutdown();
}
}
}
}
/**
 * Executes the failover task asynchronously on a single thread.
 */
public class ExecuteJobItemRunner implements Runnable {
private AbstractElasticJob job;
private Triggered currentTriggered;
public ExecuteJobItemRunner(AbstractElasticJob job, Triggered currentTriggered) {
this.job = job;
this.currentTriggered = currentTriggered;
}
@Override
public void run() {
try {
job.execute(currentTriggered);
} catch (Exception e) {
LogUtils.error(log, jobName, "Failover ExecuteJobItemRunner Exception:{}", e);
}
}
}
So, are we done? Testing confirmed the approach works. But when "Terminate now" was clicked on a job, a new problem surfaced: with failover shards involved, the per-shard execution results came out wrong, and "Terminate now" could not kill all the threads.
Let's analyze the job-execution source code (com.vip.saturn.job.java.SaturnJavaJob):
@Override
protected Map<Integer, SaturnJobReturn> handleJob(final SaturnExecutionContext shardingContext) {
final Map<Integer, SaturnJobReturn> retMap = new HashMap<Integer, SaturnJobReturn>();
synchronized (futureTaskMap) {
futureTaskMap.clear();
final String jobName = shardingContext.getJobName();
final int timeoutSeconds = getTimeoutSeconds();
ExecutorService executorService = getExecutorService();
// handle the custom job parameter
String jobParameter = shardingContext.getJobParameter();
// shardingItemParameters holds the key/value pairs parsed from the sharding parameter table
Map<Integer, String> shardingItemParameters = shardingContext.getShardingItemParameters();
for (final Entry<Integer, String> shardingItem : shardingItemParameters.entrySet()) {
final Integer key = shardingItem.getKey();
try {
String jobValue = shardingItem.getValue();
final String itemVal = getRealItemValue(jobParameter, jobValue); // the value mapped to this shard item
ShardingItemFutureTask shardingItemFutureTask = new ShardingItemFutureTask(
createCallable(jobName, key, itemVal, timeoutSeconds, shardingContext, this), null);
Future<?> callFuture = executorService.submit(shardingItemFutureTask);
if (timeoutSeconds > 0) {
TimeoutSchedulerExecutor.scheduleTimeoutJob(shardingContext.getExecutorName(), timeoutSeconds,
shardingItemFutureTask);
}
shardingItemFutureTask.setCallFuture(callFuture);
futureTaskMap.put(key, shardingItemFutureTask);
} catch (Throwable t) {
LogUtils.error(log, jobName, t.getMessage(), t);
retMap.put(key, new SaturnJobReturn(SaturnSystemReturnCode.SYSTEM_FAIL, t.getMessage(),
SaturnSystemErrorGroup.FAIL));
}
}
}
for (Entry<Integer, ShardingItemFutureTask> entry : futureTaskMap.entrySet()) {
Integer item = entry.getKey();
ShardingItemFutureTask futureTask = entry.getValue();
try {
futureTask.getCallFuture().get();
} catch (Exception e) {
LogUtils.error(log, jobName, e.getMessage(), e);
retMap.put(item, new SaturnJobReturn(SaturnSystemReturnCode.SYSTEM_FAIL, e.getMessage(),
SaturnSystemErrorGroup.FAIL));
continue;
}
retMap.put(item, futureTask.getCallable().getSaturnJobReturn());
}
synchronized (futureTaskMap) {
futureTaskMap.clear();
}
return retMap;
}
@Override
public void forceStop() {
super.forceStop();
synchronized (futureTaskMap) {
for (ShardingItemFutureTask shardingItemFutureTask : futureTaskMap.values()) {
JavaShardingItemCallable shardingItemCallable = shardingItemFutureTask.getCallable();
Thread currentThread = shardingItemCallable.getCurrentThread();
if (currentThread != null) {
try {
if (shardingItemCallable.forceStop()) {
LogUtils.info(log, jobName, "Force stop job, jobName:{}, item:{}", jobName,
shardingItemCallable.getItem());
shardingItemCallable.beforeForceStop();
ShardingItemFutureTask.killRunningBusinessThread(shardingItemFutureTask);
}
} catch (Throwable t) {
LogUtils.error(log, jobName, t.getMessage(), t);
}
}
}
}
}
Every time a failover shard task runs, two problems arise. First, futureTaskMap.clear() empties futureTaskMap on each run, so when "Terminate now" fires, some threads can no longer be found and killed. Second, while entries are being added to or removed from futureTaskMap, another thread may be iterating over it, which triggers java.util.ConcurrentModificationException.
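As a quick illustration of the second problem (a generic Java snippet, not Saturn code): HashMap iterators are fail-fast, so structurally modifying the map while iterating over it throws on the next iterator step.
import java.util.HashMap;
import java.util.Map;

public class CmeDemo {
	public static void main(String[] args) {
		Map<Integer, String> futureTasks = new HashMap<>();
		futureTasks.put(1, "shard-1");
		futureTasks.put(2, "shard-2");
		for (Integer item : futureTasks.keySet()) {
			// structural modification during iteration trips the fail-fast iterator;
			// the next call to next() throws java.util.ConcurrentModificationException
			futureTasks.remove(item);
		}
	}
}
With that in mind, the following changes address both problems.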
// Tracks the shard items currently running, used when killing threads; threads created after a "Terminate now" operation may still escape the kill
private Set<Integer> currentRunningItemSets = new HashSet<>();
@Override
protected Map<Integer, SaturnJobReturn> handleJob(final SaturnExecutionContext shardingContext) {
final Map<Integer, SaturnJobReturn> retMap = new HashMap<Integer, SaturnJobReturn>();
// Record the shard items of this run, used later to remove their entries from futureTaskMap
Set<Integer> itemSets = new HashSet<>();
synchronized (futureTaskMap) {
// futureTaskMap.clear();
final String jobName = shardingContext.getJobName();
final int timeoutSeconds = getTimeoutSeconds();
ExecutorService executorService = getExecutorService();
// handle the custom job parameter
String jobParameter = shardingContext.getJobParameter();
// shardingItemParameters holds the key/value pairs parsed from the sharding parameter table
Map<Integer, String> shardingItemParameters = shardingContext.getShardingItemParameters();
for (final Entry<Integer, String> shardingItem : shardingItemParameters.entrySet()) {
final Integer key = shardingItem.getKey();
try {
String jobValue = shardingItem.getValue();
final String itemVal = getRealItemValue(jobParameter, jobValue); // the value mapped to this shard item
ShardingItemFutureTask shardingItemFutureTask = new ShardingItemFutureTask(
createCallable(jobName, key, itemVal, timeoutSeconds, shardingContext, this), null);
Future<?> callFuture = executorService.submit(shardingItemFutureTask);
if (timeoutSeconds > 0) {
TimeoutSchedulerExecutor.scheduleTimeoutJob(shardingContext.getExecutorName(), timeoutSeconds,
shardingItemFutureTask);
}
shardingItemFutureTask.setCallFuture(callFuture);
futureTaskMap.put(key, shardingItemFutureTask);
itemSets.add(key);
currentRunningItemSets.add(key);
LogUtils.info(log, jobName,"zy futureTaskMap insert put {}",key);
} catch (Throwable t) {
LogUtils.error(log, jobName, t.getMessage(), t);
retMap.put(key, new SaturnJobReturn(SaturnSystemReturnCode.SYSTEM_FAIL, t.getMessage(),
SaturnSystemErrorGroup.FAIL));
}
}
}
for (Integer item:itemSets){
ShardingItemFutureTask futureTask = futureTaskMap.get(item);
try {
futureTask.getCallFuture().get();
} catch (Exception e) {
LogUtils.error(log, jobName, e.getMessage(), e);
retMap.put(item, new SaturnJobReturn(SaturnSystemReturnCode.SYSTEM_FAIL, e.getMessage(),
SaturnSystemErrorGroup.FAIL));
continue;
}
retMap.put(item, futureTask.getCallable().getSaturnJobReturn());
LogUtils.info(log, jobName,"zy futureTaskMap insert get {}",item);
futureTaskMap.remove(item);
LogUtils.info(log, jobName,"zy futureTaskMap remove item {}", item);
}
synchronized (futureTaskMap) {
currentRunningItemSets.removeAll(itemSets);
}
return retMap;
}
@Override
public void forceStop() {
super.forceStop();
synchronized (futureTaskMap) {
for(Integer item:currentRunningItemSets){
ShardingItemFutureTask shardingItemFutureTask = futureTaskMap.get(item);
if(shardingItemFutureTask == null){
continue;
}
JavaShardingItemCallable shardingItemCallable = shardingItemFutureTask.getCallable();
Thread currentThread = shardingItemCallable.getCurrentThread();
if (currentThread != null) {
try {
if (shardingItemCallable.forceStop()) {
LogUtils.info(log, jobName, "Force stop job, jobName:{}, item:{}", jobName,
shardingItemCallable.getItem());
shardingItemCallable.beforeForceStop();
ShardingItemFutureTask.killRunningBusinessThread(shardingItemFutureTask);
}
} catch (Throwable t) {
LogUtils.error(log, jobName, t.getMessage(), t);
}
}
}
}
}
Testing showed everything now works; with that, this part is genuinely complete.
2. When multiple failover shards land on one executor, how to run them in parallel
This problem is already solved by point 1: each failed shard is flagged the moment it fails, and the failover shard resource is grabbed and executed right away, instead of going through the original one-shard-per-loop processing.
3. For long-running jobs, how to gracefully stop a job's tasks on the executor after clicking "Terminate now" in the Saturn console
To solve this problem (leaving aside uninterruptible calls such as socket.read(); that case will be covered later), we first need to understand how the executor really executes a job. Note that this digs deeper into the execution path discussed above rather than repeating it.
A Saturn job run works like this: shards are first assigned to executors, and the multiple shards on a single executor are executed by multiple threads, with a 1:1 mapping between shards and threads (for example, shard parameters 0=a,1=b,2=c on one executor mean three worker threads). When a task finishes, its thread is torn down automatically and the task's state is updated; at that point the run is over.
If "Terminate now" is clicked while tasks are executing, a separate thread is started to kill the threads currently running the tasks; once they are terminated, the application-side postForceStop runs, the task state is updated, and the run ends.
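For context on why the socket.read() case is set aside: Thread.interrupt() only wakes blocking calls that are interruption-aware (sleep/wait/park and friends), while plain blocking I/O ignores it. A small generic demo:
public class InterruptDemo {
	public static void main(String[] args) throws Exception {
		Thread worker = new Thread(() -> {
			try {
				Thread.sleep(60_000L); // interruption-aware: throws InterruptedException
			} catch (InterruptedException e) {
				System.out.println("worker interrupted, cleaning up and exiting");
			}
			// a thread blocked in a plain socket InputStream.read() would never get
			// here on interrupt(); that is the uninterruptible case deferred above
		});
		worker.start();
		Thread.sleep(1000L);
		worker.interrupt(); // wakes the sleeping worker
		worker.join();
	}
}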
Releasing application-side resources, such as Kafka or ES clients, can be done in the business implementation of postForceStop.
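As an example of what that business hook might look like, here is a minimal sketch. It assumes the hook signatures of AbstractSaturnJavaJob in Saturn 3.x (handleJavaJob plus an overridable postForceStop with the same parameters); verify them against your version. BusinessClient is a hypothetical stand-in for a Kafka/ES client.
import com.vip.saturn.job.AbstractSaturnJavaJob;
import com.vip.saturn.job.SaturnJobExecutionContext;
import com.vip.saturn.job.SaturnJobReturn;

public class LongRunningJob extends AbstractSaturnJavaJob {

	// hypothetical stand-in for a kafka/es client held by the long-running shard
	static class BusinessClient {
		void consume() throws InterruptedException { Thread.sleep(60_000L); }
		void close() { System.out.println("business client closed"); }
	}

	private volatile BusinessClient client;

	@Override
	public SaturnJobReturn handleJavaJob(String jobName, Integer shardItem, String shardParam,
			SaturnJobExecutionContext context) {
		client = new BusinessClient();
		try {
			client.consume(); // long-running business work
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}
		return new SaturnJobReturn("done");
	}

	@Override
	public void postForceStop(String jobName, Integer shardItem, String shardParam,
			SaturnJobExecutionContext context) {
		// invoked by the executor after the shard thread has been killed:
		// release kafka/es clients and other business resources here
		if (client != null) {
			client.close();
		}
	}
}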
The relevant executor-side code is as follows:
@Override
public SaturnJobReturn call() throws Exception {
Thread.currentThread().setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
if (e instanceof IllegalMonitorStateException || e instanceof ThreadDeath) {
LogUtils.warn(log, callable.getJobName(), "business thread pool maybe crashed", e);
if (callFuture != null) {
callFuture.cancel(false);
}
LogUtils.warn(log, callable.getJobName(),
"close the old business thread pool, and re-create new one");
callable.getSaturnJob().getJobScheduler().reCreateExecutorService();
}
}
});
try {
SaturnJobReturn ret = callable.call();
return ret;
} finally {
done();
LogUtils.debug(log, callable.getJobName(), "job:[{}] item:[{}] finish execution, which takes {}ms",
callable.getJobName(), callable.getItem(), callable.getExecutionTime());
}
}
/**
 * Actually executes the shard's job logic.
 *
 * @return the execution result
 */
public SaturnJobReturn call() {
reset();
SaturnSystemOutputStream.initLogger();
currentThread = Thread.currentThread();
SaturnJobReturn temp = null;
try {
beforeExecution();
temp = doExecution();
// from this point on, the thread must no longer be force-stopped
breakForceStop = true;
LogUtils.info(log, jobName, "zy call over");
} catch (Throwable t) {
// from this point on, the thread must no longer be force-stopped
breakForceStop = true;
LogUtils.info(log, jobName, "zy call Throwable over");
// neither a timeout nor a force stop: log the error and set the SaturnJobReturn
if (status.get() != TIMEOUT && status.get() != FORCE_STOP) {
LogUtils.error(log, jobName, t.toString(), t);
temp = new SaturnJobReturn(SaturnSystemReturnCode.SYSTEM_FAIL, t.getMessage(),
SaturnSystemErrorGroup.FAIL);
}
} finally {
if (status.compareAndSet(INIT, SUCCESS)) {
saturnJobReturn = temp;
}
if (saturnJob != null && saturnJob.getConfigService().showNormalLog()) {
String jobLog = SaturnSystemOutputStream.clearAndGetLog();
if (jobLog != null && jobLog.length() > SaturnConstant.MAX_JOB_LOG_DATA_LENGTH) {
LogUtils.info(log, jobName,
"As the job log exceed max length, only the previous {} characters will be reported",
SaturnConstant.MAX_JOB_LOG_DATA_LENGTH);
jobLog = jobLog.substring(0, SaturnConstant.MAX_JOB_LOG_DATA_LENGTH);
}
this.shardingContext.putJobLog(this.item, jobLog);
}
}
return saturnJobReturn;
}
private void done() {
if (timeoutFuture != null) {
timeoutFuture.cancel(true);
timeoutFuture = null;
}
if (done) {
return;
}
done = true;
LogUtils.info(log, callable.getJobName(), "zy done true");
try {
try {
if (callable.isTimeout()) {
// the business-side beforeTimeout/onTimeout hooks run here
callable.onTimeout();
}
} catch (Throwable t) {
LogUtils.error(log, callable.getJobName(), t.toString(), t);
}
try {
if (callable.isForceStop()) {
// the business-side beforeForceStop/postForceStop hooks run here
callable.postForceStop();
}
} catch (Throwable t) {
LogUtils.error(log, callable.getJobName(), t.toString(), t);
}
callable.checkAndSetSaturnJobReturn();
callable.afterExecution();
} finally {
try {
if (doneFinallyCallback != null) {
doneFinallyCallback.call();
}
} catch (Exception e) {
LogUtils.error(log, callable.getJobName(), e.toString(), e);
}
}
}
We can break this down into the following scenarios:
Executor state | Application state | Solution |
Executor crashed | Application alive or crashed | Kill the crashed executor and fail over its failed shards |
Executor alive | Application alive | In the console click "Disable" => "Terminate now"; beforehand, implement postForceStop on the application side to release business resources; the executor invokes postForceStop after it finishes killing the threads |
Executor alive | Application crashed | In the console click "Disable" => "Terminate now" |
The above is a summary made during this investigation; please point out anything that falls short.
Other problem areas will be added in later follow-ups...