javascript
全网最详细SpringBatch批处理读取分区(Paratition)文件讲解
文章目錄
- 一、分區(qū)Step
- 1、數(shù)據(jù)分區(qū)
- 2、分區(qū)處理
- 二、實現(xiàn)分區(qū)關(guān)鍵接口
- 1、Partitioner
- 2、StepExecutionSplitter
- 3、PartitionHandler
- 三、基本配置和屬性說明
- 1、基本配置
- 2、屬性說明
- 四、文件分區(qū)
- 1、定義分區(qū)文件Partitioner
- 2、定義文件讀
- 3、定義分區(qū)job配置
- 4、定義processor
- 4、定義writer
- 4、定義step監(jiān)聽器
- 6、運行job
寫在前面: 我是「境里婆娑」。我還是從前那個少年,沒有一絲絲改變,時間只不過是考驗,種在心中信念絲毫未減,眼前這個少年,還是最初那張臉,面前再多艱險不退卻。
寫博客的目的就是分享給大家一起學(xué)習(xí)交流,如果您對 Java感興趣,可以關(guān)注我,我們一起學(xué)習(xí)。
前言:為什么要寫這篇文章,在網(wǎng)上很難找到一篇關(guān)于SpringBatch批處理讀取分區(qū)文件基于JavaBean配置的文章,因此我決定寫一篇關(guān)于SpringBatch讀取分區(qū)文件基于javaBean配置的文章,希望這篇文章可以幫助新手的你或者你有一定經(jīng)驗的可以加深印象。
一、分區(qū)Step
何為分區(qū)Step:
通過將任務(wù)進(jìn)行分區(qū),不同的Step處理不同任務(wù)數(shù)據(jù)達(dá)到提高Job效率功能。
分區(qū)作業(yè)可以分區(qū)兩個處理階段,數(shù)據(jù)分區(qū)、分區(qū)處理;
1、數(shù)據(jù)分區(qū)
數(shù)據(jù)分區(qū):根據(jù)特殊的規(guī)則,將數(shù)據(jù)進(jìn)行合理分片,為不同的數(shù)據(jù)切片生成數(shù)據(jù)執(zhí)行上下文Execution Context、作業(yè)執(zhí)行器Step Execution。可以通過接口Partitioner生成自定義分區(qū)邏輯,SpringBatch批處理框架默認(rèn)對多文件實現(xiàn)MultiResourcePartititoner;也可以自行擴(kuò)展接口Partitioner實現(xiàn)自定義分區(qū)邏輯。
2、分區(qū)處理
分區(qū)處理:通過數(shù)據(jù)分區(qū)后,不同的數(shù)據(jù)已經(jīng)被分配到不同的作業(yè)執(zhí)行器中,接下來需要交給分區(qū)處理器進(jìn)行作業(yè),分區(qū)處理器可以在本地或遠(yuǎn)程執(zhí)行被劃分的作業(yè)。接口PartitionHandler定義了分區(qū)處理邏輯,SpringBatch批處理框架默認(rèn)實現(xiàn)了本地分區(qū)處理TaskExecutorPartitionHandler;也可以自行擴(kuò)展接口PartitionHandler來實現(xiàn)自定義分區(qū)邏輯。
分區(qū)作業(yè)邏輯結(jié)構(gòu)圖:
二、實現(xiàn)分區(qū)關(guān)鍵接口
實現(xiàn)分區(qū)關(guān)鍵接口有如下:PartitionHandler、StepExecutionSplitter、Partitioner。
1、Partitioner
Partitoner接口定義了如何根據(jù)給定的分區(qū)規(guī)則進(jìn)行創(chuàng)建作業(yè)執(zhí)行分區(qū)的上下文。
Partitioner接口定義如下:
public interface Partitioner {Map<String, ExecutionContext> partition(int gridSize); }gridSize含義:根據(jù)給定的gridSize大小進(jìn)行執(zhí)行上下文劃分。
2、StepExecutionSplitter
StepExecutionSplitter接口定義了如何根據(jù)給定的分區(qū)規(guī)則進(jìn)行創(chuàng)建作業(yè)執(zhí)行分區(qū)的執(zhí)行器。
StepExecutionSplitter接口定義如下
public interface StepExecutionSplitter {String getStepName();Set<StepExecution> split(StepExecution stepExecution, int gridSize) throws JobExecutionException; }getStepName:獲取當(dāng)前定義的分區(qū)作業(yè)的名稱。
split:根據(jù)給定的分區(qū)規(guī)則為每個分區(qū)生成對應(yīng)的分區(qū)執(zhí)行器。
3、PartitionHandler
PartitionHandler接口定義了分區(qū)處理的邏輯,根據(jù)給定的StepExecutionSplitter進(jìn)行分區(qū)并執(zhí)行,最后將執(zhí)行的結(jié)果進(jìn)行收集,反饋給前端。
PartitionHandler接口定義如下
public interface PartitionHandler {Collection<StepExecution> handle(StepExecutionSplitter stepSplitter, StepExecution stepExecution) throws Exception; }三、基本配置和屬性說明
上面兩節(jié)基本知識已經(jīng)介紹完畢,下面我們將講一個例子來鞏固之前知識。
1、基本配置
一個典型分區(qū)Job配置
@Beanpublic Step partitionMasterMultiFileStep() {return stepBuilderFactory.get("partitionMasterMultiFileStep").partitioner(partitionSlaveMultiFileStep().getName(),multiResourcePartitioner()).partitionHandler(multiFilePartitionHandler()).build();}2、屬性說明
在配置分區(qū)Step之前,我們先看下分區(qū)Step的主要屬性定義和元素定義
| step | 用于指定分區(qū)step名稱 |
| handler(屬性) | 屬性handler指定分區(qū)執(zhí)行器,需要實現(xiàn)接口PartitionHandler |
| handler(子元素) | 用于定義默認(rèn)實現(xiàn):TaskExecutorPartitionHandler |
| task-executor | 生命使用的線程池 |
| grid-size | 聲明分區(qū)的HashMap的初始值大小 |
四、文件分區(qū)
SpringBatch框架提供了對文件分區(qū)的支持,實現(xiàn)類:MultiResourcePartitioner提供了對文件分區(qū)的默認(rèn)支持,根據(jù)文件名將不同文件處理進(jìn)行分區(qū),提升處理速度和效率。本文將按照此例子給出如何配置多文件分區(qū)實現(xiàn)。
讀取文件如下:
本節(jié)實例由于文件多,我們對文件進(jìn)行分區(qū),然后將文件的內(nèi)容寫入DB,邏輯示意圖如下:
1、定義分區(qū)文件Partitioner
定義文件分區(qū),將不同的文件分配到不同的作業(yè)中,使用自定義MyMultiResourcePartitioner分區(qū)。
自定義分區(qū)MyMultiResourcePartitioner如下:
/*** @author shuliangzhao* @date 2020/12/4 23:14*/ public class MyMultiResourcePartitioner implements Partitioner {private static final String DEFAULT_KEY_NAME = "fileName";private static final String PARTITION_KEY = "partition";private Resource[] resources = new Resource[0];private String keyName = DEFAULT_KEY_NAME;public void setResources(Resource[] resources) {this.resources = resources;}public void setKeyName(String keyName) {this.keyName = keyName;}@Overridepublic Map<String, ExecutionContext> partition(int gridSize) {Map<String, ExecutionContext> map = new HashMap<String, ExecutionContext>(gridSize);int i = 0;for (Resource resource : resources) {ExecutionContext context = new ExecutionContext();Assert.state(resource.exists(), "Resource does not exist: "+resource);try {context.putString(keyName, resource.getURI().getPath());}catch (IOException e) {throw new IllegalArgumentException("File could not be located for: "+resource, e);}map.put(PARTITION_KEY + i, context);i++;}return map;} }屬性keyName:用于指定作業(yè)上文中屬性名字,作用是在不同的作業(yè)上下文中可以獲取設(shè)置的對于屬性值。可以在讀寫階段通過@Value("#{stepExecutionContext[fileName]}"方式獲取。
2、定義文件讀
配置好分區(qū)實現(xiàn),需要在每個分區(qū)作業(yè)中讀入不同文件,進(jìn)而提供文件處理效率。
PartitionMultiFileReader 實現(xiàn)
public class PartitionMultiFileReader extends FlatFileItemReader {public PartitionMultiFileReader(Class clz,String fileName) {setResource(new FileSystemResource(fileName.substring(1)));Field[] declaredFields = clz.getDeclaredFields();List<String> list = new ArrayList<>();for (Field field:declaredFields) {list.add(field.getName());}String[] names = new String[list.size()];DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer();delimitedLineTokenizer.setDelimiter(",");delimitedLineTokenizer.setNames(list.toArray(names));DefaultLineMapper defaultLineMapper = new DefaultLineMapper();defaultLineMapper.setLineTokenizer(delimitedLineTokenizer);CommonFieldSetMapper commonFieldSetMapper = new CommonFieldSetMapper();commonFieldSetMapper.setTargetType(clz);defaultLineMapper.setFieldSetMapper(commonFieldSetMapper);setLineMapper(defaultLineMapper);setName(clz.getSimpleName());} }3、定義分區(qū)job配置
基于javabean方式實現(xiàn)job配置
package com.sl.config; //包導(dǎo)入省略/*** @author shuliangzhao* @Title: PartitionFileConfiguration* @ProjectName spring-boot-learn* @Description: TODO* @date 2020/12/4 21:09*/ @Configuration @EnableBatchProcessing public class PartitionMultiFileConfiguration {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Autowiredprivate PartitonMultiFileProcessor partitonMultiFileProcessor;@Autowiredprivate PartitionMultiFileWriter partitionMultiFileWriter;@Beanpublic Job partitionMultiFileJob() {return jobBuilderFactory.get("partitionMultiFileJob").start(partitionMasterMultiFileStep()).build();}@Beanpublic Step partitionMasterMultiFileStep() {return stepBuilderFactory.get("partitionMasterMultiFileStep").partitioner(partitionSlaveMultiFileStep().getName(),multiResourcePartitioner()).partitionHandler(multiFilePartitionHandler()).build();}@Beanpublic PartitionHandler multiFilePartitionHandler() {TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();handler.setGridSize(2);handler.setStep(partitionSlaveMultiFileStep());handler.setTaskExecutor(new SimpleAsyncTaskExecutor());return handler;}@Beanpublic Step partitionSlaveMultiFileStep() {return stepBuilderFactory.get("partitionSlaveMultiFileStep").<CreditBill,CreditBill>chunk(1).reader(partitionMultiFileReader(null)).processor(partitonMultiFileProcessor).writer(partitionMultiFileWriter).build();}@Bean@StepScopepublic PartitionMultiFileReader partitionMultiFileReader(@Value("#{stepExecutionContext[fileName]}")String fileName) {return new PartitionMultiFileReader(CreditBill.class,fileName);}@Beanpublic MyMultiResourcePartitioner multiResourcePartitioner() {MyMultiResourcePartitioner multiResourcePartitioner = new MyMultiResourcePartitioner();multiResourcePartitioner.setKeyName("fileName");multiResourcePartitioner.setResources(getResource());return multiResourcePartitioner;}private Resource[] getResource() {String filePath = "D:\\aplus\\bill\\";File file = new File(filePath);List<Resource> resourceList = new ArrayList<>();if (file.isDirectory()) {String[] list = file.list();if (list != null) {for (String str : list) {String resource = file.getPath() + "\\" + str;FileSystemResource fileSystemResource = new FileSystemResource(resource);resourceList.add(fileSystemResource);}}}Resource[] resources = new Resource[resourceList.size()];return resourceList.toArray(resources);}}4、定義processor
定義processor
/*** @author shuliangzhao* @date 2020/12/4 22:11*/ @Component @StepScope public class PartitonMultiFileProcessor implements ItemProcessor<CreditBill,CreditBill> {@Overridepublic CreditBill process(CreditBill item) throws Exception {CreditBill creditBill = new CreditBill();creditBill.setAcctid(item.getAcctid());creditBill.setAddress(item.getAddress());creditBill.setAmout(item.getAmout());creditBill.setDate(item.getDate());creditBill.setName(item.getName());return creditBill;} }4、定義writer
/*** @author shuliangzhao* @date 2020/12/4 22:29*/ @Component @StepScope public class PartitionMultiFileWriter implements ItemWriter<CreditBill> {@Autowiredprivate CreditBillMapper creditBillMapper;@Overridepublic void write(List<? extends CreditBill> items) throws Exception {if (items != null && items.size() > 0) {items.stream().forEach(item -> {creditBillMapper.insert(item);});}} }4、定義step監(jiān)聽器
定義step監(jiān)聽器目的是在處理作業(yè)之前打印線程名字和讀取文件名字
@Component public class PartitionStepListener implements StepExecutionListener {private static final Logger logger = LoggerFactory.getLogger(PartitionStepListener.class);@Overridepublic void beforeStep(StepExecution stepExecution) {logger.info("ThreadName={},steName={},FileName={}",Thread.currentThread().getName(),stepExecution.getStepName(),stepExecution.getExecutionContext().getString("fileName"));}@Overridepublic ExitStatus afterStep(StepExecution stepExecution) {return null;} }6、運行job
執(zhí)行job查看結(jié)果,可以看出不同的文件有不同的線程來處理,并且被分配到不同的分區(qū)作業(yè)步中執(zhí)行
2020-12-05 15:58:34.100 INFO 13208 --- [cTaskExecutor-1] com.sl.listener.PartitionStepListener : ThreadName=SimpleAsyncTaskExecutor-1,steName=partitionSlaveMultiFileStep:partition1,FileName=/D:/aplus/bill/bill2.csv 2020-12-05 15:58:34.114 INFO 13208 --- [cTaskExecutor-3] com.sl.listener.PartitionStepListener : ThreadName=SimpleAsyncTaskExecutor-3,steName=partitionSlaveMultiFileStep:partition0,FileName=/D:/aplus/bill/bill1.csv 2020-12-05 15:58:34.122 INFO 13208 --- [cTaskExecutor-2] com.sl.listener.PartitionStepListener : ThreadName=SimpleAsyncTaskExecutor-2,steName=partitionSlaveMultiFileStep:partition2,FileName=/D:/aplus/bill/bill3.csv至此,我們完成了對文件分區(qū)的處理。
如果想更詳細(xì)查看以上所有代碼請移步到github:文件分區(qū)詳細(xì)代碼
總結(jié)
以上是生活随笔為你收集整理的全网最详细SpringBatch批处理读取分区(Paratition)文件讲解的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 一篇文章教你弄懂 SpringMvc中的
- 下一篇: 全网最详细SpringBatch读(Re