@Component
public class DistributeLock {
@Autowired
private HeraHostRelationService hostGroupService;
@Autowired
private HeraLockService heraLockService;
@Autowired
private WorkClient workClient;
@Autowired
private HeraSchedule heraSchedule;
private final long timeout = 1000 * 60 * 5L;
private final String ON_LINE = "online";
@PostConstruct
public void init() {
workClient.workSchedule.scheduleAtFixedRate(() -> {
try {
checkLock();
} catch (Exception e) {
e.printStackTrace();
}
}, 10, 60, TimeUnit.SECONDS);
}
public void checkLock() {
HeraLock heraLock = heraLockService.findBySubgroup(ON_LINE);
if (heraLock == null) {
Date date = new Date();
heraLock = HeraLock.builder()
.id(1)
.host(WorkContext.host)
.serverUpdate(date)
.subgroup(ON_LINE)
.gmtCreate(date)
.gmtModified(date)
.build();
Integer lock = heraLockService.insert(heraLock);
if (lock == null || lock <= 0) {
return;
}
}
if (WorkContext.host.equals(heraLock.getHost().trim())) {
heraLock.setServerUpdate(new Date());
heraLockService.update(heraLock);
HeraLog.info("hold lock and update time");
heraSchedule.startup();
} else {
long currentTime = System.currentTimeMillis();
long lockTime = heraLock.getServerUpdate().getTime();
long interval = currentTime - lockTime;
if (interval > timeout && isPreemptionHost()) {
Date date = new Date();
Integer lock = heraLockService.changeLock(WorkContext.host, date, date, heraLock.getHost());
if (lock != null && lock > 0) {
ErrorLog.error("master 发生切换,{} 抢占成功", WorkContext.host);
heraSchedule.startup();
heraLock.setHost(WorkContext.host);
//TODO 接入master切换通知
} else {
HeraLog.info("master抢占失败,由其它worker抢占成功");
}
} else {
//非主节点,调度器不执行
heraSchedule.shutdown();
}
}
workClient.init();
try {
workClient.connect(heraLock.getHost().trim());
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 检测该ip是否具有抢占master的权限
* @return 是/否
*/
private boolean isPreemptionHost() {
List<String> preemptionHostList = hostGroupService.findPreemptionGroup(HeraGlobalEnvironment.preemptionMasterGroup);
if (preemptionHostList.contains(WorkContext.host)) {
return true;
} else {
HeraLog.info(WorkContext.host + " is not in master group " + preemptionHostList.toString());
return false;
}
}
}
1.通过PostConstruct在项目启动的时候触发定时任务。。
2.定时任务主要通过数库来进行强占master
3.如果是master,定时任务做的事情就是update heraLock ServerUpdate
4.如果是worker,定时任务做的事情就是查看master是不是长久没更新ServerUpdate,然后强占
5. heraSchedule.startup();workClient.init();workClient.connect(heraLock.getHost().trim()) 这三方法里面有状态,会置空操作。
masterServer的启动入口为,heraSchedule.startup()-->masterxContext.init()
public void init() {
threadPool = new ThreadPoolExecutor(
0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new NamedThreadFactory("master-wait-response"), new ThreadPoolExecutor.AbortPolicy());
masterSchedule = new ScheduledThreadPoolExecutor(5, new NamedThreadFactory("master-schedule", false));
masterSchedule.setKeepAliveTime(5, TimeUnit.MINUTES);
masterSchedule.allowCoreThreadTimeOut(true);
this.getQuartzSchedulerService().start();
dispatcher = new Dispatcher();
handler = new MasterHandler(this);//主handler
masterServer = new MasterServer(handler);//主通讯类
masterServer.start(HeraGlobalEnvironment.getConnectPort());//启动
master.init(this);
HeraLog.info("end init master content success ");
}
而MasterSever主要关注MasterHandler
public MasterHandler(MasterContext masterContext) {
this.masterContext = masterContext;
//把处理结果统一管理
completionService = new ExecutorCompletionService<>(
new ThreadPoolExecutor(
0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new NamedThreadFactory("master-execute", false), new ThreadPoolExecutor.AbortPolicy()));
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("master-deal", false), new ThreadPoolExecutor.AbortPolicy());
executor.execute(() -> {
Future<ChannelResponse> future;
ChannelResponse response;
while (true) {//一量处理有结果,即时响应
try {
future = completionService.take();
response = future.get();
TaskLog.info("3-1.MasterHandler:-->master prepare send status : {}", response.webResponse.getStatus());
response.channel.writeAndFlush(wrapper(response.webResponse));
TaskLog.info("3-2.MasterHandler:-->master send response success, requestId={}", response.webResponse.getRid());
} catch (Exception e) {
ErrorLog.error("master handler future take error:{}", e);
e.printStackTrace();
} catch (Throwable throwable) {
ErrorLog.error("master handler future take throwable{}", throwable);
throwable.printStackTrace();
}
}
}
);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
SocketMessage socketMessage = (SocketMessage) msg;
Channel channel = ctx.channel();
switch (socketMessage.getKind()) {
//心跳
case REQUEST://如果是内置任务无需处理结果
Request request = Request.newBuilder().mergeFrom(socketMessage.getBody()).build();
switch (request.getOperate()) {
case HeartBeat:
masterContext.getThreadPool().execute(() -> MasterHandleRequest.handleHeartBeat(masterContext, channel, request));
break;
case SetWorkInfo:
//此处省略代码。。
break;
}
break;
case WEB_REQUEST://web任务统处理,结果由专门线程返回
final WebRequest webRequest = WebRequest.newBuilder().mergeFrom(socketMessage.getBody()).build();
switch (webRequest.getOperate()) {
case ExecuteJob:
//此处省略代码。。。
break;
case RESPONSE://对于客户端内置请求响应结果交给监听器
masterContext.getThreadPool().execute(() -> {
Response response = null;
try {
response = Response.newBuilder().mergeFrom(socketMessage.getBody()).build();
SocketLog.info("6.MasterHandler:receiver socket info from work {}, response is {}", ctx.channel().remoteAddress(), response.getRid());
for (ResponseListener listener : listeners) {
listener.onResponse(response);
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
});
break;
case WEB_RESPONSE://对于客户端WEb请求响应结果交给监听器
masterContext.getThreadPool().execute(() -> {
WebResponse webResponse = null;
try {
webResponse = WebResponse.newBuilder().mergeFrom(socketMessage.getBody()).build();
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
SocketLog.info("6.MasterHandler:receiver socket info from work {}, webResponse is {}", ctx.channel().remoteAddress(), webResponse.getRid());
for (ResponseListener listener : listeners) {
listener.onWebResponse(webResponse);
}
});
break;
default:
ErrorLog.error("unknown request type : {}", socketMessage.getKind());
break;
}
}
//再看看监听器,任务里面有个ID号可以对应起结果
public class WorkResponseListener extends ResponseListenerAdapter {
private RpcWebRequest.WebRequest request;
private volatile Boolean receiveResult;
private CountDownLatch latch;
private RpcWebResponse.WebResponse webResponse;
@Override
public void onWebResponse(RpcWebResponse.WebResponse response) {
if (request.getRid() == response.getRid()) {
try {
webResponse = response;
receiveResult = true;
} catch (Exception e) {
ErrorLog.error("work release exception {}", e);
} finally {
latch.countDown();
}
}
}
}
workclient的设计方式与masterServer设计方式如出一辙,具体查看WorkHandler类
//workClient中init方法,定时上报workerHandlerHeartBeat
workSchedule.schedule(new Runnable() {
private WorkerHandlerHeartBeat workerHandlerHeartBeat = new WorkerHandlerHeartBeat();
private int failCount = 0;
@Override
public void run() {
try {
if (workContext.getServerChannel() != null) {
//这是主方法
boolean send = workerHandlerHeartBeat.send(workContext);
if (!send) {
failCount++;
ErrorLog.error("send heart beat failed ,failCount :" + failCount);
} else {
failCount = 0;
HeartLog.info("send heart beat success:{}", workContext.getServerChannel().getRemoteAddress());
}
} else {
ErrorLog.error("server channel can not find on " + WorkContext.host);
}
} catch (Exception e) {
ErrorLog.error("heart beat error:", e);
} finally {
workSchedule.schedule(this, (failCount + 1) * HeraGlobalEnvironment.getHeartBeat(), TimeUnit.SECONDS);
}
}
}, HeraGlobalEnvironment.getHeartBeat(), TimeUnit.SECONDS);
//主要send方法
public boolean send(WorkContext context) {
try {
MemUseRateJob memUseRateJob = new MemUseRateJob(1);
//内存信息获取
memUseRateJob.readMemUsed();
CpuLoadPerCoreJob loadPerCoreJob = new CpuLoadPerCoreJob();
//负载信息获取
loadPerCoreJob.run();
//构建包体
RpcHeartBeatMessage.HeartBeatMessage hbm = RpcHeartBeatMessage.HeartBeatMessage.newBuilder()
.setHost(WorkContext.host)
.setMemTotal(memUseRateJob.getMemTotal())
.setMemRate(memUseRateJob.getRate())
.setCpuLoadPerCore(loadPerCoreJob.getLoadPerCore())
.setTimestamp(System.currentTimeMillis())
.addAllDebugRunnings(context.getDebugRunning().keySet())
.addAllManualRunnings(context.getManualRunning().keySet())
.addAllRunnings(context.getRunning().keySet())
.setCores(WorkContext.cpuCoreNum)
.build(); context.getServerChannel().writeAndFlush(RpcSocketMessage.SocketMessage.newBuilder().
setKind(RpcSocketMessage.SocketMessage.Kind.REQUEST).
setBody(RpcRequest.Request.newBuilder().
setRid(AtomicIncrease.getAndIncrement()).
setOperate(RpcOperate.Operate.HeartBeat).
setBody(hbm.toByteString()).
build().toByteString()).
build());
} catch (RemotingException e) {
e.printStackTrace();
return false;
}
return true;
}
//master对于heartBeat的处理方式
public static void handleHeartBeat(MasterContext masterContext, Channel channel, Request request) {
//放到master服务器的内存中,等待WEB应用的调用
MasterWorkHolder workHolder = masterContext.getWorkMap().get(channel);
HeartBeatInfo heartBeatInfo = new HeartBeatInfo();
HeartBeatMessage heartBeatMessage;
try {
heartBeatMessage = HeartBeatMessage.newBuilder().mergeFrom(request.getBody()).build();
heartBeatInfo.setHost(heartBeatMessage.getHost());
heartBeatInfo.setMemRate(heartBeatMessage.getMemRate());
heartBeatInfo.setMemTotal(heartBeatMessage.getMemTotal());
heartBeatInfo.setCpuLoadPerCore(heartBeatMessage.getCpuLoadPerCore());
heartBeatInfo.setRunning(heartBeatMessage.getRunningsList());
heartBeatInfo.setDebugRunning(heartBeatMessage.getDebugRunningsList());
heartBeatInfo.setManualRunning(heartBeatMessage.getManualRunningsList());
heartBeatInfo.setTimestamp(heartBeatMessage.getTimestamp());
heartBeatInfo.setCores(heartBeatMessage.getCores());
workHolder.setHeartBeatInfo(heartBeatInfo);
HeartLog.info("received heart beat from {} : {}", heartBeatMessage.getHost(), JSONObject.toJSONString(heartBeatInfo));
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
1.work启动时,启动定时任务,专们负责收集数据,并发送给master
2.master,收到信息存在自己内存中,等待web调用并返回
//研发中心立即执行
@RequestMapping(value = "/debugSelectCode", method = RequestMethod.POST)
@ResponseBody
public WebAsyncTask<JsonResponse> debugSelectCode(@RequestBody HeraFile heraFile) {
String owner = getOwner();
return new WebAsyncTask<JsonResponse>(HeraGlobalEnvironment.getRequestTimeout(), () -> {
Map<String, Object> res = new HashMap<>(2);
HeraFile file = heraFileService.findById(heraFile.getId());
file.setContent(heraFile.getContent());
String name = file.getName();
String runType;
HeraDebugHistory history = HeraDebugHistory.builder()
.fileId(file.getId())
.script(heraFile.getContent())
.startTime(new Date())
.owner(owner)
.hostGroupId(file.getHostGroupId() == 0 ? HeraGlobalEnvironment.defaultWorkerGroup : file.getHostGroupId())
.build();
int suffixIndex = name.lastIndexOf(Constants.POINT);
if (suffixIndex == -1) {
return new JsonResponse(false, "无后缀名,请设置支持的后缀名[.sh .hive .spark]");
}
String suffix = name.substring(suffixIndex);
//只支持shell,hive,spark
if ((Constants.HIVE_SUFFIX).equalsIgnoreCase(suffix)) {
runType = Constants.HIVE_FILE;
} else if ((Constants.SHELL_SUFFIX).equalsIgnoreCase(suffix)) {
runType = Constants.SHELL_FILE;
} else if ((Constants.SPARK_SUFFIX).equalsIgnoreCase(suffix)) {
runType = Constants.SPARK_FILE;
} else {
return new JsonResponse(false, "暂未支持的后缀名[" + suffix + "],请设置支持的后缀名[.sh .hive .spark]");
}
history.setRunType(runType);
String newId = debugHistoryService.insert(history);
//向master提交任务
workClient.executeJobFromWeb(JobExecuteKind.ExecuteKind.DebugKind, newId);
res.put("fileId", file.getId());
res.put("debugId", newId);
return new JsonResponse(true, "执行成功", res);
});
}
//master端对此任务的相关处理,则是加入DebugQueue,进入等待队列
public void debug(HeraDebugHistoryVo debugHistory) {
JobElement element = JobElement.builder()
.jobId(debugHistory.getId())
.hostGroupId(debugHistory.getHostGroupId())
.build();
debugHistory.setStatus(StatusEnum.RUNNING);
debugHistory.setStartTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
debugHistory.getLog().append(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 进入任务队列");
masterContext.getHeraDebugHistoryService().update(BeanConvertUtils.convert(debugHistory));
try {
masterContext.getDebugQueue().put(element);
} catch (InterruptedException e) {
ErrorLog.error("添加开发中心执行任务失败:" + element.getJobId(), e);
}
}
/**
* 在master.init的时候会,会启动定时任务
* 扫描任务等待队列,取出任务去执行
* 对于没有可运行机器的时,manual,debug任务重新offer到原队列
*/
public boolean scan() throws InterruptedException {
boolean hasTask = false;
if (!masterContext.getScheduleQueue().isEmpty()) {
代码省略
}
if (!masterContext.getManualQueue().isEmpty()) {
//代码省略
}
if (!masterContext.getDebugQueue().isEmpty()) {
//拿出任务
JobElement jobElement = masterContext.getDebugQueue().take();
MasterWorkHolder selectWork = getRunnableWork(jobElement);
if (selectWork == null) {
masterContext.getDebugQueue().put(jobElement);
ScheduleLog.warn("can not get work to execute DebugQueue job in master,job is:{}", jobElement.toString());
} else {
//当有机器的时候,直接给机子分配任务(向work发送任务)
runDebugJob(selectWork, jobElement.getJobId());
hasTask = true;
}
}
return hasTask;
}
/**
* worker接收到命令,JobUtils.createDebugJob创建job文件到服务器,拼接shell,并调用命令执行
*/
private Future<RpcResponse.Response> debug(WorkContext workContext, RpcRequest.Request request) {
RpcDebugMessage.DebugMessage debugMessage = null;
try {
debugMessage = RpcDebugMessage.DebugMessage.newBuilder().mergeFrom(request.getBody()).build();
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
String debugId = debugMessage.getDebugId();
HeraDebugHistoryVo history = workContext.getHeraDebugHistoryService().findById(Integer.parseInt(debugId));
return workContext.getWorkExecuteThreadPool().submit(() -> {
int exitCode = -1;
Exception exception = null;
ResponseStatus.Status status;
try {
history.setExecuteHost(WorkContext.host);
workContext.getHeraDebugHistoryService().update(BeanConvertUtils.convert(history));
String date = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
File directory = new File(HeraGlobalEnvironment.getWorkDir() + File.separator + date + File.separator + "debug-" + debugId);
if (!directory.exists()) {
if (directory.mkdirs()) {
HeraLog.error("创建文件失败:" + directory.getAbsolutePath());
}
}
Job job = JobUtils.createDebugJob(new JobContext(JobContext.DEBUG_RUN), BeanConvertUtils.convert(history),
directory.getAbsolutePath(), workContext);
workContext.getDebugRunning().putIfAbsent(debugId, job);
exitCode = job.run();
} catch (Exception e) {
exception = e;
history.getLog().appendHeraException(e);
} finally {
HeraDebugHistoryVo heraDebugHistoryVo = workContext.getHeraDebugHistoryService().findById(Integer.parseInt(debugId));
heraDebugHistoryVo.setEndTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
StatusEnum statusEnum = getStatusFromCode(exitCode);
if (exitCode == 0) {
status = ResponseStatus.Status.OK;
heraDebugHistoryVo.setStatus(statusEnum);
} else {
status = ResponseStatus.Status.ERROR;
heraDebugHistoryVo.setStatus(statusEnum);
}
workContext.getHeraDebugHistoryService().updateStatus(BeanConvertUtils.convert(heraDebugHistoryVo));
HeraDebugHistoryVo debugHistory = workContext.getDebugRunning().get(debugId).getJobContext().getDebugHistory();
workContext.getHeraDebugHistoryService().updateLog(BeanConvertUtils.convert(debugHistory));
workContext.getDebugRunning().remove(debugId);
}
String errorText = "";
if (exception != null && exception.getMessage() != null) {
errorText = exception.getMessage();
}
return RpcResponse.Response.newBuilder()
.setRid(request.getRid())
.setOperate(RpcOperate.Operate.Debug)
.setStatusEnum(status)
.setErrorText(errorText)
.build();
});
}
/**
* 获取hostGroupId中可以分发任务的worker
*/
private MasterWorkHolder getRunnableWork(JobElement jobElement) {
//根据算法选择对应的work
MasterWorkHolder selectWork = loadBalance.select(jobElement, masterContext);
if (selectWork == null) {
return null;
}
Channel channel = selectWork.getChannel().getChannel();
HeartBeatInfo beatInfo = selectWork.getHeartBeatInfo();
// 如果最近两次选择的work一致 需要等待机器最新状态发来之后(睡眠)再进行任务分发
if (HeraGlobalEnvironment.getWarmUpCheck() > 0 && lastWork != null && channel == lastWork && (beatInfo.getCpuLoadPerCore() > 0.6F || beatInfo.getMemRate() > 0.7F)) {
ScheduleLog.info("达到预热条件,睡眠" + HeraGlobalEnvironment.getWarmUpCheck() + "秒");
try {
TimeUnit.SECONDS.sleep(HeraGlobalEnvironment.getWarmUpCheck());
} catch (InterruptedException e) {
e.printStackTrace();
}
lastWork = null;
return null;
}
lastWork = channel;
return selectWork;
}
/**
* 定时 刷新日志到数据库
*/
workSchedule.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
for (Job job : workContext.getRunning().values()) {
//处理日志
}
for (Job job : workContext.getManualRunning().values()) {
//处理日志
}
for (Job job : workContext.getDebugRunning().values()) {
try {
//服务在运行中会向这类不断导入日志
HeraDebugHistoryVo history = job.getJobContext().getDebugHistory();
workContext.getHeraDebugHistoryService().updateLog(BeanConvertUtils.convert(history));
} catch (Exception e) {
printDebugLog(job, e);
}
}
} catch (Exception e) {
ErrorLog.error("job log flush exception:{}", e.toString());
}
}
}, 0, 5, TimeUnit.SECONDS);
}
1.脚本在运行中的时候,work都会为每个脚本维护一个HeraJobHistoryVo,其中 LogContent log属性会和服务输入输出流对应
2.work启动的时候,会启动一个定时刷新日志在数据库的进程
3.web端通个查询数据库很好的达到时实
/**
* 事件广播,每次任务状态变化,触发响应事件,全局广播,自动调度successEvent,触发依赖调度一些依赖更新
*
* @param applicationEvent
*/
public void dispatch(ApplicationEvent applicationEvent) {
try {
//封装事件
MvcEvent mvcEvent = new MvcEvent(this, applicationEvent);
mvcEvent.setApplicationEvent(applicationEvent);
//前置监听者
if (fireEvent(beforeDispatch, mvcEvent)) {
List<AbstractHandler> jobHandlersCopy = Lists.newArrayList(jobHandlers);
for (AbstractHandler jobHandler : jobHandlersCopy) {
//是否可执行
if (jobHandler.canHandle(applicationEvent)) {
if (!jobHandler.isInitialized()) {
jobHandler.setInitialized(true);
}
//job触发
jobHandler.handleEvent(applicationEvent);
}
}
//后置监听者
fireEvent(afterDispatch, mvcEvent);
}
} catch (Exception e) {
ErrorLog.error("global dispatch job event error", e);
}
}
1.dispatch对于事件的主要处理顺序 前置监听 ---- jobHandler ----- 后置监听
private void runDebugJob(MasterWorkHolder selectWork, String debugId) {
final MasterWorkHolder workHolder = selectWork;
this.executeJobPool.execute(() -> {
HeraDebugHistoryVo history = masterContext.getHeraDebugHistoryService().findById(Integer.parseInt(debugId));
history.getLog().append(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 开始运行");
masterContext.getHeraDebugHistoryService().update(BeanConvertUtils.convert(history));
Exception exception = null;
RpcResponse.Response response = null;
Future<RpcResponse.Response> future = null;
try {
future = new MasterExecuteJob().executeJob(masterContext, workHolder, JobExecuteKind.ExecuteKind.DebugKind, debugId);
response = future.get(HeraGlobalEnvironment.getTaskTimeout(), TimeUnit.HOURS);
} catch (Exception e) {
exception = e;
if (future != null) {
future.cancel(true);
}
DebugLog.error(String.format("debugId:%s run failed", debugId), e);
}
boolean success = response != null && response.getStatusEnum() == ResponseStatus.Status.OK;
if (!success) {
exception = new HeraException(String.format("fileId:%s run failed ", history.getFileId()), exception);
TaskLog.info("8.Master: debug job error");
history = masterContext.getHeraDebugHistoryService().findById(Integer.parseInt(debugId));
//创建失败事件
HeraDebugFailEvent failEvent = HeraDebugFailEvent.builder()
.debugHistory(BeanConvertUtils.convert(history))
.throwable(exception)
.fileId(history.getFileId())
.build();
//将失败事件通知关心的人
masterContext.getDispatcher().forwardEvent(failEvent);
} else {
TaskLog.info("7.Master: debug success");
//创建成功事件
HeraDebugSuccessEvent successEvent = HeraDebugSuccessEvent.builder()
.fileId(history.getFileId())
.history(BeanConvertUtils.convert(history))
.build();
//将成功事件通知关心的人
masterContext.getDispatcher().forwardEvent(successEvent);
}
});
}
public void alarm(HeraJobFailedEvent failedEvent) {
String actionId = failedEvent.getActionId();
Integer jobId = ActionUtil.getJobId(actionId);
if (jobId == null) {
return;
}
HeraJob heraJob = heraJobService.findById(jobId);
// 自己建立的任务运行失败必须收到告警
if (heraJob.getAuto() != 1 && !Constants.PUB_ENV.equals(HeraGlobalEnvironment.getEnv())) {
return;
}
StringBuilder address = new StringBuilder();
HeraUser user = heraUserService.findByName(heraJob.getOwner());
address.append(user.getEmail().trim()).append(Constants.SEMICOLON);
try {
HeraJobMonitor monitor = heraJobMonitorService.findByJobIdWithOutBlank(heraJob.getId());
if (monitor == null && Constants.PUB_ENV.equals(HeraGlobalEnvironment.getEnv())) {
ScheduleLog.info("任务无监控人,发送给owner:{}", heraJob.getId());
} else if (monitor != null) {
String ids = monitor.getUserIds();
String[] id = ids.split(Constants.COMMA);
for (String anId : id) {
if (StringUtils.isBlank(anId)) {
continue;
}
HeraUser monitor_user = heraUserService.findById(Integer.parseInt(anId));
if (monitor_user != null && monitor_user.getEmail() != null) {
address.append(monitor_user.getEmail().trim()).append(Constants.SEMICOLON);
}
}
}
String title = "hera调度任务失败[任务=" + heraJob.getName() + "(" + heraJob.getId() + "),版本号=" + actionId + "]";
String content = "任务ID:" + heraJob.getId() + Constants.HTML_NEW_LINE
+ "任务名:" + heraJob.getName() + Constants.HTML_NEW_LINE
+ "任务版本号:" + actionId + Constants.HTML_NEW_LINE
+ "任务描述:" + heraJob.getDescription() + Constants.HTML_NEW_LINE
+ "任务OWNER:" + heraJob.getOwner() + Constants.HTML_NEW_LINE;
String errorMsg = failedEvent.getHeraJobHistory().getLog().getMailContent();
if (errorMsg != null) {
content += Constants.HTML_NEW_LINE + Constants.HTML_NEW_LINE + "--------------------------------------------" + Constants.HTML_NEW_LINE + errorMsg;
}
emailService.sendEmail(title, content, address.toString());
} catch (MessagingException e) {
e.printStackTrace();
ErrorLog.error("发送邮件失败");
}
1.在master.init中 有 masterContext.getDispatcher().addDispatcherListener(new HeraJobFailListener());这一代码。
他关注job失败的事件,当master端收的失败响应,则会触发事件
2.任务失败事件的最终响应者是 EmailJobFailAlarm,发邮件通知责任人
//isSingle是否全量生成版本
//jobId 如果是单个,其任务ID
private boolean generateAction(boolean isSingle, Integer jobId) {
try {
//防止全量重复调用,
if (isGenerateActioning) {
return true;
}
DateTime dateTime = new DateTime();
Date now = dateTime.toDate();
int executeHour = dateTime.getHourOfDay();
//凌晨生成版本,早上七点以后开始再次生成版本
boolean execute = executeHour == 0 || (executeHour > ActionUtil.ACTION_CREATE_MIN_HOUR && executeHour <= ActionUtil.ACTION_CREATE_MAX_HOUR);
if (execute || isSingle) {
String currString = ActionUtil.getCurrHourVersion();
if (executeHour == ActionUtil.ACTION_CREATE_MAX_HOUR) {
Tuple<String, Date> nextDayString = ActionUtil.getNextDayString();
//例如:今天 2018.07.17 23:50 currString = 201807180000000000 now = 2018.07.18 23:50
currString = nextDayString.getSource();
now = nextDayString.getTarget();
}
Long nowAction = Long.parseLong(currString);
ConcurrentHashMap<Long, HeraAction> actionMap = new ConcurrentHashMap<>(heraActionMap.size());
List<HeraJob> jobList = new ArrayList<>();
//批量生成
if (!isSingle) {
isGenerateActioning = true;
jobList = masterContext.getHeraJobService().getAll();
} else { //单个任务生成版本
HeraJob heraJob = masterContext.getHeraJobService().findById(jobId);
jobList.add(heraJob);
actionMap = heraActionMap;
List<Long> shouldRemove = new ArrayList<>();
for (Long actionId : actionMap.keySet()) {
if (StringUtil.actionIdToJobId(String.valueOf(actionId), String.valueOf(jobId))) {
shouldRemove.add(actionId);
}
}
//移除内存所有依赖这个Job的老版本
shouldRemove.forEach(actionMap::remove);
List<AbstractHandler> handlers = new ArrayList<>(masterContext.getDispatcher().getJobHandlers());
移除内存所有依赖这个Job的老版本任务
if (handlers != null && handlers.size() > 0) {
for (AbstractHandler handler : handlers) {
JobHandler jobHandler = (JobHandler) handler;
if (StringUtil.actionIdToJobId(jobHandler.getActionId(), String.valueOf(jobId))) {
masterContext.getQuartzSchedulerService().deleteJob(jobHandler.getActionId());
masterContext.getDispatcher().removeJobHandler(jobHandler);
}
}
}
}
String cronDate = ActionUtil.getActionVersionPrefix(now);
Map<Integer, List<HeraAction>> idMap = new HashMap<>(jobList.size());
Map<Integer, HeraJob> jobMap = new HashMap<>(jobList.size());
//生成新的版本,并且注入actionMap
generateScheduleJobAction(jobList, cronDate, actionMap, nowAction, idMap, jobMap);
//整理新的依赖关系
for (Map.Entry<Integer, HeraJob> entry : jobMap.entrySet()) {
generateDependJobAction(jobMap, entry.getValue(), actionMap, nowAction, idMap);
}
if (executeHour < ActionUtil.ACTION_CREATE_MAX_HOUR) {
heraActionMap = actionMap;
}
Dispatcher dispatcher = masterContext.getDispatcher();
if (dispatcher != null) {
if (actionMap.size() > 0) {
for (Long id : actionMap.keySet()) {
dispatcher.addJobHandler(new JobHandler(id.toString(), masterContext.getMaster(), masterContext));
//新加入的版本发送事件通知
if (id >= Long.parseLong(currString)) {
dispatcher.forwardEvent(new HeraJobMaintenanceEvent(Events.UpdateActions, id.toString()));
}
}
}
}
ScheduleLog.info("[单个任务:{},任务id:{}]generate action success", isSingle, jobId);
return true;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
isGenerateActioning = false;
}
return false;
}
定时调度
主要是根据cron表达式来解析该任务的执行时间,在达到触发时间时将该任务加入任务队列
/**
* 自动任务的版本生成
*
* @param jobList 任务集合
* @param cronDate 日期
* @param actionMap actionMap集合
* @param nowAction 生成版本时间的action
* @param idMap 已经遍历过的idMap
* @param jobMap 依赖任务map映射
*/
public void generateScheduleJobAction(List<HeraJob> jobList, String cronDate, Map<Long, HeraAction> actionMap, Long nowAction, Map<Integer, List<HeraAction>> idMap, Map<Integer, HeraJob> jobMap) {
List<HeraAction> insertActionList = new ArrayList<>();
for (HeraJob heraJob : jobList) {
if (heraJob.getScheduleType() != null) {
if (heraJob.getScheduleType() == 1) {
jobMap.put(heraJob.getId(), heraJob);
} else if (heraJob.getScheduleType() == 0) {
//如果是定时任务,会解析表达式,拆分出多个版本
String cron = heraJob.getCronExpression();
List<String> list = new ArrayList<>();
if (StringUtils.isNotBlank(cron)) {
boolean isCronExp = CronParse.Parser(cron, cronDate, list);
if (!isCronExp) {
ErrorLog.error("cron parse error,jobId={},cron = {}", heraJob.getId(), cron);
continue;
}
List<HeraAction> heraAction = createHeraAction(list, heraJob);
idMap.put(heraJob.getId(), heraAction);
insertActionList.addAll(heraAction);
}
} else {
ErrorLog.error("任务{}未知的调度类型{}", heraJob.getId(), heraJob.getScheduleType());
}
}
}
batchInsertList(insertActionList, actionMap, nowAction);
}
//jobHandler中的Events.UpdateActions事件处理
private void autoRecovery() {
cache.refresh();
HeraActionVo heraActionVo = cache.getHeraActionVo();
//任务被删除
if (heraActionVo == null) {
masterContext.getDispatcher().removeJobHandler(this);
destroy();
ScheduleLog.info("heraAction 为空, 删除{}", actionId);
return;
}
//自动调度关闭
if (!heraActionVo.getAuto()) {
destroy();
return;
}
/**
* 如果是依赖任务 原来可能是独立任务,需要尝试删除原来的定时调度
* 如果是独立任务,则重新创建quartz调度
*
*/
if (heraActionVo.getScheduleType() == JobScheduleTypeEnum.Dependent) {
destroy();
} else if (heraActionVo.getScheduleType() == JobScheduleTypeEnum.Independent) {
try {
createScheduleJob(masterContext.getDispatcher(), heraActionVo);
} catch (SchedulerException e) {
e.printStackTrace();
}
}
}
/**
* 创建定时任务
*
* @param dispatcher the scheduler
* @param heraActionVo the job name
*/
private void createScheduleJob(Dispatcher dispatcher, HeraActionVo heraActionVo) throws SchedulerException {
if (!ActionUtil.isCurrActionVersion(actionId)) {
return;
}
JobKey jobKey = new JobKey(actionId, Constants.HERA_GROUP);
if (masterContext.getQuartzSchedulerService().getScheduler().getJobDetail(jobKey) == null) {
JobDetail jobDetail = JobBuilder.newJob(HeraQuartzJob.class).withIdentity(jobKey).build();
jobDetail.getJobDataMap().put("actionId", heraActionVo.getId());
jobDetail.getJobDataMap().put("dispatcher", dispatcher);
//TODO 根据任务区域加时区
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(heraActionVo.getCronExpression().trim())/*.inTimeZone()*/;
CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(actionId, Constants.HERA_GROUP).withSchedule(scheduleBuilder).build();
masterContext.getQuartzSchedulerService().getScheduler().scheduleJob(jobDetail, trigger);
ScheduleLog.info("--------------------------- 添加自动调度成功:{}--------------------------", heraActionVo.getId());
}
}
依赖调度
我们的任务大部分都有依赖关系,只有在上一个任务计算出结果后才能进行下一步的执行。我们的依赖任务会在所有的依赖任务都执行完成之后才会被触发加入任务队列
/**
* 递归生成任务依赖action
*
* @param jobMap 任务映射map
* @param heraJob 当前生成版本的任务
* @param actionMap 版本map
* @param nowAction 生成版本时间的action
* @param idMap job的id集合 只要已经检测过的id都放入idSet中
*/
private void generateDependJobAction(Map<Integer, HeraJob> jobMap, HeraJob heraJob, Map<Long, HeraAction> actionMap, Long nowAction, Map<Integer, List<HeraAction>> idMap) {
if (heraJob == null || idMap.containsKey(heraJob.getId())) {
return;
}
String jobDependencies = heraJob.getDependencies();
if (StringUtils.isNotBlank(jobDependencies)) {
Map<String, List<HeraAction>> dependenciesMap = new HashMap<>(1024);
String[] dependencies = jobDependencies.split(Constants.COMMA);
String actionMinDeps = "";
boolean noAction = false;
for (String dependentId : dependencies) {
Integer dpId = Integer.parseInt(dependentId);
//如果idSet不包含依赖任务dpId 则递归查找
if (!idMap.containsKey(dpId)) {
generateDependJobAction(jobMap, jobMap.get(dpId), actionMap, nowAction, idMap);
}
List<HeraAction> dpActions = idMap.get(dpId);
dependenciesMap.put(dependentId, dpActions);
if (dpActions == null || dpActions.size() == 0) {
ErrorLog.warn("{}今天找不到版本,无法为任务{}生成版本", dependentId, heraJob.getId());
noAction = true;
break;
}
if (StringUtils.isBlank(actionMinDeps)) {
actionMinDeps = dependentId;
}
//找到所依赖的任务中版本最少的作为基准版本。
if (dependenciesMap.get(actionMinDeps).size() > dependenciesMap.get(dependentId).size()) {
actionMinDeps = dependentId;
} else if (dependenciesMap.get(dependentId).size() > 0 && dependenciesMap.get(actionMinDeps).size() == dependenciesMap.get(dependentId).size() &&
dependenciesMap.get(actionMinDeps).get(0).getId() < dependenciesMap.get(dependentId).get(0).getId()) {
//如果两个版本的个数一样 那么应该找一个时间较大的
actionMinDeps = dependentId;
}
}
if (noAction) {
idMap.put(heraJob.getId(), null);
} else {
List<HeraAction> actionMinList = dependenciesMap.get(actionMinDeps);
if (actionMinList != null && actionMinList.size() > 0) {
List<HeraAction> insertList = new ArrayList<>();
for (HeraAction action : actionMinList) {
StringBuilder actionDependencies = new StringBuilder(action.getId().toString());
Long longActionId = Long.parseLong(actionDependencies.toString());
for (String dependency : dependencies) {
if (!dependency.equals(actionMinDeps)) {
List<HeraAction> otherAction = dependenciesMap.get(dependency);
if (otherAction == null || otherAction.size() == 0) {
continue;
}
//找到一个离基准版本时间最近的action,添加为该任务的依赖
String otherActionId = otherAction.get(0).getId().toString();
for (HeraAction o : otherAction) {
if (Math.abs(o.getId() - longActionId) < Math.abs(Long.parseLong(otherActionId) - longActionId)) {
otherActionId = o.getId().toString();
}
}
actionDependencies.append(",");
actionDependencies.append(Long.parseLong(otherActionId) / 1000000 * 1000000 + Long.parseLong(dependency));
}
}
HeraAction actionNew = new HeraAction();
BeanUtils.copyProperties(heraJob, actionNew);
Long actionId = longActionId / 1000000 * 1000000 + Long.parseLong(String.valueOf(heraJob.getId()));
actionNew.setId(actionId);
actionNew.setGmtCreate(new Date());
actionNew.setDependencies(actionDependencies.toString());
actionNew.setJobDependencies(heraJob.getDependencies());
actionNew.setJobId(heraJob.getId());
actionNew.setAuto(heraJob.getAuto());
actionNew.setHostGroupId(heraJob.getHostGroupId());
masterContext.getHeraJobActionService().insert(actionNew, nowAction);
actionMap.put(actionNew.getId(), actionNew);
insertList.add(actionNew);
}
idMap.put(heraJob.getId(), insertList);
}
}
}
}
/**
* 收到广播的任务成功事件的处理流程,每次自动调度任务成功执行,会进行一次全局的SuccessEvent广播,使得依赖任务可以更新readyDependent
*
* @param event
*/
private void handleSuccessEvent(HeraJobSuccessEvent event) {
if (event.getTriggerType() == TriggerTypeEnum.MANUAL) {
return;
}
String jobId = event.getJobId();
HeraActionVo heraActionVo = cache.getHeraActionVo();
if (heraActionVo == null) {
autoRecovery();
return;
}
if (!heraActionVo.getAuto()) {
return;
}
if (heraActionVo.getScheduleType() == JobScheduleTypeEnum.Independent) {
return;
}
if (heraActionVo.getDependencies() == null || !heraActionVo.getDependencies().contains(jobId)) {
return;
}
JobStatus jobStatus;
//必须同步
synchronized (this) {
jobStatus = heraJobActionService.findJobStatus(actionId);
ScheduleLog.info(actionId + "received a success dependency job with actionId = " + jobId);
jobStatus.getReadyDependency().put(jobId, String.valueOf(System.currentTimeMillis()));
heraJobActionService.updateStatus(jobStatus);
}
boolean allComplete = true;
for (String key : heraActionVo.getDependencies()) {
if (jobStatus.getReadyDependency().get(key) == null) {
allComplete = false;
break;
}
}
if (allComplete) {
ScheduleLog.info("JobId:" + actionId + " all dependency jobs is ready,run!");
startNewJob(event.getTriggerType(), heraActionVo);
} else {
ScheduleLog.info(actionId + "some of dependency is not ready, waiting" + JSONObject.toJSONString(jobStatus.getReadyDependency().keySet()));
}
}
generateDependJobAction算法比较复杂,其目的就是刷新下内存中 heraActionMap 中的依赖关系
手动调度与手动恢复
手动调度即为手动执行的任务,手动执行后自动加入任务队列,请注意,手动任务执行成功后不会通知下游任务(即:依赖于该任务的任务)该任务已经执行完成。
手动恢复类似于手动调度,于手动调度的区别为此时如果该任务执行成功,会通知下游任务该任务已经执行完成
/**
* 收到广播的任务成功事件的处理流程,每次自动调度任务成功执行,会进行一次全局的SuccessEvent广播,使得依赖任务可以更新readyDependent
*
* @param event
*/
private void handleSuccessEvent(HeraJobSuccessEvent event) {
if (event.getTriggerType() == TriggerTypeEnum.MANUAL) {
return;
}
//省略代码
}
1.手动执行和debug执行流程相似,不做多介绍
2.Jobhandler中handleSuccessEvent对手动的进行了过滤
关于抢占模式:
基本上所有的调度工作都集中在master,通过权限控制可以很好的做个主备master,但备master会处于空闲。。
优化点:
master 处于单机运行模式,是否能提供集群方式运行
如果集群关于work与master长链接维护的问题:
如果不是和dubbo一样每个master和work建立长链接,就为有调用不了的问题。。
如果work过多,对于master也是一种压力。。
master不建议设成work,是否可以改成 独占为master的work,不在接受工作调度
关于机器选用:
先去数据库(缓存)查询任务对应分组机子,与在线机子做差集。然后平均负载。因为缓存的关系,新加一台work要等待半小时。
优化点:
作为work是不是应该自己所在的分组。。为什么不在启动的时候或者web设置的时候告诉work是属于什么分组的
关于NIO通讯模型的使用
hera的设计于master与work的通讯基本上采用 master发出请求,然后等待,work处理返回, master做后置处理,调用事件模型dispatch,做通知。
优化点:
nio的特点是多路复用,netty框架的整个设计也是通知模式的。为什么采用同步等待+事件模型的方式。。