xxl-job概述
xxl-job代码总体分为调度中心和执行器部分,他们直接通过rpc(netty)通讯实现。
执行器部分是执行具体任务的模块。
调度中心主要是实现定时逻辑以及调度执行器任务,如果有多个执行器,执行器启动的时候会把执行器地址和端口注册到调度中心的数据库中,然后调度中心调度的时候可以按照定义的各种调度算法(比如轮询负载均衡,随机,广播等)来进行调度。
我接下来讲解下调度中心部分的核心定时执行逻辑,这部分是xxl-job定时核心代码和精髓所在。
定时任务逻辑和时间轮的算法的应用
时间轮算法介绍
先说下定时任务,比如下订单业务,一般下了订单后,都有个超时时间,超过这个时间后订单取消,这个操作你会怎么设计呢?
有几种办法,下面一一举例说明:
- 下单后,给每个订单开启一个定时调度任务,到达规定的时间后,执行失效方法。
- 我把订单过期的时间也持久化到数据库中,然后开一个定时任务每个1s去遍历一边数据库中未过期的订单数据,然后判断当前时间是否大于过期时间,如果大于则超时。
- 用时间轮的方式解决。
上面第一种方法肯定是最低效的,太浪费资源了,而且可靠性也不好。
第二种方法改善了好多,但是也有缺点,第一需要频繁地去读取数据库,第二每一秒要遍历所有的未过期的数据,即使大部分的任务还没到过期时间,但是也会去遍历一遍,如果数据量大也是比较浪费时间和性能的。
主角登场了!!!
时间轮算法就解决了上面的问题,时间轮会先划分出60个槽(举例按1分钟60秒),然后分为数据生产和消费
数据生产:
比如订单超时时间是60s,然后把产生的订单数据,按照他们的超时时间去对60求余,就算出来了一个60s内的时间值key,然后就把订单数据放入该key对应的一个list中。
数据消费:
每过一秒就去当前秒的key对应的list中取出来对应的订单,然后执行该订单的失效操作。
举例
举个实际例子来算一遍,比如订单超时时间是60s,然后时间轮划分了60个槽对应每一秒,
比如当前时间戳是61s(为了计算方便),首先用61对60求余为1,然后取出“1”对应槽的队列,执行失效操作,如果这个时候来了一个订单,用“当前时间”+60s = 121s,然后121s对60求余,结果为1,然后就把这个订单1放入“1”这个key对应的队列中,时间轮继续向前执行。
当当前时间走到62s的时候,首先用62对60求余为2,然后取出“2”对应槽的队列,执行失效操作,如果这个时候来了一个订单,用“当前时间”+60s = 122s,对60求余,结果为2,然后就把这个订单2放入,“2”这个key对应的队列中,时间轮继续向前执行。
然后时间轮还是走,当时间走到121s的时候,对60求余,会取出“1”这个key对应队列中的数据,就是订单1,执行失效操作,可以看到这个订单1放入的时间是61s过了60s被执行了。
当时间走到122s的时候,对60求余,会取出“2”这个key对应队列中的数据,就是订单2,执行失效操作,可以看到这个订单2,放入的时间是62s,过了60s被执行了。
可以看到时间轮每走一次获取的订单数据都是有效的,不会扫描所有的订单数据。
如果订单过期时间是120s,那么这里的刻度要是120个刻度,刻度和过期时间是对应的。
xxl-job中的时间轮应用
scheduleThread:
主要实现从数据库中读取任务触发时间小于now() + 5s的任务列表,然后遍历这些任务列表。
核心代码如下:
// schedule thread
scheduleThread = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
// pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
while (!scheduleThreadToStop) {
// Scan Job
long start = System.currentTimeMillis();
Connection conn = null;
Boolean connAutoCommit = null;
PreparedStatement preparedStatement = null;
boolean preReadSuc = true;
try {
conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();
// tx start
// 1、从数据库中读出来未来5s内的任务数据列表
long nowTime = System.currentTimeMillis();
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
if (scheduleList!=null && scheduleList.size()>0) {
// 2、push time-ring
for (XxlJobInfo jobInfo: scheduleList) {
// 当前时间可能会大于任务触发时间,比如从数据库中读取任务数据然后遍历到某个任务的时候
// 这个耗时时间可能超过了5s,这就会出现问题了,比如在61000时间戳这个时刻从数据库中查
// 出来的某个任务的下次触发时间戳是65000,当读出来后到遍历到这个任务耗时6000ms,当前时刻
// 时间戳就到了66000,超过了任务触发时间。
// “当前时间” > “任务触发时间”+5s
if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
// 2.1、trigger-expire > 5s:pass && make next-trigger-time
logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
// 1、misfire match
MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
// FIRE_ONCE_NOW 》 trigger
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
}
// 2、更新下次触发时间
refreshNextValidTime(jobInfo, new Date());
} else if (nowTime > jobInfo.getTriggerNextTime()) {
// “当前时间” < “任务触发时间”+5s
// 1、直接去触发远程执行器执行任务
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
// 2、刷新下次触发时间
refreshNextValidTime(jobInfo, new Date());
// 当前时间” + 5s > "下次任务触发时间",则把该任务加入时间轮的队列中。
if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
// 1、make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
} else {
// 当前时间还没到任务触发时间,则把该任务加入时间轮的队列中
// 1、计算时间轮的刻度key
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、把对应的任务放入到ringSecond对应的任务队列中
pushTimeRing(ringSecond, jobInfo.getId());
// 3、更新下次触发时间
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
}
// 3、update trigger info
for (XxlJobInfo jobInfo: scheduleList) {
XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
}
} else {
preReadSuc = false;
}
} catch (Exception e) {
...
} finally {
...
}
long cost = System.currentTimeMillis()-start;
// Wait seconds, align second
if (cost < 1000) { // scan-overtime, not wait
try {
// pre-read period: success > scan each second; fail > skip this period;
TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
}
});
ringThread:
实现了时间轮的核心代码,代码如下:
ringThread = new Thread(new Runnable() {
@Override
public void run() {
// align second
try {
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 );
} catch (InterruptedException e) {
if (!ringThreadToStop) {
logger.error(e.getMessage(), e);
}
}
//轮询
while (!ringThreadToStop) {
try {
// second data
List<Integer> ringItemData = new ArrayList<>();
int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
for (int i = 0; i < 2; i++) {
//从时间轮中取出某一刻度对应的数据并删除该任务列表,这里为了避免处理耗时太长,
//取了当前刻度和前一个刻度时间槽对应的数据
List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
if (tmpData != null) {
ringItemData.addAll(tmpData);
}
}
// ring trigger
logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
if (ringItemData.size() > 0) {
// 遍历取出的任务,并触发执行(最终会通过rpc调用到执行器部分执行)
for (int jobId: ringItemData) {
// do trigger
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
}
// 清除该队列中的数据
ringItemData.clear();
}
} catch (Exception e) {
if (!ringThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
}
}
// 休眠,休眠时间对齐到秒,意思就是比如当前时间是400ms,
// 然后通过1000 - System.currentTimeMillis()%1000计算后算出来是600,所以就休眠
// 600ms,主要是为了实现时间轮1s前进一次的效果。
try {
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);
} catch (InterruptedException e) {
if (!ringThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
}
});
xxl-job中的时间轮算法
可以看到xxl-job中的时间轮算法也是比较有特点的,每次轮询,它的生产数据端scheduleThread去数据库中读取未来5s内的任务列表,然后把它映射到对应的时间轮中,供后续的ringThread去消费。这个未来5s内应该是xxl-job开发者是有自己的考量,最终定的一个值。
但是注意读取未来的时间间隔不能超过60s,如果超过60s,就会出问题了,可能会存在提前60s去执行某个任务,导致定时任务提前执行了,这是因为时间轮的刻度是60s。
xxl-job的这个时间轮算法最终是可以支持任意的定时时间。
总结
时间轮算法可以说是时间定时任务的最佳解决方案了,它也是在很多开源项目中有应用,比如Netty、Akka、Quartz、ZooKeeper 、Kafka等组件中都存在时间轮的踪影。
本文暂时没有评论,来添加一个吧(●'◡'●)