[Abp 源码分析]后台作业与后台工作者
點擊上方藍字關注我們
0. 簡介
在某些時候我們可能會需要執行后臺任務,或者是執行一些周期性的任務。比如說可能每隔 1 個小時要清除某個臨時文件夾內的數據,可能用戶會要針對某一個用戶群來群發一組短信。前面這些就是典型的應用場景,在 Abp 框架里面為我們準備了后臺作業和后臺工作者來幫助我們解決這個問題。
后臺作業與后臺工作者的區別是,前者主要用于某些耗時較長的任務,而不想阻塞用戶的時候所使用。后者主要用于周期性的執行某些任務,從 “工作者” 的名字可以看出來,就是一個個工人,而且他們每個工人都擁有單獨的后臺線程。
0.1 典型場景
后臺作業
某個用戶按下了報表按鈕來生成一個需要長時間等待的報表。你添加這個工作到隊列中,當報表生成完畢后,發送報表結果到該用戶的郵箱。
在后臺作業中發送一封郵件,有些問題可能會導致發送失敗(網絡連接異常,或者主機宕機);由于有后臺作業以及持久化機制,在問題排除后,可以重試以保證任務的成功執行。
后臺工作者
后臺工作者能夠周期性地執行舊日志的刪除。
后臺工作者可以周期性地篩選出非活躍性用戶,并且發送回歸郵件給這些用戶。
1. 啟動流程
后臺作業與后臺工作者都是通過各自的 Manager(IBackgroundJobManager/IBackgroundWorkerManager) 來進行管理的。而這兩個 Manager 分別繼承了?ISingletonDependency?接口,所以在啟動的時候就會自動注入這兩個管理器以便開發人員管理操作。
這里值得注意的一點是,IBackgroundJobManager?接口是?IBackgroundWorker?的派生接口,而?IBackgroudWorker?是歸屬于?IBackgroundWorkerManager?進行管理的。
所以,你可以在?AbpKernelModule?里面看到如下代碼:
public sealed class AbpKernelModule : AbpModule {public override void PostInitialize(){// 注冊可能缺少的組件RegisterMissingComponents();// ... 忽略的代碼// 各種管理器的初始化操作// 從配置項中讀取,是否啟用了后臺作業功能if (Configuration.BackgroundJobs.IsJobExecutionEnabled){var workerManager = IocManager.Resolve<IBackgroundWorkerManager>();// 開始啟動后臺工作者workerManager.Start();// 增加后臺作業管理器workerManager.Add(IocManager.Resolve<IBackgroundJobManager>());}} }可以看到,后臺作業管理器是作為一個后臺工作者被添加到了?IBackgroundWorkerManager?當中來執行的。
2. 代碼分析
2.1 后臺工作者
2.1.1 后臺工作者管理器
Abp 通過后臺工作者管理器來管理后臺作業隊列,所以我們首先來看一下后臺工作者管理器接口的定義是什么樣子的。
public interface IBackgroundWorkerManager : IRunnable {void Add(IBackgroundWorker worker); }還是相當簡潔的,就一個?Add?方法用來添加一個新的后臺工作者對象。只是在這個地方,可以看到該接口又是集成自?IRunnable?接口,那么該接口的作用又是什么呢?
轉到其定義可以看到,IRunable?接口定義了三個基本的方法:Start()、Stop()、WaitStop()?,而且他擁有一個默認實現?RunableBase,其實就是用來標識一個任務的運行狀態。
public interface IRunnable {// 開始執行任務void Start();// 停止執行任務void Stop();// 阻塞線程,等待任務執行完成后標識為停止。void WaitToStop(); }public abstract class RunnableBase : IRunnable {// 用于標識任務是否運行的布爾值變量public bool IsRunning { get { return _isRunning; } }private volatile bool _isRunning;// 啟動之后表示任務正在運行public virtual void Start(){_isRunning = true;}// 停止之后表示任務結束運行public virtual void Stop(){_isRunning = false;}public virtual void WaitToStop(){} }到目前為止整個代碼都還是比較簡單清晰的,我們接著看?IBackgroundWorkerManager?的默認實現?BackgroundWorkerManager?類,首先我們看一下該類擁有哪些屬性與字段。
public class BackgroundWorkerManager : RunnableBase, IBackgroundWorkerManager, ISingletonDependency, IDisposable {private readonly IIocResolver _iocResolver;private readonly List<IBackgroundWorker> _backgroundJobs;public BackgroundWorkerManager(IIocResolver iocResolver){_iocResolver = iocResolver;_backgroundJobs = new List<IBackgroundWorker>();} }在后臺工作者管理器類的內部,默認有一個 List 集合,用于維護所有的后臺工作者對象。那么其他的?Start()?等方法肯定是基于這個集合進行操作的。
public override void Start() {base.Start();_backgroundJobs.ForEach(job => job.Start()); }public override void Stop() {_backgroundJobs.ForEach(job => job.Stop());base.Stop(); }public override void WaitToStop() {_backgroundJobs.ForEach(job => job.WaitToStop());base.WaitToStop(); }可以看到實現還是比較簡單的,接下來我們繼續看他的?Add()?方法是如何進行操作的?
public void Add(IBackgroundWorker worker) {_backgroundJobs.Add(worker);if (IsRunning){worker.Start();} }在這里我們看到他會針對?IsRunning?進行判定是否立即啟動加入的后臺工作者對象。而這個?IsRunning?屬性值唯一產生變化的情況就在于?Start()?方法與?Stop()?方法的調用。
最后肯定也有相關的銷毀方法,用于釋放所有注入的后臺工作者對象,并將集合清除。
private bool _isDisposed;public void Dispose() {if (_isDisposed){return;}_isDisposed = true;// 遍歷集合,通過 Ioc 解析器的 Release 方法釋放對象_backgroundJobs.ForEach(_iocResolver.Release);// 清空集合_backgroundJobs.Clear(); }所以,針對于所有后臺工作者的管理,都是通過?IBackgroundWorkerManager?來進行操作的。
2.1.2 后臺工作者
看完了管理器,我們來看一下?IBackgroundWorker?后臺工作者對象是怎樣的構成。
public interface IBackgroundWorker : IRunnable {}貌似只是一個空的接口,其作用主要是標識某個類型是否為后臺工作者,轉到其抽象類實現?BackgroundWorkerBase,里面只是注入了一些輔助對象與本地化的一些方法。
public abstract class BackgroundWorkerBase : RunnableBase, IBackgroundWorker {// 配置管理器public ISettingManager SettingManager { protected get; set; }// 工作單元管理器public IUnitOfWorkManager UnitOfWorkManager{get{if (_unitOfWorkManager == null){throw new AbpException("Must set UnitOfWorkManager before use it.");}return _unitOfWorkManager;}set { _unitOfWorkManager = value; }}private IUnitOfWorkManager _unitOfWorkManager;// 獲得當前的工作單元protected IActiveUnitOfWork CurrentUnitOfWork { get { return UnitOfWorkManager.Current; } }// 本地化資源管理器public ILocalizationManager LocalizationManager { protected get; set; }// 默認的本地化資源的源名稱protected string LocalizationSourceName { get; set; }protected ILocalizationSource LocalizationSource{get{// 如果沒有配置源名稱,直接拋出異常if (LocalizationSourceName == null){throw new AbpException("Must set LocalizationSourceName before, in order to get LocalizationSource");}if (_localizationSource == null || _localizationSource.Name != LocalizationSourceName){_localizationSource = LocalizationManager.GetSource(LocalizationSourceName);}return _localizationSource;}}private ILocalizationSource _localizationSource;// 日志記錄器public ILogger Logger { protected get; set; }protected BackgroundWorkerBase(){Logger = NullLogger.Instance;LocalizationManager = NullLocalizationManager.Instance;}// ... 其他模板代碼 }我們接著看繼承并實現了?BackgroundWorkerBase?的類型?PeriodicBackgroundWorkerBase,從字面意思上來看,該類型應該是一個定時后臺工作者基類。
重點在于?Periodic(定時),從其類型內部的定義可以看到,該類型使用了一個?AbpTimer?對象來進行周期計時與具體工作任務的觸發。我們暫時先不看這個?AbpTimer,僅僅看?PeriodicBackgroundWorkerBase?的內部實現。
public abstract class PeriodicBackgroundWorkerBase : BackgroundWorkerBase {protected readonly AbpTimer Timer;// 注入 AbpTimerprotected PeriodicBackgroundWorkerBase(AbpTimer timer){Timer = timer;// 綁定周期執行的任務,這里是 DoWork()Timer.Elapsed += Timer_Elapsed;}public override void Start(){base.Start();Timer.Start();}public override void Stop(){Timer.Stop();base.Stop();}public override void WaitToStop(){Timer.WaitToStop();base.WaitToStop();}private void Timer_Elapsed(object sender, System.EventArgs e){try{DoWork();}catch (Exception ex){Logger.Warn(ex.ToString(), ex);}}protected abstract void DoWork(); }可以看到,這里基類綁定了?DoWork()?作為其定時執行的方法,那么用戶在使用的時候直接繼承自該基類,然后重寫?DoWork()?方法即可綁定自己的后臺工作者的任務。
2.1.3 AbpTimer 定時器
在上面的基類我們看到,基類的?Start()、Stop()、WaitTpStop()?方法都是調用的?AbpTimer?所提供的,所以說?AbpTimer?其實也繼承了?RunableBase?基類并實現其具體的啟動與停止操作。
其實?AbpTimer?的核心就是通過 CLR 的?Timer?來實現周期性任務執行的,不過默認的?Timer?類有兩個比較大的問題。
CLR 的?Timer?并不會等待你的任務執行完再執行下一個周期的任務,如果你的某個任務耗時過長,超過了?Timer?定義的周期。那么?Timer?會開啟一個新的線程執行,這樣的話最后我們系統的資源會因為線程大量重復創建而被拖垮。
如何知道一個?Timer?所執行的業務方法已經真正地被結束了。
所以 Abp 才會重新封裝一個?AbpTimer?作為一個基礎的計時器。第一個問題的解決方法很簡單,就是在執行具體綁定的業務方法之前,通過?Timer.Change()?方法來讓?Timer?臨時失效。等待業務方法執行完成之后,再將?Timer?的周期置為用戶設定的周期。
// CLR Timer 綁定的回調方法 private void TimerCallBack(object state) {lock (_taskTimer){if (!_running || _performingTasks){return;}// 暫時讓 Timer 失效_taskTimer.Change(Timeout.Infinite, Timeout.Infinite);// 設置執行標識為 TRUE,表示當前的 AbpTimer 正在執行_performingTasks = true;}try{// 如果綁定了相應的觸發事件if (Elapsed != null){// 執行相應的業務方法,這里就是最開始綁定的 DoWork() 方法Elapsed(this, new EventArgs());}}catch{}finally{lock (_taskTimer){// 標識業務方法執行完成_performingTasks = false;if (_running){// 更改周期為用戶指定的執行周期,等待下一次觸發_taskTimer.Change(Period, Timeout.Infinite);}Monitor.Pulse(_taskTimer);}} }針對于第二個問題,Abp 通過?WaitToStop()?方法會阻塞調用這個?Timer?的線程,并且在?_performingTasks?標識位是?false?的時候釋放。
public override void WaitToStop() {// 鎖定 CLR 的 Timer 對象lock (_taskTimer){// 循環檢測while (_performingTasks){Monitor.Wait(_taskTimer);}}base.WaitToStop(); }至于其他的?Start()?方法就是使用 CLR 的?Timer?更改其執行周期,而?Stop()?就是直接將?Timer?的周期設置為無限大,使計時器失效。
2.1.4 總結
Abp 后臺工作者的核心就是通過?AbpTimer?來實現周期性任務的執行,用戶只需要繼承自?PeriodicBackgroundWorkerBase,然后將其添加到?IBackgroundWorkerManager?的集合當中。這樣 Abp 在啟動之后就會遍歷這個工作者集合,然后周期執行這些后臺工作者綁定的方法。
當然如果你繼承了?PeriodicBackgroundWorkerBase?之后,可以通過設置構造函數的?AbpTimer?來指定自己的執行周期。
2.2 后臺作業隊列
后臺工作隊列的管理是通過?IBackgroundJobManager?來處理的,而該接口又繼承自?IBackgroundWorker,所以一整個后臺作業隊列就是一個后臺工作者,只不過這個工作者有點特殊。
2.2.1 后臺作業管理器
IBackgroundJobManager?接口的定義其實就兩個方法,一個?EnqueueAsync<TJob, TArgs>()?用于將一個后臺作業加入到執行隊列當中。而?DeleteAsync()?方法呢,顧名思義就是從隊列當中移除指定的后臺作業。
首先看一下其默認實現?BackgroundJobManager,該實現同樣是繼承自?PeriodicBackgroundWorkerBase?并且其默認周期為 5000 ms。
public class BackgroundJobManager : PeriodicBackgroundWorkerBase, IBackgroundJobManager, ISingletonDependency {// 事件總線public IEventBus EventBus { get; set; }// 輪訓后臺作業的間隔,默認值為 5000 毫秒.public static int JobPollPeriod { get; set; }// IOC 解析器private readonly IIocResolver _iocResolver;// 后臺作業隊列存儲private readonly IBackgroundJobStore _store;static BackgroundJobManager(){JobPollPeriod = 5000;}public BackgroundJobManager(IIocResolver iocResolver,IBackgroundJobStore store,AbpTimer timer): base(timer){_store = store;_iocResolver = iocResolver;EventBus = NullEventBus.Instance;Timer.Period = JobPollPeriod;} }基礎結構基本上就這個樣子,接下來看一下他的兩個接口方法是如何實現的。
EnqueueAsync<TJob, TArgs>?方法通過傳入指定的后臺作業對象和相應的參數,同時還有任務的優先級。將其通過?IBackgroundJobStore?進行持久化,并返回一個任務的唯一 JobId 以便進行刪除操作。
public async Task<string> EnqueueAsync<TJob, TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null)where TJob : IBackgroundJob<TArgs> {// 通過 JobInfo 包裝任務的基本信息var jobInfo = new BackgroundJobInfo{JobType = typeof(TJob).AssemblyQualifiedName,JobArgs = args.ToJsonString(),Priority = priority};// 如果需要延時執行的話,則用當前時間加上延時的時間作為任務下次運行的時間if (delay.HasValue){jobInfo.NextTryTime = Clock.Now.Add(delay.Value);}// 通過 Store 進行持久話存儲await _store.InsertAsync(jobInfo);// 返回后臺任務的唯一標識return jobInfo.Id.ToString(); }至于刪除操作,在 Manager 內部其實也是通過?IBackgroundJobStore?進行實際的刪除操作的。
public async Task<bool> DeleteAsync(string jobId) {// 判斷 jobId 的值是否有效if (long.TryParse(jobId, out long finalJobId) == false){throw new ArgumentException($"The jobId '{jobId}' should be a number.", nameof(jobId));}// 使用 jobId 從 Store 處篩選到 JobInfo 對象的信息BackgroundJobInfo jobInfo = await _store.GetAsync(finalJobId);if (jobInfo == null){return false;}// 如果存在有 JobInfo 則使用 Store 進行刪除操作await _store.DeleteAsync(jobInfo);return true; }后臺作業管理器實質上是一個周期性執行的后臺工作者,那么我們的后臺作業是每 5000 ms 執行一次,那么他的?DoWork()?方法又在執行什么操作呢?
protected override void DoWork() {// 從 Store 當中獲得等待執行的后臺作業集合var waitingJobs = AsyncHelper.RunSync(() => _store.GetWaitingJobsAsync(1000));// 遍歷這些等待執行的后臺任務,然后通過 TryProcessJob 進行執行foreach (var job in waitingJobs){TryProcessJob(job);} }可以看到每 5 秒鐘我們的后臺作業管理器就會從?IBackgroundJobStore?當中拿到最大 1000 條的后臺作業信息,然后遍歷這些信息。通過?TryProcessJob(job)?方法來執行后臺作業。
而?TryProcessJob()?方法,本質上就是通過反射構建出一個?IBackgroundJob?對象,然后取得序列化的參數值,通過反射得到的?MethodInfo?對象來執行我們的后臺任務。執行完成之后,就會從 Store 當中移除掉執行完成的任務。
針對于在執行過程當中所出現的異常,會通過?IEventBus?觸發一個?AbpHandledExceptionData?事件記錄后臺作業執行失敗時的異常信息。并且一旦在執行過程當中出現了任何異常的情況,都會將該任務的?IsAbandoned?字段置為?true,當該字段為?true?時,該任務將不再回被執行。
PS:就是在?GetWaitingJobsAsync()?方法時,會過濾掉 IsAbandoned 值為?true?的任務。
private void TryProcessJob(BackgroundJobInfo jobInfo) {try{// 任務執行次數自增 1jobInfo.TryCount++;// 最后一次執行時間設置為當前時間jobInfo.LastTryTime = Clock.Now;// 通過反射取得后臺作業的類型var jobType = Type.GetType(jobInfo.JobType);// 通過 Ioc 解析器得到一個臨時的后臺作業對象,執行完之后既被釋放using (var job = _iocResolver.ResolveAsDisposable(jobType)){try{// 通過反射得到后臺作業的 Execute 方法var jobExecuteMethod = job.Object.GetType().GetTypeInfo().GetMethod("Execute");var argsType = jobExecuteMethod.GetParameters()[0].ParameterType;var argsObj = JsonConvert.DeserializeObject(jobInfo.JobArgs, argsType);// 結合持久話存儲的參數信息,調用 Execute 方法進行后臺作業jobExecuteMethod.Invoke(job.Object, new[] { argsObj });// 執行完成之后從 Store 刪除該任務的信息AsyncHelper.RunSync(() => _store.DeleteAsync(jobInfo));}catch (Exception ex){Logger.Warn(ex.Message, ex);// 計算下一次執行的時間,一旦超過 2 天該任務都執行失敗,則返回 nullvar nextTryTime = jobInfo.CalculateNextTryTime();if (nextTryTime.HasValue){jobInfo.NextTryTime = nextTryTime.Value;}else{// 如果為 null 則說明該任務在 2 天的時間內都沒有執行成功,則放棄繼續執行jobInfo.IsAbandoned = true;}// 更新 Store 存儲的任務信息TryUpdate(jobInfo);// 觸發異常事件EventBus.Trigger(this,new AbpHandledExceptionData(new BackgroundJobException("A background job execution is failed. See inner exception for details. See BackgroundJob property to get information on the background job.", ex){BackgroundJob = jobInfo,JobObject = job.Object}));}}}catch (Exception ex){Logger.Warn(ex.ToString(), ex);// 表示任務不再執行jobInfo.IsAbandoned = true;// 更新 StoreTryUpdate(jobInfo);} }2.2.2 后臺作業
后臺作業的默認接口定義為?IBackgroundJob<in TArgs>?,他只有一個?Execute(TArgs args)?方法,用于接收指定類型的作業參數,并執行。
一般來說我們不建議直接通過繼承?IBackgroundJob<in TArgs>?來實現后臺作業,而是繼承自?BackgroundJob<TArgs>?抽象類。該抽象類內部也沒有什么特別的實現,主要是注入了一些基礎設施,比如說 UOW 與 本地化資源管理器,方便我們開發使用。
后臺作業本身是具體執行的對象,而?BackgroundJobInfo?則是存儲了后臺作業的 Type 類型和參數,方便在需要執行的時候通過反射的方式執行后臺作業。
2.2.2 后臺作業隊列存儲
從?IBackgroundJobStore?我們就可以猜到以 Abp 框架的套路,他肯定會有兩種實現,第一種就是基于內存的?InMemoryBackgroundJobStore。而第二種呢,就是由 Abp.Zero 模塊所提供的基于數據庫的?BackgroundJobStore。
IBackgroundJobStore?接口所定義的方法基本上就是增刪改查,沒有什么復雜的。
public interface IBackgroundJobStore {// 通過 JobId 獲取后臺任務信息Task<BackgroundJobInfo> GetAsync(long jobId);// 插入一個新的后臺任務信息Task InsertAsync(BackgroundJobInfo jobInfo);/// <summary>/// Gets waiting jobs. It should get jobs based on these:/// Conditions: !IsAbandoned And NextTryTime <= Clock.Now./// Order by: Priority DESC, TryCount ASC, NextTryTime ASC./// Maximum result: <paramref name="maxResultCount"/>./// </summary>/// <param name="maxResultCount">Maximum result count.</param>Task<List<BackgroundJobInfo>> GetWaitingJobsAsync(int maxResultCount);/// <summary>/// Deletes a job./// </summary>/// <param name="jobInfo">Job information.</param>Task DeleteAsync(BackgroundJobInfo jobInfo);/// <summary>/// Updates a job./// </summary>/// <param name="jobInfo">Job information.</param>Task UpdateAsync(BackgroundJobInfo jobInfo); }這里先從簡單的內存 Store 說起,這個?InMemoryBackgroundJobStore?內部使用了一個并行字典來存儲這些任務信息。
public class InMemoryBackgroundJobStore : IBackgroundJobStore {private readonly ConcurrentDictionary<long, BackgroundJobInfo> _jobs;private long _lastId;public InMemoryBackgroundJobStore(){_jobs = new ConcurrentDictionary<long, BackgroundJobInfo>();} }相當簡單,這幾個接口方法基本上就是針對與這個并行字典操作的一層封裝。
public Task<BackgroundJobInfo> GetAsync(long jobId) {return Task.FromResult(_jobs[jobId]); }public Task InsertAsync(BackgroundJobInfo jobInfo) {jobInfo.Id = Interlocked.Increment(ref _lastId);_jobs[jobInfo.Id] = jobInfo;return Task.FromResult(0); }public Task<List<BackgroundJobInfo>> GetWaitingJobsAsync(int maxResultCount) {var waitingJobs = _jobs.Values// 首先篩選出不再執行的后臺任務.Where(t => !t.IsAbandoned && t.NextTryTime <= Clock.Now)// 第一次根據后臺作業的優先級進行排序,高優先級優先執行.OrderByDescending(t => t.Priority)// 再根據執行次數排序,執行次數越少的,越靠前.ThenBy(t => t.TryCount).ThenBy(t => t.NextTryTime).Take(maxResultCount).ToList();return Task.FromResult(waitingJobs); }public Task DeleteAsync(BackgroundJobInfo jobInfo) {_jobs.TryRemove(jobInfo.Id, out _);return Task.FromResult(0); }public Task UpdateAsync(BackgroundJobInfo jobInfo) {// 如果是不再執行的任務,刪除if (jobInfo.IsAbandoned){return DeleteAsync(jobInfo);}return Task.FromResult(0); }至于持久化到數據庫,無非是注入一個倉儲,然后針對這個倉儲進行增刪查改的操作罷了,這里就不在贅述。
2.2.3 后臺作業優先級
后臺作業的優先級定義在?BackgroundJobPriority?枚舉當中,一共有 5 個等級,分別是?Low、BelowNormal、Normal、AboveNormal、High?,他們從最低到最高排列。
作者:myzony
出處:https://www.cnblogs.com/myzony/p/9841601.html
掃描二維碼
獲取更多精彩
碼俠江湖
喜歡就點個在看再走吧
總結
以上是生活随笔為你收集整理的[Abp 源码分析]后台作业与后台工作者的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: [Abp 源码分析]ASP.NET Co
- 下一篇: Dapr 已在塔架就位 将发射新一代微服