javascript
一篇文章教你学会使用SpringBatch 监听器Listener
文章目錄
- 一、SpringBatch監(jiān)聽器
- 二、搭建SpringBatch開發(fā)環(huán)境
- 三、監(jiān)聽器詳細介紹
- 1、JobExecutionListener
- 2、StepExecutionListener
- 3、ChunkListener
- 4、ItemReaderListener
- 5、ItemProcessListener
- 6、ItemWriteListener
- 7、SkipListener
- 四、創(chuàng)建job執(zhí)行監(jiān)聽器
寫在前面: 我是「境里婆娑」。我還是從前那個少年,沒有一絲絲改變,時間只不過是考驗,種在心中信念絲毫未減,眼前這個少年,還是最初那張臉,面前再多艱險不退卻。
寫博客的目的就是分享給大家一起學習交流,如果您對 Java感興趣,可以關(guān)注我,我們一起學習。
前言:為什么要寫這篇文章,因為我們在用SpringBatch做批量時候,經(jīng)常會在job執(zhí)行之前之后,或者step之前之后做一些特殊處理,或者程序報錯我們要把報錯的數(shù)據(jù)抓出來。這時候SpringBatch提供的監(jiān)聽器可以很好的幫助我們解決這些問題。下面就讓我們一步一步去研究SpringBatch監(jiān)聽器吧。
一、SpringBatch監(jiān)聽器
Spring Batch提供了多種監(jiān)聽器Listener,用于在任務處理過程中觸發(fā)我們的邏輯代碼。常用的監(jiān)聽器根據(jù)粒度從粗到細分別有:Job級別的監(jiān)聽器JobExecutionListener、Step級別的監(jiān)聽器StepExecutionListener、Chunk監(jiān)聽器ChunkListener、ItemReader監(jiān)聽器ItemReadListener、ItemWriter監(jiān)聽器ItemWriteListener和ItemProcessor監(jiān)聽器ItemProcessListener和SkipListener等。具體可以參考下表:
| JobExecutionListener | 在Job開始之前(beforeJob)和之后(aflerJob)觸發(fā) |
| StepExecutionListener | 在Step開始之前(beforeStep)和之后(afterStep)觸發(fā) |
| ChunkListener | 在 Chunk 開始之前(beforeChunk),之后(afterChunk)和錯誤后(afterChunkError)觸發(fā) |
| ItemReadListener | 在 Read 開始之前(beforeRead>,之后(afterRead)和錯誤后(onReadError)觸發(fā) |
| ItemProcessListener | 在 Processor 開始之前(beforeProcess),之后(afterProcess)和錯誤后(onProcessError)觸發(fā) |
| ItemWriteListener | 在 Writer 開始之前(beforeWrite),之后(afterWrite)和錯誤后(onWriteError)觸發(fā) |
| SkipListener | 在 Skip(reder)時候,在 Skip(writer)時候,在 Skip(processor)時候 |
二、搭建SpringBatch開發(fā)環(huán)境
新建一個Spring Boot項目,版本為2.3.1.RELEASE,引入SpringBatch jar包,項目結(jié)構(gòu)如下圖所示:
如果對怎么快速搭建SpringBoot功能有疑問,請看此篇文章:
一篇文章教你學會使用SpringBoot實現(xiàn)文件上傳和下載
三、監(jiān)聽器詳細介紹
每種監(jiān)聽器都可以通過兩種方式使用:
- 接口實現(xiàn);
- 注解驅(qū)動。
1、JobExecutionListener
Job監(jiān)聽器提供了兩個方法,一個是beforeJob(),另一個是afterJob()。這個監(jiān)聽器我們可以做些參數(shù)的加載。比如在job執(zhí)行之前把參數(shù)加載進去。在執(zhí)行之后把參數(shù)清除等操作。
@Component public class MyJobExecutionListener implements JobExecutionListener {@Overridepublic void beforeJob(JobExecution jobExecution) {System.out.println("job執(zhí)行之前做處理...");}@Overridepublic void afterJob(JobExecution jobExecution) {System.out.println("job執(zhí)行之后做處理...");} }2、StepExecutionListener
step監(jiān)聽器和job監(jiān)聽器功能差不多,也有兩個方法beforeStep()和afterStep()。
afterStep()這個方法有返回值,可以設(shè)置step的執(zhí)行狀態(tài)。
3、ChunkListener
ChunkListener有三個方法beforeChunk()、afterChunk()、afterChunkError()。都能把chunk的上下文打印出來。通過beforeChunk和afterChunk可以把一個chunk的執(zhí)行時間算出來。
@Component public class MyChunkListener implements ChunkListener {@Overridepublic void beforeChunk(ChunkContext chunkContext) {System.out.println("chunk執(zhí)行之前:" +System.currentTimeMillis());}@Overridepublic void afterChunk(ChunkContext chunkContext) {System.out.println("chunk執(zhí)行之后:" +System.currentTimeMillis());}@Overridepublic void afterChunkError(ChunkContext chunkContext) {System.out.println("chunk執(zhí)行出錯...");} }4、ItemReaderListener
reder監(jiān)聽器也有三個方法beforeRead、afterRead、onReadError
如果我們讀到一條數(shù)據(jù),需要處理,可以在afterReader之后做操作。
5、ItemProcessListener
Process監(jiān)聽器也有三個方法,其實我們經(jīng)常用的是onProcessError這個方法,這個方法可以在處理process時候把異常數(shù)據(jù)打印出來。可以快速定位問題。
@Component public class MyItemProcessListener implements ItemProcessListener {@Overridepublic void beforeProcess(Object o) {System.out.println("Process之前...");}@Overridepublic void afterProcess(Object o, Object o2) {System.out.println("Process之h后...");}@Overridepublic void onProcessError(Object o, Exception e) {System.out.println("Process報錯...");} }6、ItemWriteListener
特別注意的是onWriteError這個方法,是吧這個list所有對象全部打印出來。
@Component public class MyItemWriterListener implements ItemWriteListener {@Overridepublic void beforeWrite(List list) {System.out.println("writer之前...");}@Overridepublic void afterWrite(List list) {System.out.println("writer之后...");}@Overridepublic void onWriteError(Exception e, List list) {System.out.println("writer異常...");} }7、SkipListener
skip監(jiān)聽器可以處理RPW報異常的時候把這些數(shù)據(jù)跳過。
@Component public class MySkipListener implements SkipListener{@Overridepublic void onSkipInRead(Throwable throwable) {System.out.println("skip reader...");}@Overridepublic void onSkipInWrite(Object o, Throwable throwable) {System.out.println("skip write...");}@Overridepublic void onSkipInProcess(Object o, Throwable throwable) {System.out.println("skip process...");} }四、創(chuàng)建job執(zhí)行監(jiān)聽器
準備好這些監(jiān)聽器后,新建job包,然后在該包下新建ListenJobConfiguration:
/*** @author shuliangzhao* @Title: ListenJobConfiguration* @date 2020/7/12 14:55*/ @Configuration @EnableBatchProcessing public class ListenJobConfiguration {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Autowiredprivate MyJobExecutionListener myJobExecutionListener;@Autowiredprivate MyStepExecutionListener myStepExecutionListener;@Autowiredprivate MyChunkListener myChunkListener;@Autowiredprivate MyItemReadListener myItemReadListener;@Autowiredprivate MyItemProcessListener myItemProcessListener;@Autowiredprivate MyItemWriterListener myItemWriterListener;@Autowiredprivate MySkipListener mySkipListener;@Beanpublic Job listenerJob() {return jobBuilderFactory.get("listenerJob").listener(myJobExecutionListener).start(listenerStep()).build();}@Beanpublic Step listenerStep() {return stepBuilderFactory.get("listenerStep").listener(myStepExecutionListener).listener(myChunkListener).chunk(10).reader(reader()).listener(myItemReadListener).processor(processor()).listener(myItemProcessListener).writer(list -> list.forEach(System.out::println)).listener(myItemWriterListener).listener(mySkipListener).build();}@Bean@StepScopepublic ItemReader<String> reader() {List<String> data = Arrays.asList("java", "c++", "javascript", "python");return new MyItemReader(data);}@Bean@StepScopepublic ItemProcessor<String, String> processor() {return item -> item + " language";} } public class MyItemReader implements ItemReader<String> {private Iterator<String> iterator;public MyItemReader(List<String> data) {this.iterator = data.iterator();}@Overridepublic String read() {return iterator.hasNext() ? iterator.next() : null;} }上面代碼我們在相應的位置配置了監(jiān)聽器(配置chunk監(jiān)聽器的時候,必須配置faultTolerant())。
啟動項目,控制臺日志打印如下:
到此SpringBatch常用監(jiān)聽器已經(jīng)全部介紹完畢。如果還有不明白的可以留言。
本文來源代碼: SpringBatch監(jiān)聽器源碼
—————————————————————————————————
由于本人水平有限,難免有不足,懇請各位大佬不吝賜教!
總結(jié)
以上是生活随笔為你收集整理的一篇文章教你学会使用SpringBatch 监听器Listener的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 一篇文章教你学会使用SpringBoot
- 下一篇: 一篇文章教你学会Java泛型