jclouds_使用jclouds在S3上分段上传
jclouds
1.目標(biāo)
在上一篇文章中 ,我們研究了如何使用jclouds中的通用Blob API將內(nèi)容上傳到S3。 在本文中,我們將使用jclouds的S3特定的異步API上傳內(nèi)容并利用S3提供的分段上傳功能。
2.準(zhǔn)備
2.1。 設(shè)置自定義API
上傳過(guò)程的第一部分是創(chuàng)建jclouds API-這是針對(duì)Amazon S3的自定義API:
public AWSS3AsyncClient s3AsyncClient() {String identity = ...String credentials = ...BlobStoreContext context = ContextBuilder.newBuilder('aws-s3').credentials(identity, credentials).buildView(BlobStoreContext.class);RestContext<AWSS3Client, AWSS3AsyncClient> providerContext = context.unwrap();return providerContext.getAsyncApi(); }2.2。 確定內(nèi)容的零件數(shù)
Amazon S3對(duì)于每個(gè)要上傳的部分都有5 MB的限制。 因此,我們需要做的第一件事是確定可以分割內(nèi)容的適當(dāng)數(shù)量的部分,以使我們沒(méi)有低于5 MB限制的部分:
public static int getMaximumNumberOfParts(byte[] byteArray) {int numberOfParts= byteArray.length / fiveMB; // 5*1024*1024if (numberOfParts== 0) {return 1;}return numberOfParts; }2.3。 將內(nèi)容分成幾部分
將把字節(jié)數(shù)組分成一定數(shù)量的部分:
public static List<byte[]> breakByteArrayIntoParts(byte[] byteArray, int maxNumberOfParts) {List<byte[]> parts = Lists.<byte[]> newArrayListWithCapacity(maxNumberOfParts);int fullSize = byteArray.length;long dimensionOfPart = fullSize / maxNumberOfParts;for (int i = 0; i < maxNumberOfParts; i++) {int previousSplitPoint = (int) (dimensionOfPart * i);int splitPoint = (int) (dimensionOfPart * (i + 1));if (i == (maxNumberOfParts - 1)) {splitPoint = fullSize;}byte[] partBytes = Arrays.copyOfRange(byteArray, previousSplitPoint, splitPoint);parts.add(partBytes);}return parts; }我們將測(cè)試將字節(jié)數(shù)組分成多個(gè)部分的邏輯–我們將生成一些字節(jié),將字節(jié)數(shù)組拆分,使用Guava將其重新組合在一起,并驗(yàn)證是否可以恢復(fù)原始字節(jié):
@Test public void given16MByteArray_whenFileBytesAreSplitInto3_thenTheSplitIsCorrect() {byte[] byteArray = randomByteData(16);int maximumNumberOfParts = S3Util.getMaximumNumberOfParts(byteArray);List<byte[]> fileParts = S3Util.breakByteArrayIntoParts(byteArray, maximumNumberOfParts);assertThat(fileParts.get(0).length + fileParts.get(1).length + fileParts.get(2).length,equalTo(byteArray.length));byte[] unmultiplexed = Bytes.concat(fileParts.get(0), fileParts.get(1), fileParts.get(2));assertThat(byteArray, equalTo(unmultiplexed)); }要生成數(shù)據(jù),我們只需使用Random的支持:
byte[] randomByteData(int mb) {byte[] randomBytes = new byte[mb * 1024 * 1024];new Random().nextBytes(randomBytes);return randomBytes; }2.4。 創(chuàng)建有效載荷
既然我們已經(jīng)為內(nèi)容確定了正確的部分?jǐn)?shù)量,并且設(shè)法將內(nèi)容分解為多個(gè)部分,那么我們需要為jclouds API 生成Payload對(duì)象 :
public static List<Payload> createPayloadsOutOfParts(Iterable<byte[]> fileParts) {List<Payload> payloads = Lists.newArrayList();for (byte[] filePart : fileParts) {byte[] partMd5Bytes = Hashing.md5().hashBytes(filePart).asBytes();Payload partPayload = Payloads.newByteArrayPayload(filePart);partPayload.getContentMetadata().setContentLength((long) filePart.length);partPayload.getContentMetadata().setContentMD5(partMd5Bytes);payloads.add(partPayload);}return payloads; }3.上載
上傳過(guò)程是一個(gè)靈活的多步驟過(guò)程-這意味著:
- 可以在擁有所有數(shù)據(jù)之前開(kāi)始上傳-數(shù)據(jù)可以在輸入時(shí)上傳
- 數(shù)據(jù)分塊上傳-如果這些操作之一失敗,則可以簡(jiǎn)單地將其檢索
- 塊可以并行上傳–這可以大大提高上傳速度,尤其是在大文件的情況下
3.1。 啟動(dòng)上傳操作
Upload操作的第一步是啟動(dòng)該過(guò)程 。 對(duì)S3的請(qǐng)求必須包含標(biāo)準(zhǔn)的HTTP標(biāo)頭–特別是內(nèi)容 – MD5標(biāo)頭。 我們將在這里使用Guava哈希函數(shù)支持:
Hashing.md5().hashBytes(byteArray).asBytes();這是整個(gè)字節(jié)數(shù)組(而不是各個(gè)部分)的md5哈希 。
為了啟動(dòng)上載以及與S3的所有進(jìn)一步交互,我們將使用AWSS3AsyncClient –我們之前創(chuàng)建的異步API:
ObjectMetadata metadata = ObjectMetadataBuilder.create().key(key).contentMD5(md5Bytes).build(); String uploadId = s3AsyncApi.initiateMultipartUpload(container, metadata).get();密鑰是分配給對(duì)象的句柄–它必須是客戶端指定的唯一標(biāo)識(shí)符。
還要注意,即使我們使用的是異步版本的API, 我們也阻止了該操作的結(jié)果–這是因?yàn)槲覀冃枰跏蓟慕Y(jié)果才能繼續(xù)前進(jìn)。
操作的結(jié)果是S3返回的上載ID –這將在整個(gè)生命周期中識(shí)別上載,并將出現(xiàn)在所有后續(xù)的上載操作中。
3.2。 上載零件
下一步是上傳零件 。 我們的目標(biāo)是并行發(fā)送這些請(qǐng)求,因?yàn)樯陷d零件操作代表了大部分上載過(guò)程:
List<ListenableFuture<String>> ongoingOperations = Lists.newArrayList(); for (int partNumber = 0; partNumber < filePartsAsByteArrays.size(); partNumber++) {ListenableFuture<String> future = s3AsyncApi.uploadPart(container, key, partNumber + 1, uploadId, payloads.get(partNumber));ongoingOperations.add(future); }零件編號(hào)必須是連續(xù)的,但發(fā)送請(qǐng)求的順序無(wú)關(guān)緊要。
提交所有上載零件請(qǐng)求后,我們需要等待它們的響應(yīng),以便我們可以收集每個(gè)零件的單獨(dú)ETag值:
Function<ListenableFuture<String>, String> getEtagFromOp =new Function<ListenableFuture<String>, String>() {public String apply(ListenableFuture<String> ongoingOperation) {try {return ongoingOperation.get();} catch (InterruptedException | ExecutionException e) {throw new IllegalStateException(e);}} }; List<String> etagsOfParts = Lists.transform(ongoingOperations, getEtagFromOp);如果由于某種原因,上載部分操作之一失敗, 則可以重試該操作,直到成功為止。 上面的邏輯不包含重試機(jī)制,但是建立它應(yīng)該足夠簡(jiǎn)單。
3.3。 完成上傳操作
上傳過(guò)程的最后一步是完成分段操作 。 S3 API要求以Map的形式上傳來(lái)自先前零件的響應(yīng),現(xiàn)在我們可以從上面獲得的ETag列表中輕松創(chuàng)建這些響應(yīng):
Map<Integer, String> parts = Maps.newHashMap(); for (int i = 0; i < etagsOfParts.size(); i++) {parts.put(i + 1, etagsOfParts.get(i)); }最后,發(fā)送完整的請(qǐng)求:
s3AsyncApi.completeMultipartUpload(container, key, uploadId, parts).get();這將返回完成對(duì)象的最終ETag,并完成整個(gè)上傳過(guò)程。
4。結(jié)論
在本文中,我們使用自定義S3 jclouds API構(gòu)建了一個(gè)支持多部分的,完全并行的S3上傳操作。 此操作可以按原樣使用,但是可以通過(guò)幾種方法進(jìn)行改進(jìn) 。 首先,應(yīng)在上傳操作周圍添加重試邏輯,以更好地處理失敗。 接下來(lái),對(duì)于非常大的文件,即使該機(jī)制并行發(fā)送所有上載的多部分請(qǐng)求, 限制機(jī)制仍應(yīng)限制發(fā)送的并行請(qǐng)求的數(shù)量。 這既可以避免帶寬成為瓶頸,又可以確保Amazon本身不會(huì)將上傳過(guò)程標(biāo)記為超過(guò)每秒允許的請(qǐng)求限制– Guava RateLimiter可能非常適合此操作。
翻譯自: https://www.javacodegeeks.com/2013/04/multipart-upload-on-s3-with-jclouds.html
jclouds
總結(jié)
以上是生活随笔為你收集整理的jclouds_使用jclouds在S3上分段上传的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 空你几哇什么意思 空你几哇意思是什么
- 下一篇: 使用虚拟时间测试基于时间的反应堆堆芯流