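How to stream/serialize JPA results as a JAX-RS response for large data
Sometimes it is necessary to retrieve a large data set through JPA (for example, more than 1,000,000 records), and loading all of it into a single java.util.List instance is risky because it can exhaust the available memory. Here is a quick solution showing how a JAX-RS REST resource endpoint can still respond in a timely manner without breaking the memory limit, by streaming or serializing the JPA entities in "pages".
Sample database table and JPA entity
Database table
To demonstrate how large data can be output, here is a sample MySQL database table we can use.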

    create database large_data_test_db;
    use large_data_test_db;

    create table generated_uuids (
        record_no bigint not null auto_increment,
        uuid varchar(100) not null,
        datetime_generated datetime not null,
        primary key(record_no),
        unique(uuid)
    );

JPA entity
Next, define the JPA entity class that represents the table structure above.
Code for GeneratedUuidEntity.java

    package com.developerscrappad;

    import java.io.Serializable;
    import java.util.Date;
    import javax.persistence.Column;
    import javax.persistence.Entity;
    import javax.persistence.GeneratedValue;
    import javax.persistence.GenerationType;
    import javax.persistence.Id;
    import javax.persistence.NamedQueries;
    import javax.persistence.NamedQuery;
    import javax.persistence.Table;
    import javax.persistence.Temporal;
    import javax.persistence.TemporalType;

    @Entity
    @Table( name = "generated_uuids" )
    @NamedQueries( {
        // Ordered by the primary key so that paged reads see a stable row order
        @NamedQuery( name = "GeneratedUuidEntity.listAll", query = "SELECT u FROM GeneratedUuidEntity u ORDER BY u.recordNo" ),
        @NamedQuery( name = "GeneratedUuidEntity.queryRecordsSize", query = "SELECT count(u) FROM GeneratedUuidEntity u" )
    } )
    public class GeneratedUuidEntity implements Serializable {

        private static final long serialVersionUID = 12312312234234123L;

        @Id
        @GeneratedValue( strategy = GenerationType.IDENTITY )
        @Column( name = "record_no" )
        private Long recordNo;

        @Column( name = "uuid" )
        private String uuid;

        @Column( name = "datetime_generated" )
        @Temporal( TemporalType.TIMESTAMP )
        private Date datetimeGenerated;

        public GeneratedUuidEntity() {
        }

        public GeneratedUuidEntity( Long recordNo ) {
            this.recordNo = recordNo;
        }

        public GeneratedUuidEntity( Long recordNo, String uuid, Date datetimeGenerated ) {
            this.recordNo = recordNo;
            this.uuid = uuid;
            this.datetimeGenerated = datetimeGenerated;
        }

        public Long getRecordNo() {
            return recordNo;
        }

        public void setRecordNo( Long recordNo ) {
            this.recordNo = recordNo;
        }

        public String getUuid() {
            return uuid;
        }

        public void setUuid( String uuid ) {
            this.uuid = uuid;
        }

        public Date getDatetimeGenerated() {
            return datetimeGenerated;
        }

        public void setDatetimeGenerated( Date datetimeGenerated ) {
            this.datetimeGenerated = datetimeGenerated;
        }

        @Override
        public int hashCode() {
            int hash = 0;
            hash += ( recordNo != null ? recordNo.hashCode() : 0 );
            return hash;
        }

        @Override
        public boolean equals( Object object ) {
            // TODO: Warning - this method won't work in the case the id fields are not set
            if ( !( object instanceof GeneratedUuidEntity ) ) {
                return false;
            }

            GeneratedUuidEntity other = ( GeneratedUuidEntity ) object;

            if ( ( this.recordNo == null && other.recordNo != null ) || ( this.recordNo != null && !this.recordNo.equals( other.recordNo ) ) ) {
                return false;
            }

            return true;
        }

        @Override
        public String toString() {
            return "com.developerscrappad.GeneratedUuidEntity[ recordNo=" + recordNo + " ]";
        }
    }

Two named queries are defined in GeneratedUuidEntity. GeneratedUuidEntity.queryRecordsSize queries the total number of records in the table, while GeneratedUuidEntity.listAll retrieves all the records in the table, ordered by record_no so that paging returns rows in a stable order.
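Implementing the JAX-RS REST resource (the Java EE way)
Let's have a JAX-RS REST resource class named JPAStreamingRESTResource, with a JPA EntityManager (persistence unit name: JPAStreamingPU) injected into it and obtained through the protected method getEntityManager().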

    @Path( "generated-uuids" )
    @Stateless( name = "JPAStreamingRESTResource", mappedName = "ejb/JPAStreamingRESTResource" )
    public class JPAStreamingRESTResource {

        @PersistenceContext( unitName = "JPAStreamingPU" )
        private EntityManager entityManager;

        protected EntityManager getEntityManager() {
            return entityManager;
        }

        /**
         * Say "NO" to response caching
         */
        protected Response.ResponseBuilder getNoCacheResponseBuilder( Response.Status status ) {
            CacheControl cc = new CacheControl();
            cc.setNoCache( true );
            cc.setMaxAge( -1 );
            cc.setMustRevalidate( true );

            return Response.status( status ).cacheControl( cc );
        }
    }

In addition, we have a method named getNoCacheResponseBuilder(), which returns a non-caching javax.ws.rs.core.Response.ResponseBuilder so that we don't get strange cached results later.
JPA invocation methods
Next, let's define two methods in the resource class:
queryGeneratedUuidRecordsSize() - retrieves the total number of records in the table
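
    private int queryGeneratedUuidRecordsSize() {
        return getEntityManager()
            .createNamedQuery( "GeneratedUuidEntity.queryRecordsSize", Long.class )
            .getSingleResult()
            .intValue();
    }

listAllGeneratedUuidEntities() - retrieves all the data from the table, but within certain limits: the starting record position (recordPosition) and the maximum number of records per round trip to the database (recordsPerRoundTrip). The intention is to "page" the results so that the result list never becomes overly bloated. We'll see it in action later.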

    private List<GeneratedUuidEntity> listAllGeneratedUuidEntities( int recordPosition, int recordsPerRoundTrip ) {
        // The typed query avoids an unchecked conversion of the result list
        return getEntityManager()
            .createNamedQuery( "GeneratedUuidEntity.listAll", GeneratedUuidEntity.class )
            .setFirstResult( recordPosition )
            .setMaxResults( recordsPerRoundTrip )
            .getResultList();
    }

Let the streaming begin
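Now, let's implement the resource endpoint method, which, at least in theory, retrieves the data without compromising on size. This method returns a JSON response in the following data format: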

    {
        "result": [
            {
                "record_no": 1,
                "uuid": "34d99089-3e36-4f00-ab93-846b61771eb3",
                "datetime_generated": "2015-06-28 21:02:23"
            },
            …
        ]
    }

The endpoint method itself:

    @GET
    @Path( "list-all" )
    @Produces( "application/json" )
    @TransactionAttribute( TransactionAttributeType.NEVER )
    public Response streamGeneratedUuids() {
        // Define the format of timestamp output
        SimpleDateFormat df = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" );

        return getNoCacheResponseBuilder( Response.Status.OK ).entity( new StreamingOutput() {

            // Instruct how StreamingOutput's write method is to stream the data
            @Override
            public void write( OutputStream os ) throws IOException, WebApplicationException {
                int recordsPerRoundTrip = 100;                    // Number of records for every round trip to the database
                int recordPosition = 0;                           // Initial record position index
                int recordSize = queryGeneratedUuidRecordsSize(); // Total records found for the query

                // Start streaming the data; encode explicitly as UTF-8
                // (java.nio.charset.StandardCharsets) instead of the platform default
                try ( PrintWriter writer = new PrintWriter( new BufferedWriter( new OutputStreamWriter( os, StandardCharsets.UTF_8 ) ) ) ) {
                    writer.print( "{\"result\": [" );

                    while ( recordSize > 0 ) {
                        // Get the paged data set from the DB
                        List<GeneratedUuidEntity> generatedUuidEntities = listAllGeneratedUuidEntities( recordPosition, recordsPerRoundTrip );

                        for ( GeneratedUuidEntity generatedUuidEntity : generatedUuidEntities ) {
                            // Every record except the very first is preceded by a comma
                            if ( recordPosition > 0 ) {
                                writer.print( "," );
                            }

                            // Stream the data in Json object format
                            writer.print( Json.createObjectBuilder()
                                .add( "record_no", generatedUuidEntity.getRecordNo() )
                                .add( "uuid", generatedUuidEntity.getUuid() )
                                .add( "datetime_generated", df.format( generatedUuidEntity.getDatetimeGenerated() ) )
                                .build().toString() );

                            // Increase the recordPosition for every record streamed
                            recordPosition++;
                        }

                        // update the recordSize (remaining no. of records)
                        recordSize -= recordsPerRoundTrip;
                    }

                    // Done!
                    writer.print( "]}" );
                }
            }
        } ).build();
    }

Code explanation:
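It is actually quite simple. The trick is to define an anonymous StreamingOutput class and override its write() method, which first queries the total record count via queryGeneratedUuidRecordsSize() and then retrieves the records page by page via listAllGeneratedUuidEntities(). The method makes multiple round trips to the database; how many depends on the total record count and the recordsPerRoundTrip value.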
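The control flow of that loop can be tried in isolation. The following is only a minimal stand-alone sketch, not the resource class: it assumes a hypothetical in-memory fakeTable() in place of the JPA query, and the class name PagedStreamSketch is invented for illustration. It mirrors the count-then-page loop and the comma handling described above.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

/** Stand-alone sketch of the paging loop; no JPA or JAX-RS involved. */
final class PagedStreamSketch {

    /**
     * Writes {"result": [...]} one page at a time and returns the number of
     * round trips made. fetchPage takes (offset, limit) and stands in for
     * listAllGeneratedUuidEntities(); totalRows stands in for the count query.
     */
    static int stream( PrintWriter out, int totalRows, int pageSize,
                       BiFunction<Integer, Integer, List<String>> fetchPage ) {
        int position = 0;          // index of the next record to fetch
        int remaining = totalRows; // records still to be streamed
        int roundTrips = 0;

        out.print( "{\"result\": [" );
        while ( remaining > 0 ) {
            List<String> page = fetchPage.apply( position, pageSize );
            roundTrips++;
            for ( String json : page ) {
                if ( position > 0 ) {
                    out.print( "," ); // separator before every record but the first
                }
                out.print( json );
                position++;
            }
            remaining -= pageSize;
        }
        out.print( "]}" );
        out.flush();
        return roundTrips;
    }

    /** Hypothetical in-memory "table" that serves rows 1..totalRows in pages. */
    static BiFunction<Integer, Integer, List<String>> fakeTable( int totalRows ) {
        return ( offset, limit ) -> {
            List<String> page = new ArrayList<>();
            for ( int i = offset; i < Math.min( offset + limit, totalRows ); i++ ) {
                page.add( "{\"record_no\":" + ( i + 1 ) + "}" );
            }
            return page;
        };
    }

    /** Convenience wrapper that collects the streamed JSON into a String. */
    static String render( int totalRows, int pageSize ) {
        StringWriter sw = new StringWriter();
        stream( new PrintWriter( sw ), totalRows, pageSize, fakeTable( totalRows ) );
        return sw.toString();
    }
}
```

With 567 rows and a page size of 100, stream() makes six round trips, the last of which returns 67 rows. Only one page of records is held in memory at any time, which is the point of the technique.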
Full source code of JPAStreamingRESTResource.java:

    package com.developerscrappad;

    import java.io.BufferedWriter;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.io.OutputStreamWriter;
    import java.io.PrintWriter;
    import java.nio.charset.StandardCharsets;
    import java.text.SimpleDateFormat;
    import java.util.List;
    import javax.ejb.Stateless;
    import javax.ejb.TransactionAttribute;
    import javax.ejb.TransactionAttributeType;
    import javax.json.Json;
    import javax.persistence.EntityManager;
    import javax.persistence.PersistenceContext;
    import javax.ws.rs.GET;
    import javax.ws.rs.Path;
    import javax.ws.rs.Produces;
    import javax.ws.rs.WebApplicationException;
    import javax.ws.rs.core.CacheControl;
    import javax.ws.rs.core.Response;
    import javax.ws.rs.core.StreamingOutput;

    @Path( "generated-uuids" )
    @Stateless( name = "JPAStreamingRESTResource", mappedName = "ejb/JPAStreamingRESTResource" )
    public class JPAStreamingRESTResource {

        @PersistenceContext( unitName = "JPAStreamingPU" )
        private EntityManager entityManager;

        // Fetches one "page" of entities, starting at recordPosition
        private List<GeneratedUuidEntity> listAllGeneratedUuidEntities( int recordPosition, int recordsPerRoundTrip ) {
            return getEntityManager()
                .createNamedQuery( "GeneratedUuidEntity.listAll", GeneratedUuidEntity.class )
                .setFirstResult( recordPosition )
                .setMaxResults( recordsPerRoundTrip )
                .getResultList();
        }

        // Total number of records in the table
        private int queryGeneratedUuidRecordsSize() {
            return getEntityManager()
                .createNamedQuery( "GeneratedUuidEntity.queryRecordsSize", Long.class )
                .getSingleResult()
                .intValue();
        }

        protected EntityManager getEntityManager() {
            return entityManager;
        }

        /**
         * Say "NO" to response caching
         */
        protected Response.ResponseBuilder getNoCacheResponseBuilder( Response.Status status ) {
            CacheControl cc = new CacheControl();
            cc.setNoCache( true );
            cc.setMaxAge( -1 );
            cc.setMustRevalidate( true );

            return Response.status( status ).cacheControl( cc );
        }

        @GET
        @Path( "list-all" )
        @Produces( "application/json" )
        @TransactionAttribute( TransactionAttributeType.NEVER )
        public Response streamGeneratedUuids() {
            // Define the format of timestamp output
            SimpleDateFormat df = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" );

            return getNoCacheResponseBuilder( Response.Status.OK ).entity( new StreamingOutput() {

                // Instruct how StreamingOutput's write method is to stream the data
                @Override
                public void write( OutputStream os ) throws IOException, WebApplicationException {
                    int recordsPerRoundTrip = 100;                    // Number of records for every round trip to the database
                    int recordPosition = 0;                           // Initial record position index
                    int recordSize = queryGeneratedUuidRecordsSize(); // Total records found for the query

                    // Start streaming the data; encode explicitly as UTF-8 instead of the platform default
                    try ( PrintWriter writer = new PrintWriter( new BufferedWriter( new OutputStreamWriter( os, StandardCharsets.UTF_8 ) ) ) ) {
                        writer.print( "{\"result\": [" );

                        while ( recordSize > 0 ) {
                            // Get the paged data set from the DB
                            List<GeneratedUuidEntity> generatedUuidEntities = listAllGeneratedUuidEntities( recordPosition, recordsPerRoundTrip );

                            for ( GeneratedUuidEntity generatedUuidEntity : generatedUuidEntities ) {
                                // Every record except the very first is preceded by a comma
                                if ( recordPosition > 0 ) {
                                    writer.print( "," );
                                }

                                // Stream the data in Json object format
                                writer.print( Json.createObjectBuilder()
                                    .add( "record_no", generatedUuidEntity.getRecordNo() )
                                    .add( "uuid", generatedUuidEntity.getUuid() )
                                    .add( "datetime_generated", df.format( generatedUuidEntity.getDatetimeGenerated() ) )
                                    .build().toString() );

                                // Increase the recordPosition for every record streamed
                                recordPosition++;
                            }

                            // update the recordSize (remaining no. of records)
                            recordSize -= recordsPerRoundTrip;
                        }

                        // Done!
                        writer.print( "]}" );
                    }
                }
            } ).build();
        }
    }

Beware
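Remember to tune the application server's response connection timeout value, so that the REST or HTTP client does not throw a java.io.IOException EOF exception while a long response is still being streamed.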
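The server-side timeout setting is specific to each application server and is not shown here. On the calling side, a client can also be given generous timeouts of its own. The following is a minimal sketch using the JDK's java.net.http client (Java 11+) rather than the Apache HttpClient used elsewhere in this article; the class name LongRunningClientSketch and both timeout values are assumptions chosen for illustration, not recommendations.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.time.Duration;

/** Sketch: a client configured to tolerate a slow, long-running response. */
final class LongRunningClientSketch {

    // Assumed values; tune them to the size of the data set and the server settings
    static final Duration CONNECT_TIMEOUT = Duration.ofSeconds( 10 );
    static final Duration RESPONSE_TIMEOUT = Duration.ofMinutes( 10 );

    /** Client with an explicit limit on how long connecting may take. */
    static HttpClient newClient() {
        return HttpClient.newBuilder()
            .connectTimeout( CONNECT_TIMEOUT )
            .build();
    }

    /** GET request with an explicit limit on how long to wait for the response. */
    static HttpRequest newListAllRequest( String url ) {
        return HttpRequest.newBuilder( URI.create( url ) )
            .timeout( RESPONSE_TIMEOUT )
            .header( "Accept", "application/json" )
            .GET()
            .build();
    }
}
```

A caller would pass the request to newClient().send( request, HttpResponse.BodyHandlers.ofFile( path ) ) so that the body is written to disk as it arrives rather than buffered in memory.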
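Testing it
To test whether this works, load the table with just 567 records. Then have a unit test call the endpoint URL and save the retrieved JSON data to a file, using the unit test code below (it uses Apache HttpClient):
Code for JPAStreamingUnitTest.java: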

    package com.developerscrappad;

    import static org.junit.Assert.*;

    import java.io.File;
    import java.io.FileInputStream;
    import java.nio.file.FileSystems;
    import java.nio.file.Files;
    import java.nio.file.StandardCopyOption;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.Statement;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.UUID;
    import javax.json.Json;
    import javax.json.JsonArray;
    import javax.json.JsonObject;
    import javax.json.JsonReader;
    import org.apache.http.HttpEntity;
    import org.apache.http.client.methods.CloseableHttpResponse;
    import org.apache.http.client.methods.HttpGet;
    import org.apache.http.impl.client.CloseableHttpClient;
    import org.apache.http.impl.client.HttpClients;
    import org.junit.AfterClass;
    import org.junit.BeforeClass;
    import org.junit.Test;

    public class JPAStreamingUnitTest {

        private static final String dbDriverClassname = "com.mysql.jdbc.Driver";
        private static final String dbUrl = "jdbc:mysql://localhost:3306/large_data_test_db";
        private static final String username = "username";
        private static final String password = "password";
        private static final int numberOfRecords = 567;
        private static final String jsonResultOutputFilename = "testing123.json";

        // Load the table with test records before the test runs
        @BeforeClass
        public static void setUpClass() {
            try {
                Class.forName( dbDriverClassname );

                try ( Connection conn = DriverManager.getConnection( dbUrl, username, password ) ) {
                    String insertSQL = "insert into generated_uuids (uuid, datetime_generated) values (?, now())";

                    try ( PreparedStatement stmt = conn.prepareStatement( insertSQL ) ) {
                        for ( int i = 0; i < numberOfRecords; i++ ) {
                            System.out.println( "Inserting row: " + i );

                            stmt.setString( 1, UUID.randomUUID().toString() );
                            stmt.executeUpdate();
                        }
                    }
                }
            } catch ( final Exception ex ) {
                ex.printStackTrace();
                fail( ex.getMessage() );
            }
        }

        // Empty the table and remove the downloaded file afterwards
        @AfterClass
        public static void tearDownClass() {
            try {
                Class.forName( dbDriverClassname );

                try ( Connection conn = DriverManager.getConnection( dbUrl, username, password );
                      Statement stmt = conn.createStatement() ) {
                    String truncateSQL = "truncate generated_uuids";
                    stmt.executeUpdate( truncateSQL );
                }

                new File( System.getProperty( "java.io.tmpdir" ), jsonResultOutputFilename ).delete();
            } catch ( final Exception ex ) {
                ex.printStackTrace();
                fail( ex.getMessage() );
            }
        }

        @Test
        public void testJPAStreaming() {
            String url = "http://localhost:8080/JPAStreaming/rest-api/generated-uuids/list-all/";

            try {
                HttpGet httpGet = new HttpGet( url );

                try ( CloseableHttpClient httpclient = HttpClients.createDefault();
                      CloseableHttpResponse response1 = httpclient.execute( httpGet ) ) {
                    System.out.println( response1.getStatusLine() );
                    HttpEntity entity1 = response1.getEntity();

                    // Overwrite any file left behind by an earlier, aborted run
                    Files.copy(
                        entity1.getContent(),
                        FileSystems.getDefault().getPath( System.getProperty( "java.io.tmpdir" ), jsonResultOutputFilename ),
                        StandardCopyOption.REPLACE_EXISTING );
                }

                // Validate
                try ( JsonReader jsonReader = Json.createReader( new FileInputStream( new File( System.getProperty( "java.io.tmpdir" ), jsonResultOutputFilename ) ) ) ) {
                    JsonObject jsonObj = jsonReader.readObject();
                    assertTrue( jsonObj.containsKey( "result" ) );

                    JsonArray jsonArray = jsonObj.getJsonArray( "result" );
                    assertEquals( numberOfRecords, jsonArray.size() );

                    SimpleDateFormat validationDF = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" );

                    for ( int i = 0; i < jsonArray.size(); i++ ) {
                        JsonObject generatedUuidJsonObj = jsonArray.getJsonObject( i );
                        int recordNumber = generatedUuidJsonObj.getInt( "record_no" );
                        assertTrue( recordNumber > 0 );

                        try {
                            UUID.fromString( generatedUuidJsonObj.getString( "uuid" ) );
                        } catch ( IllegalArgumentException ex ) {
                            fail( "Invalid UUID format at record number: " + recordNumber );
                        }

                        try {
                            validationDF.parse( generatedUuidJsonObj.getString( "datetime_generated" ) );
                        } catch ( final NullPointerException | ParseException ex ) {
                            fail( "datetime_generated field must not be null and must be of format yyyy-MM-dd HH:mm:ss" );
                        }
                    }
                }
            } catch ( final Exception ex ) {
                ex.printStackTrace();
                fail( ex.getMessage() );
            }
        }
    }

And we are done. Thanks for reading, and I hope it helps.
Translated from: https://www.javacodegeeks.com/2015/07/how-to-streamserialize-jpa-result-as-jax-rs-response-for-large-data.html