netty冲突 play sbt_netty4 实现一个断点上传大文件功能
我本來以為文件斷點續傳功能很簡單,不就是提供2個方法:
一個返回已經上傳的文件的長度;另外一個負責上傳文件唄(請求帶上content-range 指明本次上傳的內容在整個文件中的位置),然后根據請求提供的位置寫唄,太簡單了。
但是實際情況還是比較復雜的,關鍵問題是,上面的描述現在想想只能稱作為文件分段上傳,而不是斷點續傳。
斷點意味著網絡會斷,然后斷了之后,服務端根本獲取不到本次上傳的內容,于是下次又只能從頭開始傳文件。一種解決辦法是客戶端將文件分成很小的片段(單個片段丟了就整個片段重傳),這個方案要求客戶端做很多工作,服務端還得根據片段的編號組織文件,總之客戶端和服務端都挺麻煩。
于是就想到用netty在寫一個服務filestoreApdapterServer,文件上傳提交給這個代理服務。這個做法有個前提就是,客戶端上傳的文件名稱保證唯一,并且在請求頭里面帶著這個名字,以便服務端定位文件。利用的原理是一般長度比較大的消息體,netty會使用chunk傳輸,我們取得chunk寫入臨時文件,這樣即使網絡斷了,服務端已經獲取的文件內容還是保留在臨時文件里面。
流程如下:
1. filestoreApdapterServer將請求的消息體寫到臨時文件(網絡斷了也不要緊,讀到多少寫多少)。
2. 客戶端下次傳之前先調用getSize獲取上傳傳遞的文件長度,我們就在這個getSize方法里面偷偷的將第一步保存的臨時文件追加到正式文件里面,然后返回文件長度。
3. 客戶端根據獲取的服務端文件長度,定位未傳的文件位置,讀取上傳。重復1,2步驟。直到文件上傳完成。
看代碼:FilestoreAdaptorServerInitializer
public?class?FilestoreAdaptorServerInitializer?extends
ChannelInitializer?{
@Override
protected?void?initChannel(SocketChannel?ch)?throws?Exception?{
ChannelPipeline?pipeline?=?ch.pipeline();
pipeline.addLast("decoder",?new?HttpRequestDecoder());
pipeline.addLast("aggregator",?new?StreamChunkAggregator(-1));
pipeline.addLast("encoder",?new?HttpResponseEncoder());
pipeline.addLast("handler",?new?FileUploadAdaptorHandler());
}
}
StreamChunkAggregator就是獲取上傳文件,寫臨時文件的:
public?class?StreamChunkAggregator?extends?MessageToMessageDecoder?{
private?static?final?Logger?log?=?LoggerFactory.getLogger(StreamChunkAggregator.class);
private?volatile?FullHttpMessage?currentMessage;
private?volatile?OutputStream?out;
private?final?int?maxContentLength;
private?volatile?File?file;
private?ChannelHandlerContext?ctx;
public?static?final?int?DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS?=?1024;
private?int?maxCumulationBufferComponents?=?DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS;
/**
*?Creates?a?new?instance.
*/
public?StreamChunkAggregator(int?maxContentLength)?{
this.maxContentLength?=?maxContentLength;
}
@Override
protected?void?decode(ChannelHandlerContext?ctx,?HttpObject?msg,
List?out)?throws?Exception?{
FullHttpMessage?currentMessage?=?this.currentMessage;
if?(msg?instanceof?HttpMessage)?{
HttpMessage?m?=?(HttpMessage)?msg;
if?(msg?instanceof?HttpRequest)?{
HttpRequest?header?=?(HttpRequest)?msg;
this.currentMessage?=?currentMessage?=?new?DefaultFullHttpRequest(header.getProtocolVersion(),
header.getMethod(),?header.getUri(),?Unpooled.compositeBuffer(maxCumulationBufferComponents));
final?String?localName?=?m.headers().get("file");?//?取上傳文件名
log.debug("upload?file?name?is?{}",?localName);
if(null?==?localName?||?"".equals(localName.trim()))?{
ctx.fireChannelRead(m);
}
File?dir?=?new?File(ServerHelper.getDestDir().getAbsolutePath()?+?File.separator?+?ServerHelper.getStorePath(localName));
if(!dir.exists())
dir.mkdirs();
log.debug("upload?file?path?is?{}",?dir.getAbsolutePath());
File?tempFile?=?new?File(dir,?localName?+?".utmp");
if(tempFile.exists())?{?//?文件已經存在可能是上次上傳遺留的
tempFile.delete();
}
this.file?=?tempFile;
this.out?=?new?FileOutputStream(file,?true);
}?else?{
throw?new?Error();
}
currentMessage.headers().set(m.headers());
}?else?if?(msg?instanceof?HttpContent)?{
assert?currentMessage?!=?null;
HttpContent?chunk?=?(HttpContent)?msg;
if?(chunk.content().isReadable())?{
chunk.retain();
IOUtils.copyLarge(new?ByteBufInputStream(chunk.content()),?this.out);
}
final?boolean?last;
if?(!chunk.getDecoderResult().isSuccess())?{
currentMessage.setDecoderResult(
DecoderResult.failure(chunk.getDecoderResult().cause()));
last?=?true;
}?else?{
last?=?chunk?instanceof?LastHttpContent;
}
if?(last)?{
this.out.flush();
this.out.close();
this.out?=?null;
this.currentMessage?=?null;
this.file?=?null;
out.add(currentMessage);
}
}?else?{
throw?new?Error();
}
}
FileUploadAdaptorHandler 這個是最后傳成功后通知真正的服務端,并且獲取服務的返回,給客戶端:
public?class?FileUploadAdaptorHandler?extends?SimpleChannelInboundHandler?{
private?static?final?Logger?log?=?LoggerFactory.getLogger(FileUploadAdaptorHandler.class);
@Override
protected?void?channelRead0(final?ChannelHandlerContext?ctx,?DefaultFullHttpRequest?msg)?throws?Exception?{
if(log.isDebugEnabled())?{
log.debug("message?received:?begin");
}
final?String?filename?=?msg.headers().get("file");
if(filename?==?null?||?"".equals(filename.trim()))?{?//沒有文件名?直接返回4001?參數錯誤
String?responseBody?=?"{\"result_code\":?4001,\"result_msg\":?\"請求參數錯誤\"}";
response(responseBody.getBytes(),?HttpResponseStatus.BAD_REQUEST,?ctx);
}?else?{
//?轉發給play服務處理
final?CloseableHttpAsyncClient?httpclient?=?HttpAsyncClients.createDefault();
httpclient.start();
try?{
HttpGet?request1?=?new?HttpGet(ServerHelper.getPlayServer());
request1.setHeader("Client-Session",?msg.headers().get("client-session"));
request1.setHeader("Content-Range",?msg.headers().get("content-range"));
request1.setHeader("file",?msg.headers().get("file"));
httpclient.start();
httpclient.execute(request1,?new?FutureCallback()?{
@Override
public?void?failed(Exception?e)?{
try?{
httpclient.close();
}?catch?(IOException?e1)?{
log.error(e1.getMessage(),?e1);
}
serve500(ctx,?filename);
}
@Override
public?void?completed(org.apache.http.HttpResponse?playResonse)?{
log.debug("HttpAsyncClient?callback");
int?status?=?playResonse.getStatusLine().getStatusCode();
log.debug("HttpAsyncClient?callback?playResonse?status?is?{}",?status);
if(status?!=?200)?{
ServerHelper.deleteTmpFile(filename);
}
HttpEntity?entity?=?playResonse.getEntity();
byte[]?bytes?=?new?byte[(int)?entity.getContentLength()];
try?{
IOUtils.read(entity.getContent(),?bytes);
response(bytes,?new?HttpResponseStatus(status,?""),?ctx);
}?catch?(Exception?e)?{
log.error(e.getMessage(),?e);
serve500(ctx,?filename);
}?finally?{
try?{
httpclient.close();
}?catch?(IOException?e1)?{
log.error(e1.getMessage(),?e1);
}
}
}
@Override
public?void?cancelled()?{
try?{
httpclient.close();
}?catch?(IOException?e1)?{
log.error(e1.getMessage(),?e1);
}
serve500(ctx,?filename);
}
});
}?catch?(Exception?e)?{
httpclient.close();
log.error(e.getMessage(),?e);
serve500(ctx,?filename);
}
}
if(log.isDebugEnabled())?{
log.debug("message?received:?end");
}
}
真正服務提供2個方法,一個是獲取長度,一個是接收filestoreAapterServer請求的方法:
public?static?void?getFileLength(String?name)?{
Logger.debug("getFileLength?path?is?"?+?FileHelper.getStorgePath(name));
File?file?=?new?File(FileHelper.getStorgePath(name));
long?length?=?file.length();
response.status?=?StatusCode.OK;
response.setHeader("Content-Size",?String.valueOf(length));
LocalFile?file?=?LocalFile?.find(。。。).first();
if(file?!=?null){?//?如果數據中有記錄則認為文件已經保存完整
Logger.debug("getFileLength?file?has?been?in?database");
FileResult?result?=?new?FileResult();
。。。
throw?new?CustomJsonResult(result);
}
File?fileTmp?=?new?File(FileHelper.getStorgePath(name)?+?FileHelper.TMP_SUFFIX);
if(Logger.isDebugEnabled())
Logger.debug("getFileLength?temp?path?is?"?+?fileTmp.getAbsolutePath()?+?",?existed?is:?"?+?fileTmp.exists());
if(fileTmp.exists())?{
//?臨時文件存在,則保存臨時文件
Logger.debug("getFileLength?save?tmp?file");
try?{
FileHelper.saveFileFromTmp(fileTmp,?file);
}?catch?(IOException?ingore)?{
Logger.error(ingore.getMessage(),?ingore);
}
length?=?file.length();
}
response.setHeader("Content-Size",?String.valueOf(length));
}
public?static?void?saveUploadFile()?{
String?filename?=?getFileName();
Logger.debug("saveUploadFile?name?is?%s",?filename);
long?total?=?getFileTotal();?//?整個文件的大小
File?tempFile?=?new?File(FileHelper.getStorgePath(filename)?+?FileHelper.TMP_SUFFIX);
if(Logger.isDebugEnabled())?{
Logger.debug("saveUploadFile?upload?tmp?file?is:?"?+?tempFile.getAbsolutePath());
}
if(!tempFile.exists())?{
ApiResult?result?=?new?ApiResult();
result.resultCode?=?ApiResultCode.UPLOAD_FILE_FAIL;
response.status?=?Http.StatusCode.INTERNAL_ERROR;
throw?new?CustomJsonResult(result);
}
File?destFile?=?new?File(FileHelper.getStorgePath(filename));
if(destFile.length()?>=?total)?{
//?已經上傳成功了?需要刪除臨時文件
FileUtils.deleteQuietly(tempFile);
if(Logger.isDebugEnabled())?{
Logger.debug("saveUploadFile?video?has?upload?completely");
}
//?已經完整了,如果數據庫不存在保存數據庫
....
FileResult?result?=?new?FileResult();
result.resultCode?=?ApiResultCode.SUCCESS;
result.videoUrl?=?video.videoUrl;
result.shortUrl?=?video.shortUrl;
throw?new?CustomJsonResult(result);
}
try?{
FileHelper.saveFileFromTmp(tempFile,?destFile);
}?catch?(IOException?e)?{
Logger.error("saveUploadFile?"?+?e.getMessage(),?e);
ApiResult?result?=?new?ApiResult();
result.resultCode?=?ApiResultCode.UPLOAD_FILE_FAIL;
response.status?=?Http.StatusCode.INTERNAL_ERROR;
throw?new?CustomJsonResult(result);
}
afterWrite(filename,?destFile,?total);?//一些后續工作,如果文件保存完整,保存數據庫返回成功結果給客戶端
}
這個解決方法,和我們的服務綁定的比較緊,不能解決較為通用的問題 只是提出一種思路。
總結
以上是生活随笔為你收集整理的netty冲突 play sbt_netty4 实现一个断点上传大文件功能的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: nexus 代理阿里云_Azure容器镜
- 下一篇: antd 设置表头属性_解决react使