javascript
Spring batch Job define
?http://www.ibm.com/developerworks/cn/java/j-lo-springbatch1/
總述
本系列文章旨在通過示例搭建以及特性介紹,詳細講述如何利用 Spring Batch 開發企業批處理應用。本系列文章共分為三部分,第一部分初步介紹了批處理以及 Spring Batch 的相關概念,同時搭建了一個簡單的基于 Spring Batch 的批處理應用。第二部分介紹了 Step Flow 以及并發支持。第三部分則主要介紹了 Spring Batch 對任務監控的支持。下面讓我們進入第一部分內容。
什么是批處理
在現代企業應用當中,面對復雜的業務以及海量的數據,除了通過龐雜的人機交互界面進行各種處理外,還有一類工作,不需要人工干預,只需要定期讀入大批量數據,然后完成相應業務處理并進行歸檔。這類工作即為“批處理”。
從上面的描述可以看出,批處理應用有如下幾個特點:
- 數據量大,少則百萬,多則上億的數量級。
- 不需要人工干預,由系統根據配置自動完成。
- 與時間相關,如每天執行一次或每月執行一次。
同時,批處理應用又明顯分為三個環節:
- 讀數據,數據可能來自文件、數據庫或消息隊列等
- 數據處理,如電信支撐系統的計費處理
- 寫數據,將輸出結果寫入文件、數據庫或消息隊列等
因此,從系統架構上,應重點考慮批處理應用的事務粒度、日志監控、執行、資源管理(尤其存在并發的情況下)。從系統設計上,應重點考慮數據讀寫與業務處理的解耦,提高復用性以及可測試性。
什么是 Spring Batch
Spring Batch 作為 Spring 的子項目,是一款基于 Spring 的企業批處理框架。通過它可以構建出健壯的企業批處理應用。Spring Batch 不僅提供了統一的讀寫接口、豐富的任務處理方式、靈活的事務管理及并發處理,同時還支持日志、監控、任務重啟與跳過等特性,大大簡化了批處理應用開發,將開發人員從復雜的任務配置管理過程中解放出來,使他們可以更多地去關注核心的業務處理過程。
另外我們還需要知道,Spring Batch 是一款批處理應用框架,不是調度框架。它只關注批處理任務相關的問題,如事務、并發、監控、執行等,并不提供相應的調度功能。因此,如果我們希望批處理任務定期執行,可結合 Quartz 等成熟的調度框架實現。
下面將通過一個示例詳細介紹如何使用 Spring Batch 搭建批處理應用。這個示例比較簡單,對系統中所有用戶發送一封繳費提醒通知。此處,我們簡單地將繳費提醒輸出到控制臺。當然,隨著介紹的深入,我將逐漸豐富該功能,使其最終完整展示 Spring Batch 的各種特性。
回頁首
環境搭建
首先,從 Spring 官方網站下載 Spring Batch 發布包(見 參考資源)。本文基于 Spring Batch 2.1.6(當前最新版本為 2.1.8)以及 Spring 2.5.6 版本構建。我們可以看到 Spring Batch 共包含 spring-batch-core 和 spring-batch-infrastructure 兩個包。spring-batch-core 主要包含批處理領域相關類,而 spring-batch-infrastructure 提供了一個基礎訪問處理框架。
接下來,讓我們新建一個 Eclipse 工程,并將 Spring Batch 以及 Spring 相關包添加到依賴環境,如 圖 1 所示
圖 1. 依賴環境
環境搭建完成后,讓我們看一下如何一步步構建一個批處理應用。
回頁首
構建應用
如“引言”中所述 Spring Batch 按照關注點的不同,將整個批處理過程分為三部分:讀、處理、寫,從而將批處理應用進行合理解耦。同時,Spring Batch 還針對讀、寫操作提供了多種實現,如消息、文件、數據庫。對于數據庫,還提供了 Hibernate、iBatis、JPA 等常見 ORM 框架的讀、寫接口支持。
對象定義
首先我們需要編寫用戶以及消息類,比較簡單,如清單 1 和 清單 2 所示:
清單 1. User 類
| package org.springframework.batch.sample;public class User {private String name;private Integer age;public String getName() {return name;}public void setName(String name) {this.name = name;}public Integer getAge() {return age;}public void setAge(Integer age) {this.age = age;} } |
清單 2. Message 類
| package org.springframework.batch.sample;public class Message {private String content;public String getContent() {return content;}public void setContent(String content) {this.content = content;} } |
讀寫及處理接口
首先,所有 Spring Batch 的讀操作均需要實現 ItemReader 接口,而且 Spring Batch 為我們提供了多種默認實現,尤其是基于 ORM 框架的讀接口,同時支持基于游標和分頁兩類操作。因此,多數情況下我們并不需要手動編寫 ItemReader 類,而是直接使用相應實現類即可。
在該示例中,我們使用 org.springframework.batch.item.file.FlatFileItemReader 類從文件中進行信息讀入,用戶信息格式定義如 清單 3 所示。
清單 3. 用戶信息
| User1,20 User2,21 User3,22 User4,23 User5,24 User6,25 User7,26 User8,27 User9,28 User10,29 |
該類封裝了文件讀操作,僅僅需要我們手動設置 LineMapper 與訪問文件路徑即可。Spring Batch 通過 LineMapper 可以將文件中的一行映射為一個對象。我們不難發現,Spring Batch 將文件操作封裝為類似 Spring JDBC 風格的接口,這也與 Spring 一貫倡導的接口統一是一致的。此處我們使用 org.springframework.batch.item.file.mapping.DefaultLineMapper 進行行映射。讀操作的配置信息如 清單 4 所示:
清單 4. message_job.xml
| <beans:bean id="messageReader" class="org.springframework.batch.item.file.FlatFileItemReader"><beans:property name="lineMapper" ref="lineMapper"></beans:property><beans:property name="resource" value="classpath:/users.txt"></beans:property> </beans:bean> <beans:bean id="lineMapper"class="org.springframework.batch.item.file.mapping.DefaultLineMapper"><beans:property name="lineTokenizer"><beans:bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"></beans:bean></beans:property><beans:property name="fieldSetMapper"><beans:bean class="org.springframework.batch.sample.UserMapper"></beans:bean></beans:property> </beans:bean> |
從清單我們可以知道,DefaultLineMapper 需要設置 lineTokenizer 和 fieldSetMapper 兩個屬性,首先通過 lineTokenizer 完成文件行拆分,并封裝為一個屬性結果集,因為我們使用“,”分隔用戶屬性,所以需要將 lineTokenizer 設置為 DelimitedLineTokenizer。最后通過 fieldSetMapper 完成將結果集封裝為一個 POJO 對象。具體實現如 清單 5 所示:
清單 5. UserMapper 類
| package org.springframework.batch.sample;import org.springframework.batch.item.file.mapping.FieldSetMapper; import org.springframework.batch.item.file.transform.FieldSet; import org.springframework.validation.BindException;public class UserMapper implements FieldSetMapper<User> {public User mapFieldSet(FieldSet fs) throws BindException {User u = new User();u.setName(fs.readString(0));u.setAge(fs.readInt(1));return u;} } |
該接口的實現方式與 Spring JDBC 的 RowMapper 極其相似。
接下來,再讓我們看一下如何實現寫操作。Spring Batch 所有寫操作均需要實現 ItemWriter 接口。該接口只有一個方法 void write(List<? extends T> items),參數是輸出結果的列表。之所以如此定義,是為了便于我們進行批量操作,以提高性能。每次傳入的列表由事務提交粒度確定,也就是說 Spring Batch 每次將提交的結果集傳入寫操作接口。因為我們要做的僅僅是將繳費通知輸出到控制臺,所以,寫操作實現如 清單 6 所示:
清單 6. MessagesItemWriter 類
| package org.springframework.batch.sample;import java.util.List; import org.springframework.batch.item.ItemWriter;public class MessagesItemWriter implements ItemWriter<Message>{public void write(List<? extends Message> messages) throws Exception {System.out.println("write results");for (Message m : messages) {System.out.println(m.getContent());}} } |
同 ItemReader 一樣,Spring Batch 也為我們提供了多樣的寫操作支持,具體可閱讀 Spring Batch 參考手冊,此處不再贅述。
最后,再看一下如何實現業務處理。Spring Batch 提供了 ItemProcessor 接口用于完成相應業務處理。在本示例中,即為根據用戶信息生成一條繳費通知信息,如 清單 7 所示:
清單 7. MessagesItemProcessor 類
| package org.springframework.batch.sample;import org.springframework.batch.item.ItemProcessor;public class MessagesItemProcessor implements ItemProcessor<User, Message> {public Message process(User user) throws Exception {Message m = new Message();m.setContent("Hello " + user.getName()+ ",please pay promptly at the end of this month.");return m;}} |
任務定義
通過上面一節,我們已經完成了批處理任務的讀數據、處理過程、寫數據三個過程。那么,我們如何將這三部分結合在一起完成批處理任務呢?
Spring Batch 將批處理任務稱為一個 Job,同時,Job 下分為多個 Step。Step 是一個獨立的、順序的處理步驟,包含該步驟批處理中需要的所有信息。多個批處理 Step 按照一定的流程組成一個 Job。通過這樣的設計方式,我們可以靈活配置 Job 的處理過程。
接下來,讓我們看一下如何配置繳費通知的 Job,如 清單 8 所示:
清單 8. message_job.xml
| <job id="messageJob"><step id="messageStep"><tasklet><chunk reader="messageReader" processor="messageProcessor" writer="messageWriter" commit-interval="5" chunk-completion-policy=""></chunk></tasklet></step> </job> |
如上,我們定義了一個名為“messageJob”的 Job,該 Job 僅包含一個 Step。在配置 Step 的過程中,我們不僅要指定讀數據、處理、寫數據相關的 bean,還要指定 commit-interval 和 chunk-completion-policy 屬性。前者指定了該 Step 中事務提交的粒度,取值為 5 即表明每當處理完畢讀入的 5 條數據時,提交一次事務。后者指定了 Step 的完成策略,即當什么情況發生時表明該 Step 已經完成,可以轉入后續處理。由于沒有明確指定相應的類,Spring Batch 使用默認策略,即當讀入數據為空時認為 Step 結束。
最后,我們還需要配置一個 JobRepository 并為其指定一個事務管理器,該類用于對 Job 進行管理,如 清單 9 所示:
清單 9. message_job.xml
| <beans:bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"><beans:property name="transactionManager" ref="transactionManager" /> </beans:bean><beans:bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/> |
因為我們整個示例不需要數據庫操作,所以選擇了使用 MapJobRepositoryFactoryBean 和 ResourcelessTransactionManager。
所有配置完成以后,進入最后一步——任務執行。
任務執行
那么如何運行一個 Job 呢? Spring Batch 提供了 JobLauncher 接口用于運行 Job,并提供了一個默認實現 SimpleJobLauncher。先讓我們看一下具體執行代碼,如 清單 10 所示:
清單 10. Main 類
| public class Main {public static void main(String[] args) {ClassPathXmlApplicationContext c = new ClassPathXmlApplicationContext("message_job.xml");SimpleJobLauncher launcher = new SimpleJobLauncher();launcher.setJobRepository((JobRepository) c.getBean("jobRepository"));launcher.setTaskExecutor(new SimpleAsyncTaskExecutor());try {launcher.run((Job) c.getBean("messageJob"), new JobParameters());} catch (Exception e) {e.printStackTrace();}} } |
首先,我們需要為 JobLauncher 指定一個 JobRepository,該類負責創建一個 JobExecution 對象來執行 Job,此處直接從上下文獲取即可。其次,需要指定一個任務執行器,我們使用 Spring Batch 提供的 SimpleAsyncTaskExecutor。最后,通過 run 方法來執行指定的 Job,該方法包含兩個參數,需要執行的 Job 以及執行參數。您可以通過運行示例工程查看運行結果。由于 MessageItemWriter 在每次輸出結果前,先打印了一行提示,因此您可以明顯看出輸出分 2 組進行打印,即事務被提交了 2 次(因為我們設置的事務粒度為 5。)。
從業務功能上考慮,同一任務應該盡量避免重復執行(即相同條件下的任務只能成功運行一次),試想如果本示例中發送繳費通知過多只能導致用戶不滿,那么電信計費批處理任務重復執行則將導致重復計費,從而使用戶遭受損失。幸運的是,Spring Batch 已經為我們考慮好了這些。
對于 Spring Batch 來說,JobParameters 相同的任務只能成功運行一次。您如果在示例 Main 類中連續運行同一 Job,將會得到如下異常(見 清單 11 ):
清單 11. 異常信息
| org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException: A job instance already exists and is complete for parameters={}. If you want to run this job again, change the parameters. |
因此,如果我們希望該任務是周期執行的(如每月執行一次),那么必須保證周期內參數是唯一。假如該客戶要求我們每月為用戶發送一次繳費通知。我們的任務執行可以如 清單 12 所示:
清單 12. Main 類
| Map<String,JobParameter> parameters = new HashMap<String,JobParameter>(); parameters.put(RUN_MONTH_KEY,new JobParameter("2011-10")); launcher.run((Job) c.getBean("messageJob"),new JobParameters(parameters)); parameters.put(RUN_MONTH_KEY,new JobParameter("2011-11")); launcher.run((Job) c.getBean("messageJob"),new JobParameters(parameters)); |
在示例中,我將執行月份作為 Job 的參數傳入,分別執行了 10、11 月兩個月的任務。
任務重試
既然相同參數的任務只能成功執行一次,那么,如果任務失敗該如何處理?此時,需要考慮的是,既然任務步驟有事務提交粒度,那么可能任務已經提交了部分處理結果,這部分不應該被重復處理。也就是說,此時應該有重試操作。
在 Spring Batch 中,通過配置可以實現步驟 Step 的重試,如 清單 13 所示:
清單 13. message_job.xml
| <job id="messageJob" restartable="true"><step id="messageStep"><tasklet><chunk reader="messageReader" processor="messageProcessor" writer="messageWriter"commit-interval="5" chunk-completion-policy="" retry-limit="2"><retryable-exception-classes><include class="java.lang.RuntimeException" /></retryable-exception-classes></chunk></tasklet></step> </job> |
我們可以看到,主要分兩部分:首先,需要設置重試次數,其次是當執行過程中捕獲到哪些異常時需要重試。如果在執行過程中捕獲到重試異常列表中的異常信息,則進行重試操作。如果重試操作達到最大次數仍提示異常,則認為任務執行失敗。對于異常信息的配置,除了通過 include 配置包含列表外,也可以通過 exclude 配置排除列表。
由于通過配置進行的 Step 重試是自動的,因此較難控制(多用于網絡訪問異常等不需要人工干預的情況)。可以考慮一下本示例,如果有一個用戶的信息有問題,名字為空,不能發送繳費通知,步驟重試便不合適了,此時我們可以對 Job 進行重試操作。
Spring Batch 允許重復執行未成功的 Job,而每次執行即為一次重試操作。示例代碼如 清單 14 所示:
清單 14. Main 類
| Map<String,JobParameter> parameters = new HashMap<String,JobParameter>(); parameters.put(RUN_MONTH_KEY,new JobParameter("2011-10")); launcher.run((Job) c.getBean("messageJob"),new JobParameters(parameters)); Thread.sleep(10000); launcher.run((Job) c.getBean("messageJob"),new JobParameters(parameters)); |
您可以通過如下步驟查看運行結果:首先,將 users.txt 文件中的第 7 行(之所以指定該行,便于驗證事務提交以及重復執行的起始位置)的用戶名修改為空。其次,運行示例。最后,在程序出現異常提示時,更新第 7 行的用戶名(為了便于演示,程序在兩次任務執行過程中等待 10 秒鐘)。
您可以在控制臺中很明顯的看到,任務先打印了 5 條記錄(第一次事務提交),然后出現異常信息,待我們將錯誤更正后,又打印了 5 條記錄,任務最終成功完成。
從輸出結果,我們可以知道 Spring Batch 是從出錯的事務邊界內第一條記錄重復執行的,這樣便確保了數據完整性,而且所有這一切對于用戶均是透明的。
那么 Spring Batch 是如何做到這一步的呢?這與 Spring Batch 的運行時管理是分不開的。
回頁首
運行時管理
Spring Batch 提供了如 表 1 所示的類用于記錄每個 Job 的運行信息:
表 1. 運行時類信息
| JobInstance | 該類記錄了 Job 的運行實例。舉例:10 月和 11 月分別執行同一 Job,將生成兩個 JobInstance。主要信息有:標識、版本、Job 名稱、Job 參數 |
| JobExecution | 該類記錄了 Job 的運行記錄。如上面的示例,Job 第一次運行失敗,第二次運行成功,那么將形成兩條運行記錄,但是對應的是同一個運行實例。主要信息有:Job 的運行時間、運行狀態等。 |
| JobParameters | 該類記錄了 Job 的運行參數 |
| ExecutionContext | 該類主要用于開發人員存儲任務運行過程中的相關信息(以鍵值對形式),主要分為 Job 和 Step 兩個范圍 |
| StepExecution | 該類與 JobExecution 類似,主要記錄了 Step 的運行記錄。包括此次運行讀取記錄條數、輸出記錄條數、提交次數、回滾次數、讀跳過條數、處理跳過條數、寫跳過條數等信息 |
Spring Batch 通過 JobRepository 接口維護所有 Job 的運行信息,此外 JobLauncher 的 run 方法也返回一個 JobExecution 對象,通過該對象可以方便的獲得 Job 其他的運行信息,代碼如 清單 15 所示:
清單 15. Main 類
| Map<String,JobParameter> parameters = new HashMap<String,JobParameter>(); parameters.put(RUN_MONTH_KEY,new JobParameter("2011-10")); JobExecution je = launcher.run((Job) c.getBean("messageJob"),new JobParameters(parameters)); System.out.println(je); System.out.println(je.getJobInstance()); System.out.println(je.getStepExecutions()); |
輸出信息如 清單 16 所示:
清單 16. 輸出結果
| JobExecution: id=0, version=2, startTime=Tue Nov 15 21:00:09 CST 2011, endTime=Tue Nov 15 21:00:09 CST 2011, lastUpdated=Tue Nov 15 21:00:09 CST 2011, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=0, version=0, JobParameters=[{run.month=2011-10}], Job=[messageJob]]JobInstance: id=0, version=0, JobParameters=[{run.month=2011-10}], Job=[messageJob] [StepExecution: id=1, version=5, name=messageStep, status=COMPLETED, exitStatus=COMPLETED, readCount=10, filterCount=0, writeCount=10 readSkipCount=0,writeSkipCount=0, processSkipCount=0, commitCount=3 , rollbackCount=0, exitDescription=] |
從日志您可以發現事務一共提交了 3 次,這與前面的說明是不一致的。之所以會如此是因為當事務提交粒度恰好可以被記錄數整除時,事務會有一次空提交。
關于 Spring Batch 運行時信息管理,將在講解 Job 監控時再詳細介紹,此處不再贅述,你也可以查看 Spring Batch 參考資料了解相關信息。
回頁首
總結
本文通過一個簡單示例演示了如何構建 Spring Batch 應用,同時介紹了 Spring Batch 的相關核心概念。希望您通過本文可以掌握 Spring Batch 的基本功能。在接下來的文章中,我將繼續介紹 Spring Batch 的兩個重要特性:Job 流和并發。
回頁首
下載
| 示例代碼 | batch_sample.zip | 34KB | HTTP |
關于下載方法的信息
?
參考資料
學習
- Spring Batch 主頁,可以初步了解 Spring Batch 的基本架構。
- Spring Batch 發布包,您可以在這里找到各個版本的 Spring Batch 發布包。
- Spring Batch 入門,教你如何入門。
- Spring Batch 參考手冊,詳細了解 Spring Batch 框架。
- Spring 參考手冊,Spring Framework 知識學習。
- “Spring Richclient 中的安全認證管理”(developerWorks,2011 年 7 月):作為企業級開發框架,Spring Richclient 為我們提供了完善的安全認證管理功能,使我們能夠方便構建安全的企業級應用。本文將詳細介紹 Spring Richclient 中安全認證管理的實現方式以及使用方法。
- “Struts2、Spring、Hibernate 高效開發的最佳實踐”(developerWorks,2011 年 8 月):Struts2、Spring、Hibernate(SSH)是最常用的 Java EE Web 組件層的開發技術搭配,網絡中和許多 IT 技術書籍中都有它們的開發教程,但是通常的教程都會讓很多程序員陷入痛苦的配置與修改配置的過程。本文利用 SSH 中的技術特性,利用 Java 反射技術,按照規約優于配置的原理,基于 SSH 設定編寫了一個通用開發框架,這使得開發者可以專注于業務邏輯的開發。
- “如何將基于 Struts、Spring 和 Hibernate 的應用從 Tomcat 遷移到 WebSphere Application Server”(developerWorks,2011 年 11 月):本文向讀者介紹基于 Eclipse 開發的 Struts、Spring 和 Hibernate 開源應用和開發環境的特點,并通過實例介紹從 Tomcat 遷移到 WebSphere 所遇到的問題及其解決方案。
- “基于 Spring 和 iBATIS 的動態可更新多數據源持久層”(developerWorks,2012 年 2 月):開發擁有多重數據源的項目時,經常希望能夠通過用戶界面來動態配置數據源。本文針對這一問題提出了創新的解決方案,通過使用 Spring+iBATIS 的組合,來實現可動態更新的多重數據源的持久層,從而可以通過用戶界面自主地管理所需的數據源。
- developerWorks Java 技術專區:這里有數百篇關于 Java 編程各個方面的文章。
討論
- 加入 developerWorks 中文社區。查看開發人員推動的博客、論壇、組和維基,并與其他 developerWorks 用戶交流。
?
總結
以上是生活随笔為你收集整理的Spring batch Job define的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: spring batch
- 下一篇: Ab Initio软件