基于ZooKeeper的分布式Session实现
基于ZooKeeper的分布式Session實現
[轉 http://blog.csdn.net/jacktan/article/details/6112806]認識ZooKeeper
ZooKeeper——“動物園管理員”。動物園里當然有好多的動物,游客可以根據動物園提供的向導圖到不同的場館觀賞各種類型的動物,而不是像走在原始叢林里,心驚膽顫的被動物所觀賞。為了讓各種不同的動物呆在它們應該呆的地方,而不是相互串門,或是相互廝殺,就需要動物園管理員按照動物的各種習性加以分類和管理,這樣我們才能更加放心安全的觀賞動物。
回到我們企業級應用系統中,隨著信息化水平的不斷提高,我們的企業級系統變得越來越龐大臃腫,性能急劇下降,客戶抱怨頻頻。拆分系統是目前我們可選擇的解決系統可伸縮性和性能問題的唯一行之有效的方法。
但是拆分系統同時也帶來了系統的復雜性——各子系統不是孤立存在的,它們彼此之間需要協作和交互,這就是我們常說的分布式系統。各個子系統就好比動物園里的動物,為了使各個子系統能正常為用戶提供統一的服務,必須需要一種機制來進行協調——這就是ZooKeeper。
關于ZooKeeper更正式的介紹如下: ZooKeeper是一個為分布式應用程序提供高性能協調服務的工具集合。它可以應用在一些需要提供統一協調服務的case中,例如命名、配置管理、同步和組服務等。而在我們的case中,它被作為一個協調分布式環境中各子系統之間共享狀態數據的基礎設施。
ZooKeeper之特性
ZooKeeper本質上是一個分布式的小文件存儲系統。原本是Apache Hadoop的一個組件,現在被拆分為一個Hadoop的獨立子項目,在HBase(Hadoop的另外一個被拆分出來的子項目,用于分布式環境下的超大數據量的DBMS)中也用到了ZooKeeper集群。ZooKeeper有如下的特性:
- 簡單
ZooKeeper核心是一個精簡的文件系統,它提供了一些簡單的文件操作以及附加的功能,例如排序和通知。
- 易表達
ZooKeeper的數據結構原型是一棵znode樹(類似Linux的文件系統),并且它們是一些已經被構建好的塊,可以用來構建大型的協作數據結構和協議。
- 高可用性
ZooKeeper可以運行在一組服務器上,同時它們被設計成高可用性,為你的應用程序避免單點故障。
- 松耦合交互
ZooKeeper提供的Watcher機制使得各客戶端與服務器的交互變得松耦合,每個客戶端無需知曉其他客戶端的存在,就可以和其他客戶端進行數據交互。
- 豐富的API
ZooKeeper為開發人員提供了一套豐富的API,減輕了開發人員編寫通用協議的負擔。
這篇文章是關于如何在ZooKeeper上創建分布式Session系統,所以關于ZooKeeper的安裝、使用、管理等主題不在本文的討論范圍內,如果想了解ZooKeeper更加詳細的情況,請看另外一篇文章《ZooKeeper實戰》。
為什么使用ZooKeeper
目前有關于分布式Session的實現基本上都是基于memcached。memcached本質上是一個內存緩存系統。雖然memcached也可以是分布式集群環境的,但是對于一份數據來說,它總是存儲在某一臺memcached服務器上。如果發生網絡故障或是服務器當機,則存儲在這臺服務器上的所有數據都將不可訪問。由于數據是存儲在內存中的,重啟服務器,將導致數據全部丟失。當然你可以自己實現一套機制,用來在分布式memcached之間進行數據的同步和持久化,但是實現這套機制談何容易!
由上述ZooKeeper的特性可知,ZooKeeper是一個分布式小文件系統,并且被設計為高可用性。通過選舉算法和集群復制可以避免單點故障,由于是文件系統,所以即使所有的ZooKeeper節點全部掛掉,數據也不會丟失,重啟服務器之后,數據即可恢復。另外ZooKeeper的節點更新是原子的,也就是說更新不是成功就是失敗。通過版本號,ZooKeeper實現了更新的樂觀鎖,當版本號不相符時,則表示待更新的節點已經被其他客戶端提前更新了,而當前的整個更新操作將全部失敗。當然所有的一切ZooKeeper已經為開發者提供了保障,我們需要做的只是調用API。
有人會懷疑ZooKeeper的執行能力,在ZooKeeper誕生的地方——Yahoo!給出了一組數據將打消你的懷疑。它的吞吐量標準已經達到大約每秒10000基于寫操作的工作量。對于讀操作的工作量來說,它的吞吐量標準還要高幾倍。
實現分布式Session所面臨的挑戰
實現分布式session最大的挑戰莫過于如何實現session在分布式系統之間的共享。在分布式環境下,每個子系統都是跨網絡的獨立JVM,在這些JVM之間實現共享數據的方式無非就是TCP/IP通訊。無論是memcached,還是ZooKeeper,底層都是基于TCP/IP的。所以,我認為使用何種工具實現分布式Session都是可行的,沒有那種實現優于另外一種實現,在不同的應用場景,各有優缺點。世間萬物,無十全十美,不要盲目的崇拜某種技術,唯有適合才是真理。
1) Session ID的共享
Session ID是一個實例化Session對象的唯一標識,也是它在Web容器中可以被識別的唯一身份標簽。Jetty和Tomcat容器會通過一個Hash算法,得到一個唯一的ID字符串,然后賦值給某個實例化的Session,此時,這個Session就可以被放入Web容器的SessionManager中開始它短暫的一生。在Servlet中,我們可以通過HttpSession的getId()方法得到這個值,但是我們無法改變這個值。當Session走到它一生盡頭的時候,Web容器的SessionManager會根據這個ID將其“火化”。所以Session ID是非常重要的一個屬性,并且要保證它的唯一性。在單系統中,Session ID只需要被自身的Web容器讀寫,但是在分布式環境中,多個Web容器需要共享同一個Session ID。因此,當某個子系統的Web容器產生一個新的ID時,它必須需要一種機制來通知其他子系統,并且告知新ID是什么。
2) Session中數據的復制
和共享Session ID的問題一樣,在分布式環境下,Session中的用戶數據也需要在各個子系統中共享。當用戶通過HttpSession的setAttribute()方法在Session中設置了一個用戶數據時,它只存在于當前與用戶交互的那個Web容器中,而對其他子系統的Web容器來說,這些數據是不可見的。當用戶在下一步跳轉到另外一個Web容器時,則又會創建一個新的Session對象,而此Session中并不包含上一步驟用戶設置的數據。其實Session在分布式系統之間的復制實現是簡單的,但是每次在Session數據發生變化時,都在子系統之間復制一次數據,會大大降低用戶的響應速度。因此我們需要一種機制,即可以保證Session數據的一致性,又不會降低用戶操作的響應度。
3) Session的失效
Session是有生命周期的,當Session的空閑時間(maxIdle屬性值)超出限制時,Session就失效了,這種設計主要是考慮到了Web容器的可靠性。當一個系統有上萬人使用時,就會產生上萬個Session對象,由于HTTP的無狀態特性,服務器無法確切的知道用戶是否真的離開了系統。因此如果沒有失效機制,所有被Session占據的內存資源將永遠無法被釋放,直到系統崩潰為止。在分布式環境下,Session被簡單的創建,并且通過某種機制被復制到了其他系統中。你無法保證每個子系統的時鐘都是一致的,可能相差幾秒,甚至相差幾分鐘。當某個Web容器的Session失效時,可能其他的子系統中的Session并未失效,這時會產生一個有趣的現象,一個用戶在各個子系統之間跳轉時,有時會提示Session超時,而有時又能正常操作。因此我們需要一種機制,當某個系統的Session失效時,其他所有系統的與之相關聯的Session也要同步失效。
4) 類裝載問題
在單系統環境下,所有類被裝載到“同一個”ClassLoader中。我在同一個上打了引號,因為實際上并非是同一個ClassLoader,只是邏輯上我們認為是同一個。這里涉及到了JVM的類裝載機制,由于這個主題不是本文的討論重點,所以相關詳情可以參考相關的JVM文檔。因此即使是由memcached或是ZooKeeper返回的字節數組也可以正常的反序列化成相對應的對象類型。但是在分布式環境下,問題就變得異常的復雜。我們通過一個例子來描述這個問題。用戶在某個子系統的Session中設置了一個User類型的對象,通過序列化,將User類型的對象轉換成字節數組,并通過網絡傳輸到了memcached或是ZooKeeper上。此時,用戶跳轉到了另外一個子系統上,需要通過getAttribute方法從memcached或是ZooKeeper上得到先前設置的那個User類型的對象數據。但是問題出現了,在這個子系統的ClassLoader中并沒有裝載User類型。因此在做反序列化時出現了ClassNotFoundException異常。
當然上面描述的4點挑戰只是在實現分布式Session過程中面臨的關鍵問題,并不是全部。其實在我實現分布式Session的整個過程中還遇到了其他的一些挑戰。比如,需要通過filter機制攔截HttpServletRequest,以便覆蓋其getSession方法。但是在不同的Web容器中(例如Jetty或是Tomcat)對HttpServletRequest的實現是不一樣的,雖然都是實現了HttpServletRequest接口,但是各自又添加了一些特性在其中。例如,在Jetty容器中,HttpSession的實現類是一個保護內部類,無法從其繼承并覆蓋相關的方法,只能從其實現類的父類中繼承更加抽象的Session實現。這樣就會造成一個問題,我必須重新實現對Session整個生命周期管理的SessionManager接口。有人會說,那就放棄它的實現吧,我們自己實現HttpSession接口。很不幸,那是不可能的。因為在Jetty的HttpServletRequest實現類的一些方法中對Session的類型進行了強制轉換(轉換成它自定義的HttpSession實現類),如果不從其繼承,則會出現ClassCastException異常。相比之下,Tomcat的對HttpServletRequest和HttpSession接口的實現還是比較標準的。由此可見,實現分布式Session其實是和某種Web容器緊密耦合的。并不像網上有些人的輕描淡寫,僅僅覆蓋setAttribute和getAttribute方法是行不通的。
5. 算法實現
從上述的挑戰來看,要寫一個分布式應用程序是困難的,主要原因是因為局部故障。由于數據需要通過網絡傳輸,而網絡是不穩定的,所以如果網絡發生故障,則所有的數據通訊都將終止。ZooKeeper并不能解決網絡故障的發生,甚至它本身也是基于網絡的分布式應用程序。但是它為我們提供了一套工具集合,幫助我們建立安全處理局部故障的分布式應用程序。接下來我們就開始描述如何實現基于ZooKeeper的分布式Session系統。
1) 基于ZooKeeper的分布式Session系統架構
為了實現高可用性,采用了ZooKeeper集群,ZooKeeper集群是由一臺領導者服務器和若干臺跟隨者服務器構成(總服務器數要奇數)。所有的讀操作由跟隨者提供,而寫操作由領導者提供,并且領導者還負責將寫入的數據復制到集群中其他的跟隨者。當領導者服務器由于故障無法訪問時,剩下的所有跟隨者服務器就開始進行領導者的選舉。通過選舉算法,最終由一臺原本是跟隨者的服務器升級為領導者。當然原來的領導者服務器一旦被恢復,它就只能作為跟隨者服務器,并在下一次選舉中爭奪領導者的位置。
Web容器中的Session容器也將發生變化。它不再對用戶的Session進行本地管理,而是委托給ZooKeeper和我們自己實現的Session管理器。也就是說,ZooKeeper負責Session數據的存儲,而我們自己實現的Session管理器將負責Session生命周期的管理。
最后是關于在分布式環境下共享Session ID的策略。我們還是通過客戶端的Cookie來實現,我們會自定義一個Cookie,并通過一定的算法在多個子系統之間進行共享。下面會對此進行詳細的描述。
2) 分布式Session的數據模型
Session數據的存儲是有一定格式的,下圖展示了一個Session ID為1gyh0za3qmld7的Session在ZooKeeper上的存儲結構:
/SESSIONS是一個組節點,用來在ZooKeeper上劃分不同功能組的定義。你可以把它理解為一個文件夾目錄。在這個目錄下可以存放0個或N個子節點,我們就把一個Session的實例作為一個節點,節點的名稱就是Session ID。在ZooKeeper中,每個節點本身也可以存放一個字節數組。因此,每個節點天然就是一個Key-Value鍵值對的數據結構。
我們將Session中的用戶數據(本質上就是一個Map)設計成多節點,節點名稱就是Session的key,而節點的數據就是Session的Value。采用這種設計主要是考慮到性能問題和ZooKeeper對節點大小的限制問題。當然,我們可以將Session中的用戶數據保存在一個Map中,然后將Map序列化之后存儲在對應的Session節點中。但是大部分情況下,我們在讀取數據時并不需要整個Map,而是Map中的一個或幾個值。這樣就可以避免一個非常大的Map在網絡間傳來傳去。同理,在寫Session的時候,也可以最大限度的減少數據流量。另外由于ZooKeeper是一個小文件系統,為了性能,每個節點的大小為1MB。如果Session中的Map大于1MB,就不能單節點的存儲了。當然,一個Key的數據量是很少會超過1MB的,如果真的超過1MB,你就應該考慮一下,是否應該將此數據保存在Session中。
最后我們來關注一下Session節點中的數據——SessionMetaData。它是一個Session實例的元數據,保存了一些與Session生命周期控制有關的數據。
public class SessionMetaData implements Serializable {private static final long serialVersionUID = -6446174402446690125L;private String id;/**session的創建時間*/private Long createTm;/**session的最大空閑時間*/private Long maxIdle;/**session的最后一次訪問時間*/private Long lastAccessTm;/**是否可用*/private Boolean validate =false;/**當前版本*/privateint version = 0;/***構造方法*/public SessionMetaData() {this.createTm = System.currentTimeMillis();this.lastAccessTm = this.createTm;this.validate = true;}// ……以下是N多getter和setter方法 }其中需要關注的屬性有:
a) id屬性:Session實例的ID。
b) maxIdle屬性:Session的最大空閑時間,默認情況下是30分鐘。
c) lastAccessTm屬性:Session的最后一次訪問時間,每次調用Request.getSession方法時都會去更新這個值。用來計算當前Session是否超時。如果lastAccessTm+maxIdle小于System.currentTimeMillis(),就表示當前Session超時。
d) validate屬性:表示當前Session是否可用,如果超時,則此屬性為false。
e) version屬性:這個屬性是為了冗余Znode的version值,用來實現樂觀鎖,對Session節點的元數據進行更新操作。
這里有必要提一下一個老生常談的問題,就是所有存儲在節點上的對象必須是可序列化的,也就是必須實現Serializable接口,否則無法保存。這個問題在memcached和ZooKeeper上都存在的。
3) 實現過程
實現分布式Session的第一步就是要定義一個filter,用來攔截HttpServletRequest對象。以下代碼片段,展現了在Jetty容器下的filter實現。
這個filter是繼承自DistributedSessionFilter的,這個父類主要是負責完成初始化參數設置等通用方法的實現,代碼如下所示:
public abstract class DistributedSessionFilter implements Filter {protected Logger log = Logger.getLogger(getClass());/**參數配置*/protected Configuration conf;/**Session管理器*/protected SessionManager sessionManager;/**初始化參數名稱*/public static final String SERVERS ="servers";public static final String TIMEOUT ="timeout";public static final String POOLSIZE ="poolsize";/***初始化*@see javax.servlet.Filter#init(javax.servlet.FilterConfig)*/@Overridepublic void init(FilterConfig filterConfig)throws ServletException {conf =new Configuration();String servers = filterConfig.getInitParameter(SERVERS);if (StringUtils.isNotBlank(servers)) {conf.setServers(servers);}String timeout = filterConfig.getInitParameter(TIMEOUT);if (StringUtils.isNotBlank(timeout)) {try {conf.setTimeout(Long.valueOf(timeout));}catch (NumberFormatException ex) {log.error("timeout parse error[" + timeout +"].");}}String poolsize = filterConfig.getInitParameter(POOLSIZE);if (StringUtils.isNotBlank(poolsize)) {try {conf.setPoolSize(Integer.valueOf(poolsize));}catch (NumberFormatException ex) {log.error("poolsize parse error[" + poolsize +"].");}}//初始化ZooKeeper配置參數ZooKeeperHelper.initialize(conf);}/***銷毀*@see javax.servlet.Filter#destroy()*/@Overridepublic void destroy() {if (sessionManager !=null) {try {sessionManager.stop();}catch (Exception e) {log.error(e);}}//銷毀ZooKeeperZooKeeperHelper.destroy();log.debug("DistributedSessionFilter.destroy completed.");}@Overridepublic void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {//Jetty容器的Request對象包裝器,用于重寫Session的相關操作JettyRequestWrapper req =new JettyRequestWrapper(request,sessionManager);chain.doFilter(req, response);} }在filter中需要關注的重點是doFilter方法。
@Overridepublic void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {//Jetty容器的Request對象包裝器,用于重寫Session的相關操作JettyRequestWrapper req =new JettyRequestWrapper(request,sessionManager);chain.doFilter(req, response);} }這里實例化了一個包裝器(裝飾者模式)類,用來包裝Jetty容器的Request對象,并覆蓋其getSession方法。 另外我們還自己實現sessionManager接口,用來管理Session的生命周期。通過filter機制,我們就接管了Session的整個生命周期的管理權。
接下來我們來看看,Request包裝器是如何重寫getSession方法,替換成使用ZooKeeper上的Session數據。關鍵代碼如下所示:
實現很簡單,大部分工作都委托給了sessionManager來處理。因此,還是讓我們來關注sessionManager的相關方法實現。
A) 獲取Session ID:
@Overridepublic String getRequestSessionId(HttpServletRequest request) {return CookieHelper.findSessionId(request);}這個方法就是從客戶端的Cookies中查找我們的一個自定義的Cookie值,這個Cookie的名稱為:”DISTRIBUTED_SESSION_ID”(Web容器自己也在Cookie中寫了一個值,用來在不同的request中傳遞Session ID,這個Cookie的名稱叫“JSESSIONID”)。如果返回null,則表示客戶端從來都沒有創建過Session實例。
B) 如果返回的Cookie值不為null,則有3種可能性:其一,已經實例化過一個Session對象并且可以正常使用;其二,雖然已經實例化過了,但是可能此Session已經超時失效;其三,分布式環境中的其他子系統已經實例化過了,但是本系統中還未實例化過此Session對象。所以先要對已經存在的Session ID進行處理。關鍵代碼如下:
@Overridepublic HttpSession getHttpSession(String id, HttpServletRequest request) {//類型檢查if (!(requestinstanceof Request)) {log.warn("不是Jetty容器下的Request對象");return null;}//將HttpServletRequest轉換成Jetty容器的Request類型Request req = (Request) request;//ZooKeeper服務器上查找指定節點是否有效boolean valid = ZooKeeperHelper.isValid(id);//如果為false,表示服務器上無該Session節點,需要重新創建(返回null)if (!valid) {//刪除本地的副本sessions.remove(id);return null;}else {//更新Session節點的元數據ZooKeeperHelper.updateSessionMetaData(id);HttpSession session =sessions.get(id);//如果存在,則直接返回if (session !=null) {return session;}//否則創建指定ID的Session并返回(用于同步分布式環境中的其他機器上的Session本地副本)session =new JettyDistributedSession((AbstractSessionManager) req.getSessionManager(),System.currentTimeMillis(), id);sessions.put(id, session);return session;}}首先根據ID去ZooKeeper上驗證此Session是否有效,如果無效了,則直接返回null,表示此Session已經超時不可用,同時需要刪除本地的“影子”Session對象(不管存在與否)。如果該節點有效,則首先更新該Session節點的元數據(例如,最后一次訪問時間)。然后先到本地的Session容器中查找是否存在該ID的Session對象。本地Session容器中的Session對象并不用來保存用戶數據,也不進行生命周期管理,純粹為了在不同請求中進行傳遞。唯一有價值的就Session ID,因此,我喜歡把本地Session容器中的Session對象稱為“影子”Session,它只是ZooKeeper上真正Session的一個影子而已。
如果Session節點沒有失效,但是本地Session容器并沒有指定ID的”影子”Session,則表示是第三種可能性,需要進行影子Session的同步。正如代碼中所展示的,我們實例化一個指定ID的Session對象,并放入當前系統的Session容器中,這樣就完成了Session ID在分布式環境中的共享,以及Session對象在各子系統之間的同步。
C) 如果通過上面的方法返回的Session對象還是null,則真的需要實例化一個Session對象了,代碼如下所示:
public HttpSession newHttpSession(HttpServletRequest request) {//類型檢查if (!(request instanceof Request)) {log.warn("不是Jetty容器下的Request對象");return null;}//將HttpServletRequest轉換成Jetty容器的Request類型Request req = (Request) request;Session session = new JettyDistributedSession((AbstractSessionManager) req.getSessionManager(), request);addHttpSession(session, request);String id = session.getId();//寫cookieCookie cookie = CookieHelper.writeSessionIdToCookie(id, req, req.getConnection().getResponse());if (cookie !=null) {log.debug("Wrote sid to Cookie,name:[" + cookie.getName() +"],value:["+ cookie.getValue() +"]");}//在ZooKeeper服務器上創建session節點,節點名稱為Session ID//創建元數據SessionMetaData metadata =new SessionMetaData();metadata.setId(id);metadata.setMaxIdle(config.getTimeout() * 60 * 1000);//轉換成毫秒ZooKeeperHelper.createSessionNode(metadata);return session;}以上代碼會實例化一個Session對象,并將Session ID寫入客戶端Cookie中,最后實例化Session元數據,并在ZooKeeper上新建一個Session節點。
通過上面步驟,我們就將Session的整個生命周期管理與ZooKeeper關聯起來了。接下來我們看看Session對象的幾個重要方法的重寫:
public synchronized Object getAttribute(String name) {//獲取session IDString id = getId();if (StringUtils.isNotBlank(id)) {//返回Session節點下的數據return ZooKeeperHelper.getSessionData(id, name);}return null;}public synchronized void removeAttribute(String name) {//獲取session IDString id = getId();if (StringUtils.isNotBlank(id)) {//刪除Session節點下的數據ZooKeeperHelper.removeSessionData(id, name);}}public synchronized void setAttribute(String name, Object value) {//獲取session IDString id = getId();if (StringUtils.isNotBlank(id)) {//將數據添加到ZooKeeper服務器上ZooKeeperHelper.setSessionData(id, name, value);}}public void invalidate() throws IllegalStateException {//獲取session IDString id = getId();if (StringUtils.isNotBlank(id)) {//刪除Session節點ZooKeeperHelper.deleteSessionNode(id);}}這些方法中都是直接和ZooKeeper上對應的Session進行數據交換。
本來我是想在本地Session對象上創建一個ZooKeeper的緩沖,當用戶調用Session的讀方法時,先到本地緩沖中讀數據,讀不到再到ZooKeeper上去取,這樣可以減少網絡的通訊開銷。但在分布式環境下,這種策略所帶來的數據同步開銷更加的可觀。因為每次一個子系統的Session數據更新,都將觸發所有其他子系統與之關聯的Session數據同步操作,否則Session中數據的一致性將無法得到保障。
看到這里,大家可能已經發覺了,所有與ZooKeeper交互的代碼都被封裝到ZooKeeperHelper類中,接下來就來看看這個類的實現。
4) ZooKeeperHelper類實現
public class ZooKeeperHelper {/**日志 */private static Logger log =Logger.getLogger(ZooKeeperHelper.class);private static String hosts;private static ExecutorServicepool = Executors.newCachedThreadPool();private static final String GROUP_NAME ="/SESSIONS";/***初始化*/public static void initialize(Configuration config) {hosts = config.getServers();}/***銷毀*/public static void destroy() {if (pool !=null) {//關閉pool.shutdown();}}/***連接服務器**@return*/public static ZooKeeper connect() {ConnectionWatcher cw =new ConnectionWatcher();ZooKeeper zk = cw.connection(hosts);return zk;}/***關閉一個會話*/public static void close(ZooKeeper zk) {if (zk !=null) {try {zk.close();}catch (InterruptedException e) {log.error(e);}}}/***驗證指定ID的節點是否有效*@param id*@return*/public static boolean isValid(String id) {ZooKeeper zk =connect();if (zk !=null) {try {return isValid(id, zk);}finally {close(zk);}}return false;}/***驗證指定ID的節點是否有效*@param id*@param zk*@return*/public static boolean isValid(String id, ZooKeeper zk) {if (zk != null) {//獲取元數據SessionMetaData metadata =getSessionMetaData(id, zk);//如果不存在或是無效,則直接返回nullif (metadata == null) {return false;}return metadata.getValidate();}return false;}/***返回指定ID的Session元數據*@param id*@return*/public static SessionMetaData getSessionMetaData(String id, ZooKeeper zk) {if (zk !=null) {String path = GROUP_NAME +"/" + id;try {//檢查節點是否存在Stat stat = zk.exists(path,false);//stat為null表示無此節點if (stat == null) {return null;}//獲取節點上的數據byte[] data = zk.getData(path,false,null);if (data != null) {//反序列化Object obj = SerializationUtils.deserialize(data);//轉換類型if (obj instanceof SessionMetaData) {SessionMetaData metadata = (SessionMetaData) obj;//設置當前版本號metadata.setVersion(stat.getVersion());return metadata;}}}catch (KeeperException e) {log.error(e);}catch (InterruptedException e) {log.error(e);}}return null;}/***更新Session節點的元數據*@param id Session ID*@param version更新版本號*@param zk*/public static void updateSessionMetaData(String id) {ZooKeeper zk =connect();try {//獲取元數據SessionMetaData metadata =getSessionMetaData(id, zk);if (metadata != null) {updateSessionMetaData(metadata, zk);}}finally {close(zk);}}/***更新Session節點的元數據*@param id Session ID*@param version更新版本號*@param zk*/public static void updateSessionMetaData(SessionMetaData metadata, ZooKeeper zk) {try {if (metadata !=null) {String id = metadata.getId();Long now = System.currentTimeMillis();//當前時間//檢查是否過期Long timeout = metadata.getLastAccessTm() + metadata.getMaxIdle();//空閑時間//如果空閑時間小于當前時間,則表示Session超時if (timeout < now) {metadata.setValidate(false);log.debug("Session節點已超時[" + id + "]");}//設置最后一次訪問時間metadata.setLastAccessTm(now);//更新節點數據String path =GROUP_NAME +"/" + id;byte[] data = SerializationUtils.serialize(metadata);zk.setData(path, data, metadata.getVersion());log.debug("更新Session節點的元數據完成[" + path + "]");}}catch (KeeperException e) {log.error(e);}catch (InterruptedException e) {log.error(e);}}/***返回ZooKeeper服務器上的Session節點的所有數據,并裝載為Map*@param id*@return*/public static Map<String, Object> getSessionMap(String id) {ZooKeeper zk = connect();if (zk !=null) {String path = GROUP_NAME +"/" + id;try {//獲取元數據SessionMetaData metadata =getSessionMetaData(path, zk);//如果不存在或是無效,則直接返回nullif (metadata == null || !metadata.getValidate()) {return null;}//獲取所有子節點List<String> nodes = zk.getChildren(path,false);//存放數據Map<String, Object> sessionMap = new HashMap<String, Object>();for (String node : nodes) {String dataPath = path +"/" + node;Stat stat = zk.exists(dataPath,false);//節點存在if (stat !=null) {//提取數據byte[] data = zk.getData(dataPath,false,null);if (data !=null) {sessionMap.put(node, SerializationUtils.deserialize(data));}else {sessionMap.put(node,null);}}}return sessionMap;}catch (KeeperException e) {log.error(e);}catch (InterruptedException e) {log.error(e);}finally {close(zk);}}return null;}/***創建一個組節點*/public static void createGroupNode() {ZooKeeper zk =connect();if (zk !=null) {try {//檢查節點是否存在Stat stat = zk.exists(GROUP_NAME,false);//stat為null表示無此節點,需要創建if (stat ==null) {//創建組件點String createPath = zk.create(GROUP_NAME,null, Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);log.debug("創建節點完成:[" + createPath + "]");}else {log.debug("組節點已存在,無需創建[" + GROUP_NAME +"]");}}catch (KeeperException e) {log.error(e);}catch (InterruptedException e) {log.error(e);}finally {close(zk);}}}/***創建指定Session ID的節點*@param sid Session ID*@return*/public static String createSessionNode(SessionMetaData metadata) {if (metadata ==null) {return null;}ZooKeeper zk = connect(); //連接服務期if (zk !=null) {String path = GROUP_NAME +"/" + metadata.getId();try {//檢查節點是否存在Stat stat = zk.exists(path,false);//stat為null表示無此節點,需要創建if (stat == null) {//創建組件點String createPath = zk.create(path,null, Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);log.debug("創建Session節點完成:[" + createPath + "]");//寫入節點數據zk.setData(path, SerializationUtils.serialize(metadata), -1);return createPath;}}catch (KeeperException e) {log.error(e);}catch (InterruptedException e) {log.error(e);}finally {close(zk);}}return null;}/***創建指定Session ID的節點(異步操作)*@param sid*@param waitFor是否等待執行結果*@return*/public static String asynCreateSessionNode(final SessionMetaData metadata, boolean waitFor) {Callable<String> task =new Callable<String>() {@Overridepublic String call()throws Exception {return createSessionNode(metadata);}};try {Future<String> result =pool.submit(task);//如果需要等待執行結果if (waitFor) {while (true) {if (result.isDone()) {return result.get();}}}}catch (Exception e) {log.error(e);}return null;}/***刪除指定Session ID的節點*@param sid Session ID*@return*/public static boolean deleteSessionNode(String sid) {ZooKeeper zk =connect(); //連接服務期if (zk !=null) {String path =GROUP_NAME +"/" + sid;try {//檢查節點是否存在Stat stat = zk.exists(path,false);//如果節點存在則刪除之if (stat !=null) {//先刪除子節點List<String> nodes = zk.getChildren(path,false);if (nodes !=null) {for (String node : nodes) {zk.delete(path +"/" + node, -1);}}//刪除父節點zk.delete(path, -1);log.debug("刪除Session節點完成:[" + path + "]");returntrue;}}catch (KeeperException e) {log.error(e);}catch (InterruptedException e) {log.error(e);}finally {close(zk);}}return false;}/***刪除指定Session ID的節點(異步操作)*@param sid*@param waitFor是否等待執行結果*@return*/public static boolean asynDeleteSessionNode(final String sid,boolean waitFor) {Callable<Boolean> task =new Callable<Boolean>() {@Overridepublic Boolean call()throws Exception {return deleteSessionNode(sid);}};try {Future<Boolean> result =pool.submit(task);//如果需要等待執行結果if (waitFor) {while (true) {if (result.isDone()) {return result.get();}}}}catch (Exception e) {log.error(e);}return false;}/***在指定Session ID的節點下添加數據節點*@param sid Session ID*@param name數據節點的名稱*@param value數據*@return*/public static boolean setSessionData(String sid, String name, Object value) {boolean result =false;ZooKeeper zk =connect(); //連接服務器if (zk !=null) {String path =GROUP_NAME +"/" + sid;try {//檢查指定的Session節點是否存在Stat stat = zk.exists(path,false);//如果節點存在則刪除之if (stat !=null) {//查找數據節點是否存在,不存在就創建一個String dataPath = path +"/" + name;stat = zk.exists(dataPath,false);if (stat ==null) {//創建數據節點zk.create(dataPath,null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);log.debug("創建數據節點完成[" + dataPath + "]");}//在節點上設置數據,所有數據必須可序列化if (valueinstanceof Serializable) {int dataNodeVer = -1;if (stat !=null) {//記錄數據節點的版本dataNodeVer = stat.getVersion();}byte[] data = SerializationUtils.serialize((Serializable) value);stat = zk.setData(dataPath, data, dataNodeVer);log.debug("更新數據節點數據完成[" + dataPath + "][" + value +"]");result = true;}}}catch (KeeperException e) {log.error(e);}catch (InterruptedException e) {log.error(e);}finally {close(zk);}}return result;}/***刪除指定Session ID的節點(異步操作)*@param sid*@param waitFor是否等待執行結果*@return*/public static boolean asynSetSessionData(final String sid,final String name,final Object value,boolean waitFor) {Callable<Boolean> task =new Callable<Boolean>() {@Overridepublic Boolean call()throws Exception {return setSessionData(sid, name, value);}};try {Future<Boolean> result =pool.submit(task);//如果需要等待執行結果if (waitFor) {while (true) {if (result.isDone()) {return result.get();}}}}catch (Exception e) {log.error(e);}return false;}/***返回指定Session ID的節點下數據*@param sid Session ID*@param name數據節點的名稱*@param value數據*@return*/public static Object getSessionData(String sid, String name) {ZooKeeper zk =connect(); //連接服務器if (zk !=null) {String path =GROUP_NAME +"/" + sid;try {//檢查指定的Session節點是否存在Stat stat = zk.exists(path,false);if (stat !=null) {//查找數據節點是否存在String dataPath = path +"/" + name;stat = zk.exists(dataPath,false);Object obj =null;if (stat !=null) {//獲取節點數據byte[] data = zk.getData(dataPath,false,null);if (data != null) {//反序列化obj = SerializationUtils.deserialize(data);}}return obj;}}catch (KeeperException e) {log.error(e);}catch (InterruptedException e) {log.error(e);}finally {close(zk);}}return null;}/***刪除指定Session ID的節點下數據*@param sid Session ID*@param name數據節點的名稱*@param value數據*@return*/public static void removeSessionData(String sid, String name) {ZooKeeper zk =connect(); //連接服務器if (zk !=null) {String path =GROUP_NAME +"/" + sid;try {//檢查指定的Session節點是否存在Stat stat = zk.exists(path,false);if (stat !=null) {//查找數據節點是否存在String dataPath = path +"/" + name;stat = zk.exists(dataPath,false);if (stat !=null) {//刪除節點zk.delete(dataPath, -1);}}}catch (KeeperException e) {log.error(e);}catch (InterruptedException e) {log.error(e);}finally {close(zk);}}} }從這個類的實現中我們可以發現,與ZooKeeper交互的API非常的友好,基本上就是對文件系統的管理——創建文件、刪除文件、檢查文件是否存在,更新文件等等。并且對節點的查找就是對文件絕對路徑的搜索,效率非常的高。
例如,用戶調用Session的getAttribute(String key)方法,則根據當前Session可以拼裝成一個搜索節點的路徑:/SESSIONS//。這樣可以快速的定位,并獲取該節點的數據。
另外,在這個類中,我還實現類一些操作的異步版本。原來是想為了提高用戶響應度,在創建、修改Session節點的時候使用異步調用,但是實際測試下來還是有問題的。所以目前放棄了所有操作的異步版本。
最后讓我們來看看連接ZooKeeper服務器的實現類,代碼如下所示:
public class ConnectionWatcher implements Watcher {private static final int SESSION_TIMEOUT = 5000;private CountDownLatchsignal = new CountDownLatch(1);private Logger log = Logger.getLogger(getClass());/****@throws IOException*@throws InterruptedException*/public ZooKeeper connection(String servers) {ZooKeeper zk;try {zk = new ZooKeeper(servers,SESSION_TIMEOUT,this);signal.await();return zk;}catch (IOException e) {log.error(e);}catch (InterruptedException e) {log.error(e);}return null;}/** (non-Javadoc)** @see* org.apache.zookeeper.Watcher#process(org.apache.zookeeper.WatchedEvent)*/public void process(WatchedEvent event) {KeeperState state = event.getState();if (state == KeeperState.SyncConnected) {signal.countDown();}} }這個類需要關注的是實現Watcher接口,在上面描述ZooKeeper特性的時候曾經提到過,ZooKeeper通過Watcher機制實現客戶端與服務器之間的松耦合交互,在process方法中,通過對各種事件的監聽,可以進行異步的回調處理。
這里的SESSION_TIMEOUT并不是Web容器中Session的超時。這是ZooKeeper對一個客戶端的連接,即一個連接會話的超時設置。該值一般設置在2~5秒之間。
后續
目前基于ZooKeeper的分布式Session系統的實現還是比較初步的。還有很多功能有待完善,比如要添加Session監聽事件的支持、對ZooKeeper上被標記為不可用的Session節點的刪除、對Session進行監控和管理的控制臺以及非常難解決的ClassLoader問題等。
另外,前文也提到了,分布式Session的實現是和某個Web容器緊密耦合的,這一點讓我很不爽。因為需要針對不同的Web容器各自實現一套Session的管理機制。不過我相信通過良好的設計,可以實現通用的組件。目前我已經實現了在Jetty和Tomcat容器下的分布式Session。
在文章的最后,我們討論一下如何解決ClassLoader問題。
- 在OSGi框架下,這個問題并不是很麻煩。因為,我們可以將所有領域對象類打包成一個單獨的Bundle。同時將分布式Session的Filter實現也打包成一個Bundle。通過動態引用的方式,就可以引入所有領域對象的類型了。
- 但在非OSGi環境下,只能將領域對象的類文件在每個子系統中都包含一份來解決ClassLoader問題。這樣會造成一個問題,就是當領域對象發生變化時,我需要重啟所有的子系統,來裝載更新后的領域對象類,而不像在OSGi下,只需要重啟這個領域對象Bundle就可以了。
總結
以上是生活随笔為你收集整理的基于ZooKeeper的分布式Session实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 广西农村信用社怎么查余额 怎么查广西农村
- 下一篇: 嵘泰股份是做什么的