java批处理 异常处理_Java批处理教程
java批處理 異常處理
在當今世界,互聯網已經改變了我們的生活方式,其主要原因之一是大部分日常瑣事都使用互聯網。 這導致大量數據可用于處理。
其中涉及大量數據的一些示例是處理工資單,銀行對帳單,利息計算等。因此,想象一下,如果所有這些工作都必須手動完成,那么完成這些工作將花費很多時間。
在目前的年齡如何? 答案是批處理。
1.簡介
批處理是對批量數據執行的,無需人工干預,并且可以長時間運行。 它可能是數據或計算密集型的。 批處理作業可以按預定義的時間表運行,也可以按需啟動。 此外,由于批處理作業通常是長時間運行的作業,因此在批處理作業中發現經常檢查和從特定故障中重新啟動是常見的功能。
1.1 Java批處理的歷史
Java平臺的批處理是作為JSR 352規范(Java EE 7平臺的一部分)引入的,它定義了批處理應用程序的編程模型以及用于運行和管理批處理作業的運行時。
1.2 Java Batch的體系結構
下圖顯示了批處理的基本組件。
Java批處理的體系結構
批處理應用程序的體系結構解決了批處理問題,例如作業,步驟,存儲庫,讀取器處理器編寫器模式,塊,檢查點,并行處理,流,重試,排序,分區等。
讓我們了解架構的流程。
- 作業存儲庫包含需要運行的作業。
- JobLauncher從Job存儲庫中提取一個作業。
- 每個工作都包含步驟。 這些步驟是ItemReader , ItemProcessor和ItemWriter 。
- Item Reader是讀取數據的工具。
- 項目處理是一種將基于業務邏輯處理數據的處理。
- 條目編寫器會將數據寫回到定義的源。
1.3批處理組件。
現在,我們將嘗試詳細了解批處理組件。
- 作業:作業包含整個批處理過程。 它包含一個或多個步驟。 使用作業指定語言(JSL)將作業放在一起,該語言指定必須執行步驟的順序。 在JSR 352中,JSL在稱為作業XML文件的XML文件中指定。 一項工作基本上就是一個存放步驟的容器。
- 步驟:步驟是一個域對象,它包含作業的一個獨立的順序階段。 步驟包含執行實際處理所需的所有必要邏輯和數據。 根據批處理規范,步驟的定義含糊不清,因為步驟的內容純粹是特定于應用程序的,并且可以像開發人員所希望的那樣復雜或簡單。 有兩種步驟: 面向塊和面向任務 。
- 作業操作員:它提供了一個界面來管理作業處理的各個方面,其中包括操作命令,例如開始,重新啟動和停止,以及作業存儲庫命令,例如檢索作業和步驟執行。
- 作業存儲庫:它包含有關當前正在運行的作業的信息以及有關該作業的歷史數據。 JobOperator提供用于訪問此存儲庫的API。 JobRepository可以使用數據庫或文件系統來實現。
下一節將幫助您了解批處理體系結構的一些常見特征。
1.3工作步驟
步驟是作業的獨立階段。 如上所述,作業中有兩種類型的步驟。 我們將在下面嘗試詳細了解這兩種類型。
1.3.1面向塊的步驟
塊狀步驟將一次讀取和處理一項,并將結果分組。 然后,當塊達到預定義的大小時,將結果存儲起來。 當數據集很大時,面向塊的處理使存儲結果的效率更高。 它包括三個部分。
- 項目讀取器從一個數據源中依次讀取輸入,該數據源可以是數據庫,平面文件,日志文件等。
- 處理器將根據定義的業務邏輯一一處理數據。
- 編寫器將數據分塊寫入。 塊的大小是預定義的并且是可配置的
作為塊步驟的一部分,有一些檢查點可為框架提供信息以完成塊。 如果在塊處理期間發生錯誤,則可以基于最后一個檢查點重新開始該過程。
1.3.2面向任務的步驟
除了處理數據源中的項目外,它還執行任務。 其中包括創建或刪除目錄,移動文件,創建或刪除數據庫表等。與塊步驟相比,任務步驟通常不會長時間運行。
在正常情況下,在需要清理的面向塊的步驟之后使用面向任務的步驟。 例如,我們獲取日志文件作為應用程序的輸出。 塊步驟用于處理數據并從日志文件中獲取有意義的信息。
然后,使用任務步驟來清理不再需要的較舊的日志文件。
1.3.3并行處理
批處理作業通常執行昂貴的計算操作并處理大量數據。 批處理應用程序可以在兩種情況下受益于并行處理。
- 本質上獨立的步驟可以在不同的線程上運行。
- 面向塊的步驟(其中每個項目的處理獨立于處理先前項目的結果)可以在多個線程上運行。
批處理有助于完成任務并更快地執行操作以處理海量數據。
2.工具與技術
讓我們看看用于構建程序的技術和工具。
- Eclipse Oxygen.2發布(4.7.2)
- Java –版本9.0.4
- Gradle– 4.3
- Spring啟動– 2.0.1-發布
- HSQL數據庫
3.項目結構
項目結構如下圖所示。
Java Batch的項目結構
上面的項目結構使用Gradle。 也可以使用maven創建該項目,并且build.gralde將替換為pom.xml文件。 該項目的結構將稍微延遲使用Maven進行構建。
4.計劃的目標
作為程序的一部分,我們將嘗試使用spring boot創建一個簡單的Java批處理應用程序。 該應用程序將執行以下任務。
4.1 Gradle構建
我們正在使用Gradle作為程序的一部分進行構建。 build.gradle文件將如下所示。
build.gradle
buildscript {repositories {mavenCentral()}dependencies {classpath("org.springframework.boot:spring-boot-gradle-plugin:2.0.1.RELEASE")} }apply plugin: 'java' apply plugin: 'eclipse' apply plugin: 'idea' apply plugin: 'org.springframework.boot' apply plugin: 'io.spring.dependency-management'bootJar {baseName = 'java-batch'version = '1.0' }repositories {mavenCentral() }sourceCompatibility = 1.8 targetCompatibility = 1.8dependencies {compile("org.springframework.boot:spring-boot-starter-batch")compile("org.hsqldb:hsqldb")testCompile("junit:junit") } 在上面的build.gradle文件中, apply plugin: 'java'告訴我們需要設置的插件。 對我們來說,它是Java插件。
repositories{}讓我們知道應該從中提取依賴關系的存儲庫。 我們選擇了mavenCentral拉依賴罐。 我們還可以使用jcenter提取相應的依賴罐。
dependencies {}標簽用于提供應為項目提取的必要的jar文件詳細信息。 apply plugin: 'org.springframework.boot'此插件用于指定spring-boot項目。 boot jar{}將指定將從構建生成的jar的屬性。
4.2樣本數據文件
為了提供讀取階段的數據,我們將使用包含員工數據的CSV文件。
該文件將如下所示。
樣本CSV文件
John,Foster Joe,Toy Justin,Taylor Jane,Clark John,Steve示例數據文件包含員工的名字和姓氏。 我們將使用相同的數據進行處理,然后將其插入數據庫。
4.3 SQL腳本
我們正在使用HSQL數據庫,它是基于內存的數據庫。 該腳本將如下所示。
SQL腳本
DROP TABLE employee IF EXISTS;CREATE TABLE employee (person_id BIGINT IDENTITY NOT NULL PRIMARY KEY,first_name VARCHAR(20),last_name VARCHAR(20) );Spring Boot啟動時會自動運行schema-@@platform@@.sql 。 -all是所有平臺的默認設置。 因此,表創建將在應用程序啟動時自行進行,并且在應用程序啟動并運行之前可用。
4.4模型類別
我們將創建一個Employee.java類作為模型類。 該類如下所示。
程序的模型類
package com.batch;public class Employee {private String lastName;private String firstName;public Employee() {}public Employee(String firstName, String lastName) {this.firstName = firstName;this.lastName = lastName;}public void setFirstName(String firstName) {this.firstName = firstName;}public String getFirstName() {return firstName;}public String getLastName() {return lastName;}public void setLastName(String lastName) {this.lastName = lastName;}@Overridepublic String toString() {return "firstName: " + firstName + ", lastName: " + lastName;}}@Override用于覆蓋toString()方法的默認實現。
4.5配置類
我們將創建一個BatchConfiguration.java類,它將作為批處理的配置類。 Java文件如下所示。
BatchConfiguration.java
package com.batch.config;import javax.sql.DataSource;import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecutionListener; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider; import org.springframework.batch.item.database.JdbcBatchItemWriter; import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder; import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper; import org.springframework.batch.item.file.mapping.DefaultLineMapper; import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ClassPathResource; import org.springframework.jdbc.core.JdbcTemplate;import com.batch.Employee; import com.batch.processor.EmployeeItemProcessor;@Configuration @EnableBatchProcessing public class BatchConfiguration {@Autowiredpublic JobBuilderFactory jobBuilderFactory;@Autowiredpublic StepBuilderFactory stepBuilderFactory;// tag::readerwriterprocessor[]@Beanpublic FlatFileItemReader reader() {return new FlatFileItemReaderBuilder().name("EmployeeItemReader").resource(new ClassPathResource("sample-data.csv")).delimited().names(new String[]{"firstName", "lastName"}).fieldSetMapper(new BeanWrapperFieldSetMapper() {{setTargetType(Employee.class);}}).build();}@Beanpublic EmployeeItemProcessor processor() {return new EmployeeItemProcessor();}@Beanpublic JdbcBatchItemWriter writer(DataSource dataSource) {return new JdbcBatchItemWriterBuilder().itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()).sql("INSERT INTO employee (first_name, last_name) VALUES (:firstName, :lastName)").dataSource(dataSource).build();}// end::readerwriterprocessor[]// tag::jobstep[]@Beanpublic Job importUserJob(JobCompletionNotificationListener listener, Step step1) {return jobBuilderFactory.get("importUserJob").incrementer(new RunIdIncrementer()).listener(listener).flow(step1).end().build();}@Beanpublic Step step1(JdbcBatchItemWriter writer) {return stepBuilderFactory.get("step1").<Employee, Employee> chunk(10).reader(reader()).processor(processor()).writer(writer).build();}// end::jobstep[] } @EnableBatchProcessing批注用于啟用批處理。
JobBuilderFactory是用于構建作業的工廠。
StepBuilderFactory用于創建步驟。 方法step1()具有屬性chunk() 。 這是用于將輸入分塊為定義大小的屬性。 對于我們來說,大小為10。
4.6項目處理器
項目處理器是一個接口,將負責處理數據。 我們將在EmployeeItemProcessor.java實現該接口。 Java類如下所示。
EmployeeItemProcessor.java
package com.batch.processor;import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.item.ItemProcessor;import com.batch.Employee;public class EmployeeItemProcessor implements ItemProcessor<Employee, Employee> {private static final Logger log = LoggerFactory.getLogger(EmployeeItemProcessor.class);@Overridepublic Employee process(Employee emp) throws Exception {final String firstName = emp.getFirstName().toUpperCase();final String lastName = emp.getLastName().toUpperCase();final Employee transformedEmployee = new Employee(firstName, lastName);log.info("Converting (" + emp + ") into (" + transformedEmployee + ")");return transformedEmployee;}}在process()方法中,我們將獲取數據,并將其轉換為大寫名稱。
4.7 JobExecutionSupportListener類
JobExecutionListenerSupport是將在作業完成時通知的接口。 作為接口的一部分,我們有afterJob方法。 此方法用于過帳作業的完成。
JobCompletionNotificationListener.java
package com.batch.config; import java.util.List;import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.core.BatchStatus; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.listener.JobExecutionListenerSupport; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.RowMapper; import org.springframework.stereotype.Component;import com.batch.Employee;@Component public class JobCompletionNotificationListener extends JobExecutionListenerSupport {private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);private final JdbcTemplate jdbcTemplate;@Autowiredpublic JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {this.jdbcTemplate = jdbcTemplate;}@Overridepublic void afterJob(JobExecution jobExecution) {RowMapper rowMapper = (rs, rowNum) -> {Employee e = new Employee();e.setFirstName(rs.getString(1));e.setLastName(rs.getString(2));return e;};if(jobExecution.getStatus() == BatchStatus.COMPLETED) {log.info("!!! JOB FINISHED! Time to verify the results");List empList= jdbcTemplate.query("SELECT first_name, last_name FROM employee",rowMapper);log.info("Size of List "+empList.size());for (Employee emp: empList) {log.info("Found: "+emp.getFirstName()+" "+emp.getLastName());}}} }在這種方法中,我們是在作業完成后從數據庫中獲取數據的,并將結果打印在控制臺上以驗證對數據執行的處理。
4.8應用類別
我們將創建一個應用程序類,其中包含負責觸發Java批處理程序的main方法。 該類如下所示。
應用程序
package com.batch; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication public class Application {public static void main(String[] args) throws Exception {SpringApplication.run(Application.class, args);} }@SpringBootApplication是用于將程序指定為Spring Boot程序的注釋。
5.輸出
讓我們將該應用程序作為Java應用程序執行。 我們將在控制臺上獲得以下輸出。
JavaBatch程序的輸出
批處理程序的工作流在輸出中非常清楚。 Job以importUserJob ,然后第一步執行開始,將讀取的數據轉換為大寫。
步驟的后處理,我們可以在控制臺上看到大寫的結果。
6.總結
在本教程中,我們學習了以下內容:
7.下載Eclipse項目
這是帶有SpringBoot的JavaBatch教程。
您可以在此處下載此示例的完整源代碼: JavaBatch.zip翻譯自: https://www.javacodegeeks.com/2018/05/java-batch-tutorial.html
java批處理 異常處理
總結
以上是生活随笔為你收集整理的java批处理 异常处理_Java批处理教程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 找到英语怎么读 找到英语如何读
- 下一篇: 舟加可念什么 舟加可念啥