zookeeper源码分析之六session机制
zookeeper中session意味著一個物理連接,客戶端連接服務器成功之后,會發送一個連接型請求,此時就會有session?產生。
session由sessionTracker產生的,sessionTracker的實現有SessionTrackerImpl,LocalSessionTracker,LeaderSessionTracker(leader),LearnerSessionTracker(follow and oberser)四種實現。它們的分支由各自的zookeeperServer.startup()開始。
1.sessionTrackerImpl
標準zookeeperServer的實現
public synchronized void startup() {if (sessionTracker == null) {createSessionTracker();}startSessionTracker();setupRequestProcessors();registerJMX();state = State.RUNNING;notifyAll();}其實現由sessionTrackerImpl來實現,其官方說明
/**
* This is a full featured SessionTracker. It tracks session in grouped by tick
* interval. It always rounds up the tick interval to provide a sort of grace
* period. Sessions are thus expired in batches made up of sessions that expire
* in a given interval.
*/
sessionTrackerImpl是一個線程,其run方法是真實邏輯:
@Overridepublic void run() {try {while (running) {long waitTime = sessionExpiryQueue.getWaitTime();if (waitTime > 0) {Thread.sleep(waitTime);continue;}for (SessionImpl s : sessionExpiryQueue.poll()) {setSessionClosing(s.sessionId);expirer.expire(s);}}} catch (InterruptedException e) {handleException(this.getName(), e);}LOG.info("SessionTrackerImpl exited loop!");}sessionTrackerImpl實現了session的各種操作:創建session,檢測session,刪除session等。我們以增加session為例,看一下session機制:
public long createSession(int sessionTimeout) {long sessionId = nextSessionId.getAndIncrement();addSession(sessionId, sessionTimeout);return sessionId;}public synchronized boolean addSession(long id, int sessionTimeout) {sessionsWithTimeout.put(id, sessionTimeout);boolean added = false; SessionImpl session = sessionsById.get(id);if (session == null){session = new SessionImpl(id, sessionTimeout);}// findbugs2.0.3 complains about get after put.// long term strategy would be use computeIfAbsent after JDK 1.8SessionImpl existedSession = sessionsById.putIfAbsent(id, session);if (existedSession != null) {session = existedSession;} else {added = true;LOG.debug("Adding session 0x" + Long.toHexString(id));}if (LOG.isTraceEnabled()) {String actionStr = added ? "Adding" : "Existing";ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,"SessionTrackerImpl --- " + actionStr + " session 0x"+ Long.toHexString(id) + " " + sessionTimeout);}updateSessionExpiry(session, sessionTimeout);return added;}上文中出現了一個nextSessionId,看一下其的生成方式:
/*** Generates an initial sessionId. High order byte is serverId, next 5* 5 bytes are from timestamp, and low order 2 bytes are 0s.*/public static long initializeNextSession(long id) {long nextSid;nextSid = (Time.currentElapsedTime() << 24) >>> 8;nextSid = nextSid | (id <<56);return nextSid;}創建sessionImpl
紅色代碼所示。
更新過期時間并記錄
private void updateSessionExpiry(SessionImpl s, int timeout) {logTraceTouchSession(s.sessionId, timeout, "");sessionExpiryQueue.update(s, timeout);}private void logTraceTouchSession(long sessionId, int timeout, String sessionStatus){if (!LOG.isTraceEnabled())return;String msg = MessageFormat.format("SessionTrackerImpl --- Touch {0}session: 0x{1} with timeout {2}",sessionStatus, Long.toHexString(sessionId), Integer.toString(timeout));ZooTrace.logTraceMessage(LOG, ZooTrace.CLIENT_PING_TRACE_MASK, msg);}從上面的代碼可以看出,session都存放在一個sessionById的map里面,其定義為:
protected final ConcurrentHashMap<Long, SessionImpl> sessionsById =
new ConcurrentHashMap<Long, SessionImpl>();
?
2.?LeaderSessionTracker
官方說明:
/**
* The leader session tracker tracks local and global sessions on the leader.
*/
LeaderZooKeeperServer、LeaderZooKeeperServer、FollowerZooKeeperServer、ObserverZooKeeperServer均繼承自QuorumZooKeeperServer,
QuorumZooKeeperServer的startSessionTracker方法如下:
@Overrideprotected void startSessionTracker() {upgradeableSessionTracker = (UpgradeableSessionTracker) sessionTracker;upgradeableSessionTracker.start();} UpgradeableSessionTracker的實現類有兩個:LeaderSessionTracker和LearnerSessionTracker,很顯然,對leaderZookeeper的實現為LeaderSessionTracker,LearnerSessionTracker對應FollowerZooKeeperServer、ObserverZooKeeperServer。LeaderSessionTracker的構造函數為:
public LeaderSessionTracker(SessionExpirer expirer,ConcurrentMap<Long, Integer> sessionsWithTimeouts,int tickTime, long id, boolean localSessionsEnabled,ZooKeeperServerListener listener) {this.globalSessionTracker = new SessionTrackerImpl(expirer, sessionsWithTimeouts, tickTime, id, listener);this.localSessionsEnabled = localSessionsEnabled;if (this.localSessionsEnabled) {createLocalSessionTracker(expirer, tickTime, id, listener);}serverId = id;}
其分為兩個sessionTracker,一個為globalSessionTracker,其實現為SessionTrackerImpl;另一個為localSessionTracker,其實現為:
public void createLocalSessionTracker(SessionExpirer expirer,int tickTime, long id, ZooKeeperServerListener listener) {this.localSessionsWithTimeouts =new ConcurrentHashMap<Long, Integer>();this.localSessionTracker = new LocalSessionTracker(expirer, this.localSessionsWithTimeouts, tickTime, id, listener);} LeaderSessionTracker啟動時同時啟動global和local: public void start() {globalSessionTracker.start();if (localSessionTracker != null) {localSessionTracker.start();}}創建session的過程:
public long createSession(int sessionTimeout) {if (localSessionsEnabled) {return localSessionTracker.createSession(sessionTimeout);}return globalSessionTracker.createSession(sessionTimeout);} globalSessionTracker的創建session上面已經論述,且看localSessionTracker的生成session,進一步代碼發現LocalSessionTracker繼承了SessionTrackerImpl,沒有重寫其創建session方法,即global和local創建session的方法相同。3. LearnerSessionTracker
官方說明:
/**
* The learner session tracker is used by learners (followers and observers) to
* track zookeeper sessions which may or may not be echoed to the leader. When
* a new session is created it is saved locally in a wrapped
* LocalSessionTracker. It can subsequently be upgraded to a global session
* as required. If an upgrade is requested the session is removed from local
* collections while keeping the same session ID. It is up to the caller to
* queue a session creation request for the leader.
* A secondary function of the learner session tracker is to remember sessions
* which have been touched in this service. This information is passed along
* to the leader with a ping.
*/
其構造方法是: public LearnerSessionTracker(SessionExpirer expirer,ConcurrentMap<Long, Integer> sessionsWithTimeouts,int tickTime, long id, boolean localSessionsEnabled,ZooKeeperServerListener listener) {this.expirer = expirer;this.touchTable.set(new ConcurrentHashMap<Long, Integer>());this.globalSessionsWithTimeouts = sessionsWithTimeouts;this.serverId = id;nextSessionId.set(SessionTrackerImpl.initializeNextSession(serverId));this.localSessionsEnabled = localSessionsEnabled;if (this.localSessionsEnabled) {createLocalSessionTracker(expirer, tickTime, id, listener);}}
啟動時只啟動了localSessionTracker:
public void start() {if (localSessionTracker != null) {localSessionTracker.start();}}創建session時也僅僅由localSessionTracker生成:
public long createSession(int sessionTimeout) {if (localSessionsEnabled) {return localSessionTracker.createSession(sessionTimeout);}return nextSessionId.getAndIncrement();}?4 小結
? 根據服務器角色不同,ZooKeeperServer,LeaderZooKeeperServer,FollowerZooKeeperServer,ObserverZooKeeperServer分別代表單機服務器,集群leader服務器,集群Follower服務器,集群observer服務器,它們的sessionTracker實現是不同的。ZookeeperServer的對應sessionTracker實現是SessionTrackerImpl;LeaderZooKeeperServer的對應sessionTracker實現是LeaderSessionTracker,FollowerZooKeeperServer,ObserverZooKeeperServer的對應sessionTracker實現是LearnerSessionTracker。
有可以分為globalSessionTracker和LocalSessionTracker,其中單機只有一個標準的SessionTrackerImpl,集群leader開啟globalSessionTracker和LocalSessionTracker,follower和observer只開啟LocalSessionTracker。globalSessionTracker由SessionTrackerImpl實現,LocalSessionTracker繼承并擴展了SessionTrackerImpl。
?
?
?
?
?
?
?
?
?
?
??
轉載于:https://www.cnblogs.com/davidwang456/p/5009659.html
總結
以上是生活随笔為你收集整理的zookeeper源码分析之六session机制的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spring Profiles exam
- 下一篇: hadoop和spark搭建记录