javascript
扩展Spring Batch –步骤分区
在之前的幾篇文章中,我們已經(jīng)討論了如何啟動(dòng)和運(yùn)行Spring Batch。 現(xiàn)在,我們將開(kāi)始討論可用于擴(kuò)展Spring Batch的一些策略。
本文將重點(diǎn)介紹如何對(duì)步驟進(jìn)行分區(qū),以使該步驟具有多個(gè)線(xiàn)程,每個(gè)線(xiàn)程并行處理一塊數(shù)據(jù)。 如果您有大量的數(shù)據(jù)可以在邏輯上拆分為可以并行處理的較小的塊,這將非常有幫助。 這種工作方式是,您將定義一個(gè)主要步驟,該步驟負(fù)責(zé)確定塊的基礎(chǔ),然后將所有這些塊都植入到一組從屬步驟中,以處理每個(gè)塊。
分區(qū)
如果我能回到過(guò)去的經(jīng)驗(yàn),那么一個(gè)很好的例子就是在大型采購(gòu)系統(tǒng)中處理每個(gè)公司的所有每日發(fā)票。 我們將要處理的數(shù)據(jù)可以按處理的每個(gè)公司在邏輯上進(jìn)行拆分。 假設(shè)有250家公司參與此采購(gòu)系統(tǒng),我們的分區(qū)步驟已定義為具有15個(gè)線(xiàn)程。 我們的分區(qū)程序可能會(huì)運(yùn)行查詢(xún),以查找當(dāng)天有等待處理發(fā)票的所有公司。 此時(shí),分區(qū)程序的職責(zé)是為每個(gè)公司創(chuàng)建一個(gè)ExecutionContext并將其添加到具有唯一鍵的地圖中。 該ExecutionContext應(yīng)該包含處理該公司的發(fā)票所需的任何信息,例如公司ID和任何其他相關(guān)信息。 當(dāng)分區(qū)程序返回ExecutionContexts的映射時(shí),Spring Batch將為映射中的每個(gè)條目創(chuàng)建一個(gè)新的Step ,并將鍵值用作步驟名稱(chēng)的一部分。 根據(jù)配置(例如15個(gè)線(xiàn)程),它將創(chuàng)建15個(gè)線(xiàn)程的池并開(kāi)始一次并行執(zhí)行15個(gè)步驟。 例如,如果您有85個(gè)步驟,Spring Batch將開(kāi)始執(zhí)行15個(gè)步驟,并且在完成每個(gè)步驟后,完成該步驟的線(xiàn)程將接下一個(gè)步驟并開(kāi)始執(zhí)行,直到所有步驟都已完成。
一個(gè)例子
現(xiàn)在,我們對(duì)分區(qū)的工作原理有了基本的了解,讓我們看一個(gè)簡(jiǎn)單的示例。 對(duì)于我們的用例,我們將檢查入站目錄,傳入的供應(yīng)商目錄文件將被轉(zhuǎn)儲(chǔ)到該目錄中。 因此,要?jiǎng)?chuàng)建一個(gè)Spring Batch分區(qū)程序,我們需要?jiǎng)?chuàng)建一個(gè)實(shí)現(xiàn)Spring Batch的Partitioner接口的類(lèi)。 由于這是通用的并且可以重用,所以我們將調(diào)用此類(lèi)MultiFileResourcePartitioner ,這是一個(gè)簡(jiǎn)單的POJO,并且只有一個(gè)字段名稱(chēng)“ inboundDir”代表包含要處理文件的目錄的路徑。 Partitioner接口指定該類(lèi)應(yīng)實(shí)現(xiàn)一個(gè)名為“ partition”的方法,該方法采用一個(gè)表示網(wǎng)格大小的int參數(shù),并返回一個(gè)保存ExecutionContext的Map。
這是MultiFileResourcePartitioner的類(lèi)列表:
package com.keyhole.example.partition;import java.io.File; import java.util.HashMap; import java.util.Map;import org.springframework.batch.core.partition.support.Partitioner; import org.springframework.batch.item.ExecutionContext; import org.springframework.core.io.FileSystemResource;public class MultiFileResourcePartitioner implements Partitioner {private String inboundDir;@Overridepublic Map<String, ExecutionContext> partition(int gridSize) {Map<String, ExecutionContext> partitionMap = new HashMap<String, ExecutionContext>();File dir = new File(inboundDir);if (dir.isDirectory()) {File[] files = dir.listFiles();for (File file : files) {ExecutionContext context = new ExecutionContext();context.put("fileResource", file.toURI().toString());partitionMap.put(file.getName(), context);}}return partitionMap;}public String getInboundDir() {return inboundDir;}public void setInboundDir(String inboundDir) {this.inboundDir = inboundDir;}}在我們的應(yīng)用程序上下文中,此bean的配置如下所示:
<bean id="inventoryFilePartitioner"class="com.keyhole.example.partition.MultiFileResourcePartitioner"scope="step"><property name="inboundDir" value="/data/inbound" /> </bean>查看實(shí)現(xiàn)的分區(qū)方法,我們只列出所有在指定入站目錄中找到的文件,并為找到的每個(gè)文件創(chuàng)建一個(gè)ExecutionContext并將其添加到返回的地圖中。 用于將每個(gè)ExecutionContext放入映射中的唯一鍵也將成為為映射中的每個(gè)條目創(chuàng)建的步驟名稱(chēng)的一部分。 Spring Batch將使用該分區(qū)映射根據(jù)映射中找到的每個(gè)鍵創(chuàng)建一個(gè)從屬步驟。
要對(duì)步驟進(jìn)行分區(qū),您需要首先創(chuàng)建分區(qū)配置將引用的步驟。 該步驟應(yīng)與Spring Batch中的任何其他步驟一樣進(jìn)行配置,在此示例中,我們將定義FlatFileItemReader和簡(jiǎn)單的ItemWriter ,它們將僅調(diào)用toString()方法并將其記錄到控制臺(tái)。
這是該踏板及其相關(guān)彈簧彈片的配置詳細(xì)信息。 這里要注意的重要一點(diǎn)是,我們將ItemReader的作用域限定在步驟級(jí)別,這樣我們就不會(huì)在使用同一bean處理數(shù)據(jù)的多個(gè)線(xiàn)程中遇到任何問(wèn)題。 我們還需要以這種方式確定它們的作用域,以便我們可以使用Spring后期綁定在Step的ExecutionContext中指定保存文件資源的值。
<batch:step id="inventoryLoadStep"xmlns="http://www.springframework.org/schema/batch"><batch:tasklet transaction-manager="transactionManager"><batch:chunk reader="inventoryLoadReader" writer="logItemWriter"commit-interval="5000" /></batch:tasklet> </batch:step> <bean name="inventoryLoadReader" scope="step"class="org.springframework.batch.item.file.FlatFileItemReader"><property name="resource"value="#{stepExecutionContext['fileResource']}" /><property name="lineMapper" ref="inventoryLineMapper" /> <property name="linesToSkip" value="1" /> </bean><bean name="inventoryLineMapper"class="org.springframework.batch.item.file.mapping.DefaultLineMapper"><property name="fieldSetMapper" ref="inventoryFieldMapper" /><property name="lineTokenizer" ref="inventoryLineTokenizer" /> </bean><bean name="inventoryLineTokenizer" class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer" />由于在此示例中我們正在讀取和處理以逗號(hào)分隔的文本文件,因此對(duì)于此步驟配置,我們只需編寫(xiě)很少的代碼。 我們唯一需要實(shí)現(xiàn)的代碼是將行的內(nèi)容映射到表示文件記錄的對(duì)象所需的FieldSetMapper 。 文件中的每一行都將包含“類(lèi)別”,“子類(lèi)別”,“描述”,“目錄編號(hào)”,“顏色”,“尺寸”,“價(jià)格”和“數(shù)量”字段。 因此,我們的對(duì)象將包含這些字段,并且我們的FieldSetMapper代碼清單將如下所示。
package com.keyhole.example.partition;import org.springframework.batch.item.file.mapping.FieldSetMapper; import org.springframework.batch.item.file.transform.FieldSet; import org.springframework.stereotype.Component; import org.springframework.validation.BindException;@Component("inventoryFieldMapper") public class InventoryItemFieldSetMapper implements FieldSetMapper<InventoryItem> {@Overridepublic InventoryItem mapFieldSet(FieldSet fieldSet) throws BindException {InventoryItem item = new InventoryItem();item.setCategory(fieldSet.readString(0));item.setSubCategory(fieldSet.readString(1));item.setDescription(fieldSet.readString(2));item.setCatalogNum(fieldSet.readString(3));item.setColor(fieldSet.readString(4));item.setSize(fieldSet.readString(5));item.setPrice(fieldSet.readDouble(6));item.setQty(fieldSet.readInt(7));return item;}}現(xiàn)在我們已經(jīng)創(chuàng)建并配置了Partitioner和Step,剩下要做的就是配置分區(qū)步驟本身! 就像這樣簡(jiǎn)單:
<batch:job id="InventoryLoader"><batch:step id="partitionedInventoryLoadStep"><batch:partition step="inventoryLoadStep" partitioner="inventoryFilePartitioner"><batch:handler grid-size="10" task-executor="inventoryLoadTaskExecutor" /></batch:partition></batch:step> </batch:job>在配置分區(qū)步驟時(shí),您可以像定義其他步驟一樣定義一個(gè)步驟,方法是為其指定一個(gè)ID,并根據(jù)需要指定下一步驟的值。 Spring Batch沒(méi)有將步驟的內(nèi)容定義為普通的塊或任務(wù)集,而是提供了一個(gè)分區(qū)標(biāo)簽,該標(biāo)簽要求您指定要分區(qū)的作業(yè)步驟以及將用于確定數(shù)據(jù)塊的Partitioner。 您還需要定義將要處理這些步驟的分區(qū)處理程序,在這種情況下,我們將使用ThreadPoolTask??Executor ,該線(xiàn)程處理程序的線(xiàn)程池大小為10,如果不使用它們,則允許它們超時(shí)。
<bean id="inventoryLoadTaskExecutor"class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"><property name="corePoolSize" value="10" /><property name="maxPoolSize" value="10" /><property name="allowCoreThreadTimeOut" value="true" /> </bean>因此,如果您的Spring Batch流程具有處理大量記錄的步驟,并且您對(duì)提高性能感興趣,請(qǐng)考慮嘗試進(jìn)行步驟分區(qū)。 它應(yīng)該易于實(shí)現(xiàn),并為您提供一些其他性能,以幫助加快處理時(shí)間。
其他資源
對(duì)于與本文相關(guān)的示例代碼,我已將源代碼上傳到位于https://github.com/jonny-hackett/batch-example的github存儲(chǔ)庫(kù)中。 要執(zhí)行與本文相關(guān)的示例代碼,有一個(gè)JUnit測(cè)試名稱(chēng)InventoryLoadTest 。 數(shù)據(jù)文件位于src / test / resources / data / inbound下,需要放置在與Partitioner入站目錄匹配的本地目錄中。 另外,請(qǐng)?jiān)L問(wèn)http://docs.spring.io/spring-batch/reference/html/scalability.html 。
翻譯自: https://www.javacodegeeks.com/2013/12/scaling-spring-batch-step-partitioning.html
總結(jié)
以上是生活随笔為你收集整理的扩展Spring Batch –步骤分区的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 肥东新楼盘备案价查询(肥东新楼盘备案价)
- 下一篇: 关闭Linux命令(关闭linux命令)