Table Store: 海量結構化數據實時備份實戰
數據備份簡介
在信息技術與數據管理領域,備份是指將文件系統或數據庫系統中的數據加以復制,一旦發生災難或者錯誤操作時,得以方便而及時地恢復系統的有效數據和正常運作。在實際備份過程中,最好將重要數據制作三個或三個以上的備份,并且放置在不同的場所異地備援,以供日后回存之用。
備份有兩個不同的目的,其主要的目的是在數據丟失后恢復數據,無論數據是被刪除還是被損壞。備份的第二個目的是根據用戶定義的數據保留策略從較早的時間恢復數據,通常在備份應用程序中配置需要備份多長時間的數據副本。
由于備份系統至少會包含一個被認為值得保存的所有數據的副本,因此對數據存儲的要求可能會很高,組織此存儲空間和管理備份過程可能是一項復雜的任務。如今,有許多不同類型的數據存儲設備可用于進行備份,還可以通過許多不同的方式來安排這些設備以提供地理冗余,數據安全性和可移植性。
在將數據發送到其存儲位置之前,會選擇,提取和操作它們,目前已經有許多不同的技術來優化備份過程,其中包括處理打開的文件(open files)和實時數據源的優化,以及壓縮,加密和重復數據刪除等。每個備份方案都應包括演習過程,以驗證正在備份的數據的可靠性,更重要的是要認識到任何備份方案中涉及的限制和人為因素。
Table Store備份需求分析
對于存儲系統而言,數據的安全可靠永遠是第一位的,要保障數據盡可能高的可靠性,需要從兩個方面保障:
- 存儲系統本身的數據可靠性:表格存儲(Table Store)是阿里云自研的面向海量結構化數據存儲的Serverless NoSQL多模型數據庫,提供了99.9999999%的數據可靠性保證,在業界屬于非常非常高的標準了。
- 誤操作后能恢復數據:誤操作永遠都無法避免,要做的是當誤操作發生的時候能盡快恢復,那么就需要有備份數據存在。對于備份,有兩種方案,一個是部署同城或異地災備,這種代價高費用高,更多的用于社會基礎信息或金融信息。另一種是將數據備份到另一個價格更低廉的系統,當萬一出現誤操作的時候,可以有辦法恢復就行。一般可以選擇文件存儲系統,比如阿里云OSS。
Table Store備份恢復方案簡介
下圖為Table Store備份恢復的邏輯結構圖,基于全增量一體的通道服務我們可以很容易的構建一整套的數據備份和數據恢復方案,同時具備了實時增量備份能力和秒級別的恢復能力。只要提前配置好備份和恢復的計劃,整個備份恢復系統可以做到完全的自動化進行。
Table Store備份恢復方案實戰
目前表格存儲雖然未推出官方的備份和恢復功能,但是筆者會step-by-step的帶大家基于表格存儲的通道服務設計屬于自己的專屬備份恢復方案,實戰步驟會分為備份和恢復兩部分。
備份
- 預準備階段:需要確定待備份的數據源和備份的目的源,在此次的實戰中,分別對應著TableStore的表和OSS的Bucket。
- 確定備份計劃和策略
- 使用通道服務SDK編寫代碼
- 執行備份策略并監測備份過程
恢復
確定備份計劃和策略
備份策略需要確定備份內容、備份時間和備份方式,目前常見的備份策略主要有以下幾類。
- 全量備份(Full Backup): 把硬盤或數據庫內的所有文件、文件夾或數據進行一次性的復制。
- 增量備份(Incremental Backup): 對上一次的全量備份或增量備份后更新的數據進行的備份。
- 差異備份(Differential Backup): 提供運行全量備份后變更文件的備份。
- 選擇式備份:對系統數據的一部分進行的備份。
- 冷備份:系統處于停機或維護狀態下的備份。這種情況下,備份的數據與系統該時段的數據應該完全一致。
- 熱備份:系統處于正常運轉狀態下的備份。這種情況下,由于系統中的數據可能隨時在更新,備份的數據相對于系統的真實數據可能會有一定的滯后。
在此次的實戰中,筆者對于備份計劃和策略的選擇如下(這里可以根據自己的需求進行自定義設計,然后利用通道服務的SDK來完成相應的需求)。
- 備份內容:TableStore 數據表
-
備份時間:
- 全量備份定期執行(時間可調,默認為一周)。
- 增量備份根據配置定期執行,表格存儲的通道服務可以保障數據的嚴格有序性,每個增量文件是流式append的,可隨時被消費。
- 備份方式:全量備份+增量備份,熱備份。
使用通道服務SDK編寫代碼
這部分會結合代碼片段的形式講解,詳細的代碼后續會開源出去(鏈接會更新在本文中),盡請期待。在進行實戰之前,推薦先閱讀通道服務Java SDK的?使用文檔。
創建一個全量+增量類型的Tunnel,這里可以使用SDK或者官網控制臺進行創建。??
private static void createTunnel(TunnelClient client, String tunnelName) {CreateTunnelRequest request = new CreateTunnelRequest(TableName, tunnelName, TunnelType.BaseAndStream);CreateTunnelResponse resp = client.createTunnel(request);System.out.println("RequestId: " + resp.getRequestId());System.out.println("TunnelId: " + resp.getTunnelId());
}
了解通道服務自動化數據處理框架?
在通道服務的快速開始里,我們可以看到用戶只需要傳入處理數據的process函數和shutdown函數即可完成自動化的數據處理。在process函數的入參中會帶有List,而StreamRecord包裝的正是TableStore里每一行的數據,包括Record的類型,主鍵列,屬性列,用戶寫入Record的時間戳等。設計TableStore每一行數據的持久化格式?
本次實戰中我們使用CSV文件格式,當然你也可以用pb或者其它格式,CSV的優點是可讀性比較強,每一行數據會對應CSV文件的一行,持久化的格式如下圖所示,CSV文件會有四列, TimeStamp列是數據寫入TableStore的時間戳(全量時都為0,增量備份為具體的時間戳),RecordType是數據行的操作類型(PUT, UPDATE和DELETE),PrimaryKey為主鍵列的JSON字符串表示,RecordColumns為屬性列的JSON字符串表示。
轉換部分的核心代碼參見如下代碼片段,筆者處理這部分用的是univocity-parsers(CSV)和Gson庫,有幾個地方需要特別注意下:1). Gson反序列化Long類型會轉為Number類型,可能會造成進度丟失,有若干解決辦法,筆者采用的是將其轉為String類型序列化。2). 對于binary類型的數據的特殊處理,筆者進行了base64的編解碼。3). 可以直接流式寫入OSS中,減少本地持久化的消耗。
this.gson = new GsonBuilder().registerTypeHierarchyAdapter(byte[].class, new ByteArrayToBase64TypeAdapter()).setLongSerializationPolicy(LongSerializationPolicy.STRING).create();
// ByteArrayOutputStream到ByteArrayInputStream會有一次array.copy, 可考慮用管道或者NIO channel.
public void streamRecordsToOSS(List<StreamRecord> records, String bucketName, String filename, boolean isNewFile) {if (records.size() == 0) {LOG.info("No stream records, skip it!");return;}try {CsvWriterSettings settings = new CsvWriterSettings();ByteArrayOutputStream out = new ByteArrayOutputStream();CsvWriter writer = new CsvWriter(out, settings);if (isNewFile) {LOG.info("Write csv header, filename {}", filename);List<String> headers = Arrays.asList(RECORD_TIMESTAMP, RECORD_TYPE, PRIMARY_KEY, RECORD_COLUMNS);writer.writeHeaders(headers);System.out.println(writer.getRecordCount());}List<String[]> totalRows = new ArrayList<String[]>();LOG.info("Write stream records, num: {}", records.size());for (StreamRecord record : records) {String timestamp = String.valueOf(record.getSequenceInfo().getTimestamp());String recordType = record.getRecordType().name();String primaryKey = gson.toJson(TunnelPrimaryKeyColumn.genColumns(record.getPrimaryKey().getPrimaryKeyColumns()));String columns = gson.toJson(TunnelRecordColumn.genColumns(record.getColumns()));totalRows.add(new String[] {timestamp, recordType, primaryKey, columns});}writer.writeStringRowsAndClose(totalRows);// write to oss fileossClient.putObject(bucketName, filename, new ByteArrayInputStream(out.toByteArray()));} catch (Exception e) {e.printStackTrace();}
}
執行備份策略并監測備份過程
運行通道服務的自動化數據框架,掛載上一步中設計的備份策略代碼。
public class TunnelBackup {private final ConfigHelper config;private final SyncClient syncClient;private final CsvHelper csvHelper;private final OSSClient ossClient;public TunnelBackup(ConfigHelper config) {this.config = config;syncClient = new SyncClient(config.getEndpoint(), config.getAccessId(), config.getAccessKey(),config.getInstanceName());ossClient = new OSSClient(config.getOssEndpoint(), config.getAccessId(), config.getAccessKey());csvHelper = new CsvHelper(syncClient, ossClient);}public void working() {TunnelClient client = new TunnelClient(config.getEndpoint(), config.getAccessId(), config.getAccessKey(),config.getInstanceName());OtsReaderConfig readerConfig = new OtsReaderConfig();TunnelWorkerConfig workerConfig = new TunnelWorkerConfig(new OtsReaderProcessor(csvHelper, config.getOssBucket(), readerConfig));TunnelWorker worker = new TunnelWorker(config.getTunnelId(), client, workerConfig);try {worker.connectAndWorking();} catch (Exception e) {e.printStackTrace();worker.shutdown();client.shutdown();}}public static void main(String[] args) {TunnelBackup tunnelBackup = new TunnelBackup(new ConfigHelper());tunnelBackup.working();}
}
監測備份過程
備份過程的監控可以通過表格存儲控制臺或者?DescribeTunnel?接口完成,DescribeTunnel接口可以實時獲取當前Tunnel下每一個Channel的Client(可以自定義), 消費總行數和消費位點等信息。比如用戶有下午2點-3點的數據需要同步,若DescribeTunnel獲取到的消費位點為下午2點半,則表示備份到一半了,若獲取到的消費位點為下午3點,則代表2點-3點的數據已經備份完畢了。
執行文件恢復
在數據備份完成后,我們可以根據需求進行全量的恢復和部分數據的恢復,來降低RTO。在恢復數據時,可以有一些措施來優化恢復的性能:
- 從OSS下載時,可以使用流式下載或者分片下載的方式,必要時可以增加并發。
- 寫入TableStore時,可以使用并發BatchWrite的方式。
下圖為Restore的實例代碼(略去若干細節),首先根據需求算出需要下載的文件名(和備份策略對應),然后用OSS的流式下載讀取相應的文件,最后用并發的BatchWrite方式寫入TableStore的恢復表中。
public class TunnelRestore {private ConfigHelper config;private final SyncClient syncClient;private final CsvHelper csvHelper;private final OSSClient ossClient;public TunnelRestore(ConfigHelper config) {this.config = config;syncClient = new SyncClient(config.getEndpoint(), config.getAccessId(), config.getAccessKey(),config.getInstanceName());ossClient = new OSSClient(config.getOssEndpoint(), config.getAccessId(), config.getAccessKey());csvHelper = new CsvHelper(syncClient, ossClient);}public void restore(String filename, String tableName) {csvHelper.parseStreamRecordsFromCSV(filename, tableName);}public static void main(String[] args) {TunnelRestore restore = new TunnelRestore(new ConfigHelper());restore.restore("FullData-1551767131130.csv", "testRestore");}
}
總結
本文首先介紹了Table Store備份的需求,接著介紹了使用表格存儲的通道服務進行數據備份和恢復的原理,最后結合實際的代碼片段講述了如何一步步的構建Tabel Store數據備份恢復方案。
原文鏈接
本文為云棲社區原創內容,未經允許不得轉載。
總結
以上是生活随笔為你收集整理的Table Store: 海量结构化数据实时备份实战的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。