探尋使用者自定義定時任務的實踐方案
導讀
工作中會遇到一些由使用者自定義定時任務的業務場景,常用的開源框架(如 XXL-Job、Quartz)設計的初衷是給開發人員使用,並不適合開放給使用者建立大量的自定義任務。本文借鑑開源框架定時任務作業的思想,結合 j.u.c 的 ScheduledExecutor,提供一種定時任務的實現方法,以解決使用者自定義定時任務場景的問題。希望對大家有所幫助。
作者:楊凱 | 網易智企資深開發工程師
使用者自定義定時任務
談到定時任務的實現,我們優先想到的是引入優秀的開源框架方案去解決,常見的開源產品上文也提到過,如Quartz、XXL-Job、ElasticJob 等,但是開源框架應用到使用者自定義任務上,存在以下需要問題或不足:
- 開源框架從任務建立到執行有一套標準方案,使用者自定義任務在何時,何地插入符合開源框架標準任務並能控制生效、停止是一個需要考慮的複雜問題。
- 開源框架(如 XXL-Job)對任務的管理和業務容器是解耦的,如果使用者要完成任務的建立、修改需要業務服務反向呼叫操作任務中心,這不符合任務中心設計原則。
- 開源框架設計的初衷是給程式開發者建立和控制任務。一般情況下,任務執行的策略、目的都比較明確,不像使用者自定義任務存在頻繁修改和相同業務背景多個任務定義使用同一個處理邏輯。
- 開源框架未提供使用者友好的任務配置介面。
設計使用者自定義任務元件除了要考慮上面的問題,還需要站在使用者角度思考使用者自定義任務的特點:
- 開始和結束可控
使用者自定義定時任務業務依賴性強,可以多次建立和更新任務,但不會執行,也會在任務執行期間人為停止。所以任務元件要將業務任務建立和作業任務的建立區分,只建立、載入使用者確定執行的任務。
- 執行策略和執行時間對使用者友好
程式開發者建立定時任務,執行策略(單個任務迴圈、單次)和執行時間是由配置的 Cron 表示式確定,但是 Cron 表示式對使用者不友好,容易配置出錯。使用者自定義定時任務在設定定時策略和執行時間時,需要提供使用者友好的配置介面,任務元件內部轉換成對應的 Cron 表示式。
- 執行時間範圍可控
完成一、二步的配置後,需要給使用者提供一個任務執行的時間範圍,在這個時間範圍內才會執行任務。 簡單的使用者自定義定時任務的介面如下:
清楚了使用者自定義定時任務的特點,定義任務模型 TaskScheduleDefine 為:
屬性 | 註釋 |
---|---|
id | 任務的唯一標識 |
busId | 業務維度的 ID:可以根據業務背景決定是唯一還是指定 |
taskName | 任務名稱 |
beanName | 任務處理類例項名稱 |
cron | cron 表示式 |
startTime | 使用者定義的開始時間 |
endTime | 使用者定義的結束時間 |
isPermanent | 是否永久任務 |
multiple | 是否允許同一時間任務任務並行執行 |
once | 是否單次任務 |
valid | 任務是否有效 |
定時任務執行週期
定時任務從建立到執行可分為如下階段:
- 建立:介面化的配置(如 XXL-Job),程式碼配置(如 Quartz,spring-schedule)。
- 載入:任務載入到應用快取,可以在建立時進行,但實際上任務建立和載入任務是分開的,比如當任務被修改時,實際上是有一個更新的過程的,可以把這種更新叫做任務的過載。
- 排程:判斷被載入的任務是否滿足執行條件(如果支援分散式排程要決定那一臺伺服器去執行),如果滿足,開始執行。
- 執行:開源框架都會完成上面的三個步驟(排程中心或應用本身),業務開發者只用關注業務邏輯部分,做到任務排程和業務執行解耦。
本文介紹的任務元件也是基於這個思想去實現使用者自定義任務。
使用者自定義任務設計
應用啟動時,初始化任務載入執行緒和任務排程執行緒(類似於 XXL-Job 的 scheduleThread 和 ringThread)
//上傳+載入,支援本地和資料庫任務
uploadAndLoadDefinition();
//初始化排程, 排程由維護任務來處理,由排程任務來喚起相應的具體執行
internalScheduledExecutor.scheduleAtFixedRate(new SpringTaskMonitor(), 10, 45, TimeUnit.SECONDS);
//定義維護
internalScheduledExecutor.scheduleAtFixedRate(new SpringTaskDefinitionMonitor(), 1, 2, TimeUnit.MINUTES);
任務建立
將業務任務執行和停止與作業任務建立和失效關聯,達到使用者自定義定時任務的初衷,作業任務完全由使用者決定。
任務載入
任務載入使用 j.u.c 提供的定時任務執行緒池 ScheduledThreadPoolExecutor 的 scheduleAtFixedRate 方法,週期性的觸發任務的載入,保證快取中任務的及時更新。不同的是使用者自定義任務一般都是提前建立好的,不需要不間斷的去查詢,而且可以通過開始和結束時間雙重保證任務正確觸發。
註冊任務部分邏輯:
//獲取全部任務列表defineList更新任務
defineList.forEach(t -> {
String key = t.getBeanName() + t.getBusId();
val task = TaskDefinitions.registered(key);
//沒有(並且有效),就新增
if (task == null) {
if (t.getValid()) {
TaskDefinitions.registerTask(new ScheduleTask(t));
changedList.add(t);
}
}
//有,就替換定義
else {
boolean changed = task.updateDefine(t);
if (changed) {
changedList.add(t);
}
}
});
//列印變化的任務日誌
}
//ScheduleTask任務定義,updateDefine這個物件的屬性
public class ScheduleTask {
private long id;
private TaskScheduleDefine localScheduleDefine;
private CronSequenceGenerator cronGenerator;
public ScheduleTask(TaskScheduleDefine taskScheduleDefine) {
this.id = taskScheduleDefine.getId();
this.localScheduleDefine = taskScheduleDefine;
}
}
任務排程
排程任務的部分邏輯:
public class SpringTaskMonitor implements Runnable {
private static Date DATE_INIT = new Date();
@Override
public void run() {
ExceptionUtils.doActionLogE(this::doRun);
}
private void doRun() throws Throwable {
val taskScheduleDefineMapper = ApplicationContextUtils.getReadyApplicationContext().getBean(TaskScheduleDefineMapper.class);
val taskScheduleRecordMapper = ApplicationContextUtils.getReadyApplicationContext().getBean(TaskScheduleRecordMapper.class);
TaskDefinitions.getTaskMap().values().forEach(t -> {
//1.無效任務
if (!t.getLocalScheduleDefine().getValid()) {
return;
}
//2.設定了過期時間
Date now = new Date();
if (!t.getLocalScheduleDefine().getIsPermanent()) {
Date endTime = t.getLocalScheduleDefine().getEndTime();
if (null == endTime || endTime.before(now)) {
TaskDefinitions.getTaskMap().remove(t.getLocalScheduleDefine().getBeanName() + t.getLocalScheduleDefine().getBusId());
taskScheduleDefineMapper.updateTaskValid(t.getLocalScheduleDefine().getId(), false);
return;
}
}
val lastRecord = taskScheduleRecordMapper.getLast(t.getLocalScheduleDefine().getId());
Date date = lastRecord == null ? DATE_INIT : lastRecord.getExecuteDate();
boolean shouldRun = false;
Date nextDate = t.cronGenerator().next(date);
//首次執行且執行時間未到重置開始時間
if (null != t.getLocalScheduleDefine().getStartTime() && nextDate.before(t.getLocalScheduleDefine().getStartTime())) {
DATE_INIT = new Date();
log.warn("任務執行時間未到設定的開始時間,重新設定系統時間{},本次任務忽略:{}", DateUtil.formatDate(DATE_INIT, "yyyy-MM-dd HH:mm:ss"), GsonUtil.toJson(t));
return;
}
if (DateUtils.addSeconds(nextDate, 30).before(now)) {
shouldRun = true;
}
if (shouldRun) {
TaskWork localWork = (TaskWork) ApplicationContextUtils.getReadyApplicationContext().getBean(t.getLocalScheduleDefine().getBeanName());
SpringTaskExecutor.getExecutorService().submit(() -> localWork.runJob(t));
}
});
}
}
上述流程較清晰的還原了任務排程的一些主要邏輯。從任務排程的部分程式碼中可以看出,整個排程過程異常被捕獲,出現異常不會影響下一次的排程執行,任務的 misfire 問題處理策略是:
- 任務過了使用者的設定時間不執行
- 任務未到使用者的設定時間不執行
- 任務首次執行出了異常(以資料庫執行記錄為準),以當前時間為觸發頻率立刻觸發一次執行,然後按照 Cron 頻率依次執行(類似類似於 Quartz 的預設 withMisfireHandlingInstructionFireAndProceed 模式)
- 定時任務已有執行記錄,以錯過的第一個頻率時間立刻開始執行,重做錯過的所有頻率週期後,重當下一次觸發頻率發生時間大於當前時間後,再按照正常的 Cron 頻率依次執行(類似於 Quartz的withMisfireHandlingInstructionIgnoreMisfires 模式)
另外,需要考慮的是在同一個業務場景下,使用者會建立多個任務定義,但它們執行的業務邏輯是一樣的(執行策略,執行時間等不一樣)。
任務執行
任務排程提交的任務給執行緒池處理,執行前後根據任務定義對任務做一些通用處理(黃色框部分),具體的執行業務邏輯交給介面 LocalWork 實現類的 execute() 方法處理。
/**
* description: 輔助來完成預設的localWork方法
*/
public class TaskWorkUtils {
static void helpRun(TaskWork localWork, ScheduleTask scheduleTask) {
//部分虛擬碼如下
}
}
//是否任務有執行過
boolean executed = false;
TaskScheduleRecord record = null;
Date executeDate = new Date();
try {
//根據需要決定是否獲取鎖後執行(redisLock,zkLock,dbLock都可以,保證任務唯一執行)
String lockName = localWork.getClass().getSimpleName() + scheduleTask.getLocalScheduleDefine().getBusId();
//獲取不到鎖return
//獲取到執行下面邏輯
record = ExceptionUtils.doFunLogE(() -> {
TaskScheduleRecord newRecord = buildRecord(scheduleTask, executeDate);
newRecord.setId(taskRecordService.save(newRecord));
return newRecord;
});
//如果不能儲存成功,表示出現了資料庫異常,相應狀態不能存取,則直接返回,不再執行
if (record == null) {
return;
}
executed = true;
localWork.execute(record);
} catch (Throwable throwable) {
log.error("執行任務時出現異常資訊:{}", throwable.getMessage(), throwable);
e = throwable;
} finally {
//釋放鎖:releaseLock()
//記錄異常日誌,更新任務狀態和失敗原因
if (record != null) {
}
}
if (!scheduleTask.getLocalScheduleDefine().getOnce()&&executed) {
Date next = scheduleTask.cronGenerator().next(executeDate);
long delay = next.getTime() - executeDate.getTime();
SpringTaskExecutor.getExecutorService().schedule(() -> localWork.runJob(scheduleTask), delay, TimeUnit.MILLISECONDS);
}
}
如果要保證任務在叢集中保證唯一執行可通過分散式鎖實現,具體的key已給參考,因為沒有提供叢集節點註冊的功能,負載均衡的排程只能依賴叢集中節點獲取鎖的隨機性,即那個節點獲取到鎖,任務在哪個節點執行。
當任務執行出錯時(儲存完執行記錄後),不影響下一次任務的執行,但會更新此次任務執行的結果和失敗原因。
任務設計小結
應用啟動時,初始化任務,開啟任務載入執行緒,開啟任務排程執行緒。任務載入執行緒週期性的從 DB 中獲取全部任務,並更新快取中任務例項;排程執行緒負責對任務定義例項進行一系列的判斷,決定是否交給執行執行緒池去執行,任務載入和呼叫可以使用一個定時執行緒池。
private ScheduledExecutorService internalScheduledExecutor = new ScheduledThreadPoolExecutor(2,
new ThreadFactoryBuilder().setNameFormat("task-internal-%d").build());
執行任務的執行緒池接收到提交的任務,執行前後做統一處理,任務執行的具體業務邏輯交給具體的實現類去做。整個處理流程中,需要兩張表(任務定義表+任務執行記錄表),2 個定時執行緒池可完成。
總結
本文基於使用者自定義定時任務的特點,從任務建立、任務載入、任務排程、任務執行四個方面詳細的介紹了任務執行的過程,對定時任務中常見的問題和處理過程附帶了部分程式碼供參考,在支援一般定時任務的同時給大家提供了一種使用者自定義定時任務的實踐方法。
- 資訊|WebRTC M93 更新
- 技術實踐|網易雲信 IM SDK 服務高可用技術方案
- 久等了!【Innovation 2021】網易應用創新開發者大賽正式開賽!
- 用程式碼,打造創意新世界!【Innovation 2021】網易應用創新開發者大賽正式開賽!
- Web端實現RTC視訊特效的解決方案
- 從0搭建線上聊天室,只需4步!
- JavaScript 之事件迴圈(Event Loop)
- C 20 四大特性之一:Module 特性詳解
- Android Flutter 多例項實踐
- 技術實踐 | 網易雲信視訊轉碼提速之分片轉碼
- 網易雲信線上萬人連麥技術大揭祕
- 視訊 QoE 的平衡之道—揭祕網易雲信 NERTC 視訊質量控制系統
- 使用 VideoToolbox 探索低延遲視訊編碼 | WWDC 演講實錄
- 技術實踐 | 如何基於 Flink 實現通用的聚合指標計算框架
- 網易雲信大規模聊天室系統架構解析
- 探尋使用者自定義定時任務的實踐方案
- 如何在 Electron 上實現 IM SDK 聊天訊息全文檢索
- 架構解析|網易自研新一代大規模分散式傳輸網
- 直播點播窄帶高清之 JND 感知編碼技術
- Flutter 混合開發基礎