亿级消息系统的核心存储:Tablestore发布Timeline 2.0模型
背景
互聯網快速發展的今天,社交類應用、消息類功能大行其道,占據了大量網絡流量。大至釘釘、微信、微博、知乎,小至各類App的推送通知,消息類功能幾乎成為所有應用的標配。根據場景特點,我們可以將消息類場景歸納成三大類:IM(釘釘、微信)、Feed流(微博、知乎)以及常規消息隊列。因此,如何開發一個簡便而又高效IM或Feed流功能,成為了很多架構師、開發人員不得不面對的問題。
Timeline 1.0版模型
針對消息類場景,表格存儲團隊針對JAVA語言打造了一個TableStore-Timeline 1.0版數據模型模型(簡稱Timeline模型)。基于場景經驗與理解,將消息場景封裝成一個數據模型,提供了表結構設計,讀寫方式等解決方案供需求者使用。用戶只需依托模型API,直接忽略Timeline到底層存儲系統之間的架構方案,直接基于接口實現業務邏輯。它能滿足消息數據場景對消息保序、海量消息存儲、實時同步等特殊需求。Timeline 1.0是定義在表格存儲之上抽象出來的數據模型,具體內容參見《TableStore Timeline:輕松構建千萬級IM和Feed流系統》。
全文檢索、模糊查詢需求
在表格存儲的Timeline模型受到廣泛使用的過程中,我們也逐漸發現消息類數據的全文檢索、模糊查詢這一很強需求。而原有模型的在線查詢能力存在一定短板。隨著表格存儲支持了SearchIndex能力,使得Timeline模型支持在線全文檢索、模糊查詢成為了可能。所以我們基于原有的架構設計,重新打造了Timeline 2.0模型,引入了強大的查詢能力與數據管理新方案。
項目代碼目前已經開源在了GitHub上:Timeline@GitHub。
2.0時代到來
此次推出的Timeline模型2.0版,沒有直接基于1.X版本直接改造。而是在兼容原有模型架構之上,定義、封裝了新的使用接口。重新打造升級新的模型,增加了如下功能:
- 增加了Timeline Meta的管理能力;
- 基于多元索引功能,增加了Meta、Timeline的全文檢索、多維組合查詢能力;
- 支持SequenceId兩種設置方式:自增列、手動設置;
- 支持多列的Timeline Identifier設置,提供Timeline的分組管理能力;
- 兼容Timeline 1.X模型,提供的TimelineMessageForV1樣例可直接讀、寫1.X版本消息,用戶也可仿照實現。
架構解析
Timeline做為表格存儲直接支持的一種數據模型,以『簡單』為設計目標,其存核心模塊構成比較清晰明了。Timeline盡量提升用戶的使用自由度,讓用戶能夠根據自身場景需求選擇更為合適的實現方案。模型的模塊架構如上圖,主要包括如下重要部分:
- Store:存儲庫,類似數據庫的表的概念,模型包含兩類Store分別為Meta Store、Timeline Store。
- Identifier:用于區分Timeline的唯一標識,用以標識單行Meta,以及相應的Timeline Queue。
- Meta:用于描述Timeline的元數據,元數據描述采用free-schema結構,可自由包含任意列。
- Queue:Queue為單Identifier對應的所有消息隊列,一個Timeline下Message按照Queue單元存儲。
- SequenceId:Queue中消息體的序列號,需保證遞增、唯一,模型支持自增列、自定義兩種實現模式。
- Message:Timeline內傳遞的消息體,是一個free-schema的結構,可自由包含任意列。
- Index:基于SearchIndex實現的索引,針對不同Store分為Meta Index和Message Index兩類,可對Meta或Message內的任意列自定義索引,提供靈活的多條件組合查詢和搜索。
性能優勢
Timeline 模型是基于Tablestore抽象、封裝出的一類場景數據模型,因而具有Tablestore自身的所有優點。同時結合場景設計的接口,讓用戶更直觀、清晰的實現業務邏輯,總結如下:
- 支撐海量數據存儲:分布式架構,高可擴展,支持10PB級的消息。
- 低存儲成本:表格存儲提供低成本的存儲方式,按量付費、資源包、預留Cu。
- 數據生命周期管理:不同類型(表級別)消息數據,可自定義不同生命周期。
- 極高的寫入吞吐:具備極高的寫入吞吐能力,可應對00M TPS的消息寫入。
- 低延遲的讀:查詢消息延遲低,毫秒量級。
- 接口設計:可讀性高,接口功能全面、清晰。
Maven地址
Timeline Lib
<dependency><groupId>com.aliyun.openservices.tablestore</groupId><artifactId>Timeline</artifactId><version>2.0.0</version> </dependency>TableStore Java SDK
Timeline模型在TableStore Java SDK >= 4.12.1作為基本數據模型直接提供,表格存儲老用戶可升級SDK直接使用
<dependency><groupId>com.aliyun.openservices</groupId><artifactId>tablestore</artifactId><version>4.12.1</version> </dependency>入手指南
初始化
初始化Factory
用戶將SyncClient作為參數,初始化StoreFactory,通過工廠創建Meta數據、Timeline數據的管理Store。錯誤重試的實現依賴SyncClient的重試策略,用戶通過設置SyncClient實現重試。如有特殊需求,可自定義策略(只需實現RetryStrategy接口)。
/*** 重試策略設置* Code: configuration.setRetryStrategy(new DefaultRetryStrategy());* */ ClientConfiguration configuration = new ClientConfiguration();SyncClient client = new SyncClient("http://instanceName.cn-shanghai.ots.aliyuncs.com","accessKeyId","accessKeySecret","instanceName", configuration);TimelineStoreFactory factory = new TimelineStoreFactoryImpl(client);初始化MetaStore
構建meta表的Schema(包含Identifier、MetaIndex等參數),通過Store工廠創建并獲取Meta的管理Store;配置參數包含:Meta表名、索引,表名、主鍵字段、索引名、索引類型等參數。
TimelineIdentifierSchema idSchema = new TimelineIdentifierSchema.Builder().addStringField("timeline_id").build();IndexSchema metaIndex = new IndexSchema(); metaIndex.addFieldSchema( //配置索引字段、類型new FieldSchema("group_name", FieldType.TEXT).setIndex(true).setAnalyzer(FieldSchema.Analyzer.MaxWord)new FieldSchema("create_time", FieldType.Long).setIndex(true) );TimelineMetaSchema metaSchema = new TimelineMetaSchema("groupMeta", idSchema).withIndex("metaIndex", metaIndex); //設置索引TimelineMetaStore timelineMetaStore = serviceFactory.createMetaStore(metaSchema);初始化TimelineStore
構建timeline表的Schema配置,包含Identifier、TimelineIndex等參數,通過Store工廠創建并獲取Timeline的管理Store;配置參數包含:Timeline表名、索引,表名、主鍵字段、索引名、索引類型等參數。
消息的批量寫入,基于Tablestore的DefaultTableStoreWriter提升并發,用戶可以根據自己需求設置線程池數目。
Meta管理
Meta管理提供了增、刪、改、單行讀、多條件組合查詢等接口。其中多條件組合查詢功能基于多元索引,只有設置了IndexSchema的MetaStore才支持組合查詢功能。索引類型支持LONG、DOUBLE、BOOLEAN、KEYWORD、GEO_POINT等類型,屬性包含Index、Store和Array,其含義與多元索引相同。
TimelineIdentifer是區分Timeline的唯一標識,重復的Identifier會被覆蓋。
/*** 接口使用參數* */ TimelineIdentifier identifier = new TimelineIdentifier.Builder().addField("timeline_id", "group").build(); TimelineMeta meta = new TimelineMeta(identifier).setField("filedName", "fieldValue");/*** 創建Meta表(如果設置索引則會創建索引)* */ timelineMetaStore.prepareTables();/*** 插入Meta數據* */ timelineMetaStore.insert(meta);/*** 根據id讀取單行Meta數據* */ timelineMetaStore.read(identifier);/*** 更新Meta數據* */ meta.setField("fieldName", "newValue"); timelineMetaStore.update(meta);/*** 根據id刪除單行Meta數據* */ timelineMetaStore.delete(identifier);/*** 通過SearchParameter參數檢索* */ SearchParameter parameter = new SearchParameter(field("fieldName").equals("fieldValue") ); timelineMetaStore.search(parameter);/*** 通過SearchQuery參數檢索(SearchQuery是SDK原生類型,支持所有多元索引檢索條件)* */ TermQuery query = new TermQuery(); query.setFieldName("fieldName"); query.setTerm(ColumnValue.fromString("fieldValue"));SearchQuery searchQuery = new SearchQuery().setQuery(query); timelineMetaStore.search(searchQuery);/*** 刪除Meta表(如果存在索引,同時刪除索引)* */ timelineMetaStore.dropAllTables();Timeline管理
Timeline管理提供了消息模糊查詢、多條件組合查詢接口。消息的全文檢索依托多元索引,用戶只需將相應字段索引類型設置為TEXT,即可通過Search接口實現消息的全文檢索。Timeline管理包含消息表的創建、檢索、刪除等。
/*** 接口使用參數* */ SearchParameter searchParameter = new SearchParameter(field("text").equals("fieldValue") );TermQuery query = new TermQuery(); query.setFieldName("text"); query.setTerm(ColumnValue.fromString("fieldValue")); SearchQuery searchQuery = new SearchQuery().setQuery(query).setLimit(10);/*** 創建Meta表(如果設置索引則會創建索引)* */ timelineStore.prepareTables();/*** 通過SearchParameter參數檢索* */ timelineStore.search(searchParameter);/*** 通過SearchQuery參數檢索(SearchQuery是SDK原生類型,支持所有多元索引檢索條件)* */timelineStore.search(searchQuery);/*** 將Writer隊列中未發的請求主動觸發,同步等待直到所有消息完成存儲* */ timelineStore.flush();/*** 關閉Writer與Writer中的線程池* */ timelineStore.close();/*** 刪除Timeline表(如果存在索引,同時刪除索引)* */ timelineStore.dropAllTables();Queue管理
Queue是單個消息隊列的抽象概念,對應一個Store下單個Identifier的所有消息。通過Queue實例管理相應Identifer的消息隊列,支持基本的增、刪、改、單行查、范圍查等接口。
/*** 接口使用參數* */ TimelineIdentifier identifier = new TimelineIdentifier.Builder().addField("timeline_id", "group").build(); long sequenceId = 1557133858994L; TimelineMessage message = new TimelineMessage().setField("text", "Timeline is fine."); ScanParameter scanParameter = new ScanParameter().scanBackward(Long.MAX_VALUE, 0); TimelineCallback callback = new TimelineCallback() {@Overridepublic void onCompleted(TimelineIdentifier i, TimelineMessage m, TimelineEntry t) {// do something when succeed.}@Overridepublic void onFailed(TimelineIdentifier i, TimelineMessage m, Exception e) {// do something when failed.} };/*** 單個Identifier對應的消息隊列* */ timelineQueue = timelineStore.createTimelineQueue(identifier);/*** 存儲消息* */ //同步 timelineQueue.store(message); timelineQueue.store(sequenceId, message); //異步,支持callback timelineQueue.storeAsync(message, callback); timelineQueue.storeAsync(sequenceId, message, callback); //異步批量 timelineQueue.batchStore(message); timelineQueue.batchStore(sequenceId, message); //異步批量,支持callback timelineQueue.batchStore(message, callback); timelineQueue.batchStore(sequenceId, message, callback);/*** 單行讀取、獲取最新一行、獲取最新SequenceId* */ timelineQueue.get(sequenceId); timelineQueue.getLatestTimelineEntry(); timelineQueue.getLatestSequenceId();/*** 根據SequenceId更新消息* */ message.setField("text", "newValue"); timelineQueue.update(sequenceId, message); timelineQueue.updateAsync(sequenceId, message, callback);/*** 根據SequenceId刪除消息* */ timelineQueue.delete(sequenceId);/*** 根據范圍參數、Filter獲取批量消息* */ timelineQueue.scan(scanParameter);專家服務
表格存儲有一批精通Timeline領域的技術專家,在打造IM、Feed流場景方面有著獨到的見解。如果您:
- 渴望尋覓Timeline領域高手過招;
- 調研Timeline場景解決方案;
- 準備入門Timeline場景;
- 對表格存儲(Tablestore)產品感興趣;
原文鏈接
本文為云棲社區原創內容,未經允許不得轉載。
總結
以上是生活随笔為你收集整理的亿级消息系统的核心存储:Tablestore发布Timeline 2.0模型的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 阿里巴巴在内蒙古旱区试水物联网灌溉技术,
- 下一篇: 云栖专辑 | 阿里开发者们的第6个感悟: