【学习】026 Zookeeper
什么Zookeeper
Zookeeper是一個(gè)分布式開源框架,提供了協(xié)調(diào)分布式應(yīng)用的基本服務(wù),它向外部應(yīng)用暴露一組通用服務(wù)——分布式同步(Distributed Synchronization)、命名服務(wù)(Naming Service)、集群維護(hù)(Group Maintenance)等,簡(jiǎn)化分布式應(yīng)用協(xié)調(diào)及其管理的難度,提供高性能的分布式服務(wù)。ZooKeeper本身可以以單機(jī)模式安裝運(yùn)行,不過(guò)它的長(zhǎng)處在于通過(guò)分布式ZooKeeper集群(一個(gè)Leader,多個(gè)Follower),基于一定的策略來(lái)保證ZooKeeper集群的穩(wěn)定性和可用性,從而實(shí)現(xiàn)分布式應(yīng)用的可靠性。
1、zookeeper是為別的分布式程序服務(wù)的
2、Zookeeper本身就是一個(gè)分布式程序(只要有半數(shù)以上節(jié)點(diǎn)存活,zk就能正常服務(wù))
3、Zookeeper所提供的服務(wù)涵蓋:主從協(xié)調(diào)、服務(wù)器節(jié)點(diǎn)動(dòng)態(tài)上下線、統(tǒng)一配置管理、分布式共享鎖、統(tǒng)> 一名稱服務(wù)等
4、雖然說(shuō)可以提供各種服務(wù),但是zookeeper在底層其實(shí)只提供了兩個(gè)功能:
管理(存儲(chǔ),讀取)用戶程序提交的數(shù)據(jù)(類似namenode中存放的metadata);?
并為用戶程序提供數(shù)據(jù)節(jié)點(diǎn)監(jiān)聽服務(wù);
Zookeeper集群機(jī)制
Zookeeper集群的角色: Leader 和 follower?
只要集群中有半數(shù)以上節(jié)點(diǎn)存活,集群就能提供服務(wù)
Zookeeper特性
1、Zookeeper:一個(gè)leader,多個(gè)follower組成的集群
2、全局?jǐn)?shù)據(jù)一致:每個(gè)server保存一份相同的數(shù)據(jù)副本,client無(wú)論連接到哪個(gè)server,數(shù)據(jù)都是一致的
3、分布式讀寫,更新請(qǐng)求轉(zhuǎn)發(fā),由leader實(shí)施
4、更新請(qǐng)求順序進(jìn)行,來(lái)自同一個(gè)client的更新請(qǐng)求按其發(fā)送順序依次執(zhí)行
5、數(shù)據(jù)更新原子性,一次數(shù)據(jù)更新要么成功,要么失敗
6、實(shí)時(shí)性,在一定時(shí)間范圍內(nèi),client能讀到最新數(shù)據(jù)
Zookeeper數(shù)據(jù)結(jié)構(gòu)
1、層次化的目錄結(jié)構(gòu),命名符合常規(guī)文件系統(tǒng)規(guī)范(類似文件系統(tǒng))?
2、每個(gè)節(jié)點(diǎn)在zookeeper中叫做znode,并且其有一個(gè)唯一的路徑標(biāo)識(shí)?
3、節(jié)點(diǎn)Znode可以包含數(shù)據(jù)和子節(jié)點(diǎn)(但是EPHEMERAL類型的節(jié)點(diǎn)不能有子節(jié)點(diǎn))
節(jié)點(diǎn)類型?
a、Znode有兩種類型:
短暫(ephemeral)(create -e /app1/test1 “test1” 客戶端斷開連接zk刪除ephemeral類型節(jié)點(diǎn))?
持久(persistent)?(create -s /app1/test2 “test2” 客戶端斷開連接zk不刪除persistent類型節(jié)點(diǎn))
b、Znode有四種形式的目錄節(jié)點(diǎn)(默認(rèn)是persistent )
PERSISTENT?
PERSISTENT_SEQUENTIAL(持久序列/test0000000019 )?
EPHEMERAL?
EPHEMERAL_SEQUENTIAL
c、創(chuàng)建znode時(shí)設(shè)置順序標(biāo)識(shí),znode名稱后會(huì)附加一個(gè)值,順序號(hào)是一個(gè)單調(diào)遞增的計(jì)數(shù)器,由父節(jié)點(diǎn)維護(hù)?
d、在分布式系統(tǒng)中,順序號(hào)可以被用于為所有的事件進(jìn)行全局排序,這樣客戶端可以通過(guò)順序號(hào)推斷事件的順序
Zookeeper應(yīng)用場(chǎng)景
數(shù)據(jù)發(fā)布與訂閱(配置中心)
發(fā)布與訂閱模型,即所謂的配置中心,顧名思義就是發(fā)布者將數(shù)據(jù)發(fā)布到ZK節(jié)點(diǎn)上,供訂閱者動(dòng)態(tài)獲取數(shù)據(jù),實(shí)現(xiàn)配置信息的集中式管理和動(dòng)態(tài)更新。例如全局的配置信息,服務(wù)式服務(wù)框架的服務(wù)地址列表等就非常適合使用。
負(fù)載均衡
這里說(shuō)的負(fù)載均衡是指軟負(fù)載均衡。在分布式環(huán)境中,為了保證高可用性,通常同一個(gè)應(yīng)用或同一個(gè)服務(wù)的提供方都會(huì)部署多份,達(dá)到對(duì)等服務(wù)。而消費(fèi)者就須要在這些對(duì)等的服務(wù)器中選擇一個(gè)來(lái)執(zhí)行相關(guān)的業(yè)務(wù)邏輯,其中比較典型的是消息中間件中的生產(chǎn)者,消費(fèi)者負(fù)載均衡。
消息中間件中發(fā)布者和訂閱者的負(fù)載均衡,linkedin開源的KafkaMQ和阿里開源的 metaq都是通過(guò)zookeeper來(lái)做到生產(chǎn)者、消費(fèi)者的負(fù)載均衡。這里以metaq為例如講下:
生產(chǎn)者負(fù)載均衡:metaq發(fā)送消息的時(shí)候,生產(chǎn)者在發(fā)送消息的時(shí)候必須選擇一臺(tái)broker上的一個(gè)分區(qū)來(lái)發(fā)送消息,因此metaq在運(yùn)行過(guò)程中,會(huì)把所有broker和對(duì)應(yīng)的分區(qū)信息全部注冊(cè)到ZK指定節(jié)點(diǎn)上,默認(rèn)的策略是一個(gè)依次輪詢的過(guò)程,生產(chǎn)者在通過(guò)ZK獲取分區(qū)列表之后,會(huì)按照brokerId和partition的順序排列組織成一個(gè)有序的分區(qū)列表,發(fā)送的時(shí)候按照從頭到尾循環(huán)往復(fù)的方式選擇一個(gè)分區(qū)來(lái)發(fā)送消息。
消費(fèi)負(fù)載均衡: 在消費(fèi)過(guò)程中,一個(gè)消費(fèi)者會(huì)消費(fèi)一個(gè)或多個(gè)分區(qū)中的消息,但是一個(gè)分區(qū)只會(huì)由一個(gè)消費(fèi)者來(lái)消費(fèi)。MetaQ的消費(fèi)策略是:
1. 每個(gè)分區(qū)針對(duì)同一個(gè)group只掛載一個(gè)消費(fèi)者。
2. 如果同一個(gè)group的消費(fèi)者數(shù)目大于分區(qū)數(shù)目,則多出來(lái)的消費(fèi)者將不參與消費(fèi)。
3. 如果同一個(gè)group的消費(fèi)者數(shù)目小于分區(qū)數(shù)目,則有部分消費(fèi)者需要額外承擔(dān)消費(fèi)任務(wù)。
在某個(gè)消費(fèi)者故障或者重啟等情況下,其他消費(fèi)者會(huì)感知到這一變化(通過(guò) zookeeper watch消費(fèi)者列表),然后重新進(jìn)行負(fù)載均衡,保證所有的分區(qū)都有消費(fèi)者進(jìn)行消費(fèi)。
命名服務(wù)(Naming Service)
命名服務(wù)也是分布式系統(tǒng)中比較常見的一類場(chǎng)景。在分布式系統(tǒng)中,通過(guò)使用命名服務(wù),客戶端應(yīng)用能夠根據(jù)指定名字來(lái)獲取資源或服務(wù)的地址,提供者等信息。被命名的實(shí)體通常可以是集群中的機(jī)器,提供的服務(wù)地址,遠(yuǎn)程對(duì)象等等——這些我們都可以統(tǒng)稱他們?yōu)槊?#xff08;Name)。其中較為常見的就是一些分布式服務(wù)框架中的服務(wù)地址列表。通過(guò)調(diào)用ZK提供的創(chuàng)建節(jié)點(diǎn)的API,能夠很容易創(chuàng)建一個(gè)全局唯一的path,這個(gè)path就可以作為一個(gè)名稱。
阿里巴巴集團(tuán)開源的分布式服務(wù)框架Dubbo中使用ZooKeeper來(lái)作為其命名服務(wù),維護(hù)全局的服務(wù)地址列表, 點(diǎn)擊這里查看Dubbo開源項(xiàng)目。在Dubbo實(shí)現(xiàn)中:
服務(wù)提供者在啟動(dòng)的時(shí)候,向ZK上的指定節(jié)點(diǎn)/dubbo/${serviceName}/providers目錄下寫入自己的URL地址,這個(gè)操作就完成了服務(wù)的發(fā)布。
服務(wù)消費(fèi)者啟動(dòng)的時(shí)候,訂閱/dubbo/${serviceName}/providers目錄下的提供者URL地址, 并向/dubbo/${serviceName} /consumers目錄下寫入自己的URL地址。
注意,所有向ZK上注冊(cè)的地址都是臨時(shí)節(jié)點(diǎn),這樣就能夠保證服務(wù)提供者和消費(fèi)者能夠自動(dòng)感應(yīng)資源的變化。 另外,Dubbo還有針對(duì)服務(wù)粒度的監(jiān)控,方法是訂閱/dubbo/${serviceName}目錄下所有提供者和消費(fèi)者的信息。
分布式通知/協(xié)調(diào)
ZooKeeper中特有watcher注冊(cè)與異步通知機(jī)制,能夠很好的實(shí)現(xiàn)分布式環(huán)境下不同系統(tǒng)之間的通知與協(xié)調(diào),實(shí)現(xiàn)對(duì)數(shù)據(jù)變更的實(shí)時(shí)處理。使用方法通常是不同系統(tǒng)都對(duì)ZK上同一個(gè)znode進(jìn)行注冊(cè),監(jiān)聽znode的變化(包括znode本身內(nèi)容及子節(jié)點(diǎn)的),其中一個(gè)系統(tǒng)update了znode,那么另一個(gè)系統(tǒng)能夠收到通知,并作出相應(yīng)處理
1. 另一種心跳檢測(cè)機(jī)制:檢測(cè)系統(tǒng)和被檢測(cè)系統(tǒng)之間并不直接關(guān)聯(lián)起來(lái),而是通過(guò)zk上某個(gè)節(jié)點(diǎn)關(guān)聯(lián),大大減少系統(tǒng)耦合。
2. 另一種系統(tǒng)調(diào)度模式:某系統(tǒng)有控制臺(tái)和推送系統(tǒng)兩部分組成,控制臺(tái)的職責(zé)是控制推送系統(tǒng)進(jìn)行相應(yīng)的推送工作。管理人員在控制臺(tái)作的一些操作,實(shí)際上是修改了ZK上某些節(jié)點(diǎn)的狀態(tài),而ZK就把這些變化通知給他們注冊(cè)Watcher的客戶端,即推送系統(tǒng),于是,作出相應(yīng)的推送任務(wù)。
3. 另一種工作匯報(bào)模式:一些類似于任務(wù)分發(fā)系統(tǒng),子任務(wù)啟動(dòng)后,到zk來(lái)注冊(cè)一個(gè)臨時(shí)節(jié)點(diǎn),并且定時(shí)將自己的進(jìn)度進(jìn)行匯報(bào)(將進(jìn)度寫回這個(gè)臨時(shí)節(jié)點(diǎn)),這樣任務(wù)管理者就能夠?qū)崟r(shí)知道任務(wù)進(jìn)度。
總之,使用zookeeper來(lái)進(jìn)行分布式通知和協(xié)調(diào)能夠大大降低系統(tǒng)之間的耦合
集群管理與Master選舉
1. 集群機(jī)器監(jiān)控:這通常用于那種對(duì)集群中機(jī)器狀態(tài),機(jī)器在線率有較高要求的場(chǎng)景,能夠快速對(duì)集群中機(jī)器變化作出響應(yīng)。這樣的場(chǎng)景中,往往有一個(gè)監(jiān)控系統(tǒng),實(shí)時(shí)檢測(cè)集群機(jī)器是否存活。過(guò)去的做法通常是:監(jiān)控系統(tǒng)通過(guò)某種手段(比如ping)定時(shí)檢測(cè)每個(gè)機(jī)器,或者每個(gè)機(jī)器自己定時(shí)向監(jiān)控系統(tǒng)匯報(bào)“我還活著”。 這種做法可行,但是存在兩個(gè)比較明顯的問(wèn)題:
1. 集群中機(jī)器有變動(dòng)的時(shí)候,牽連修改的東西比較多。
2. 有一定的延時(shí)。
利用ZooKeeper有兩個(gè)特性,就可以實(shí)現(xiàn)另一種集群機(jī)器存活性監(jiān)控系統(tǒng):
1. 客戶端在節(jié)點(diǎn) x 上注冊(cè)一個(gè)Watcher,那么如果 x?的子節(jié)點(diǎn)變化了,會(huì)通知該客戶端。
2. 創(chuàng)建EPHEMERAL類型的節(jié)點(diǎn),一旦客戶端和服務(wù)器的會(huì)話結(jié)束或過(guò)期,那么該節(jié)點(diǎn)就會(huì)消失。
例如,監(jiān)控系統(tǒng)在 /clusterServers 節(jié)點(diǎn)上注冊(cè)一個(gè)Watcher,以后每動(dòng)態(tài)加機(jī)器,那么就往 /clusterServers 下創(chuàng)建一個(gè) EPHEMERAL類型的節(jié)點(diǎn):/clusterServers/{hostname}. 這樣,監(jiān)控系統(tǒng)就能夠?qū)崟r(shí)知道機(jī)器的增減情況,至于后續(xù)處理就是監(jiān)控系統(tǒng)的業(yè)務(wù)了。
2. Master選舉則是zookeeper中最為經(jīng)典的應(yīng)用場(chǎng)景了。
在分布式環(huán)境中,相同的業(yè)務(wù)應(yīng)用分布在不同的機(jī)器上,有些業(yè)務(wù)邏輯(例如一些耗時(shí)的計(jì)算,網(wǎng)絡(luò)I/O處理),往往只需要讓整個(gè)集群中的某一臺(tái)機(jī)器進(jìn)行執(zhí)行,其余機(jī)器可以共享這個(gè)結(jié)果,這樣可以大大減少重復(fù)勞動(dòng),提高性能,于是這個(gè)master選舉便是這種場(chǎng)景下的碰到的主要問(wèn)題。
利用ZooKeeper的強(qiáng)一致性,能夠保證在分布式高并發(fā)情況下節(jié)點(diǎn)創(chuàng)建的全局唯一性,即:同時(shí)有多個(gè)客戶端請(qǐng)求創(chuàng)建 /currentMaster 節(jié)點(diǎn),最終一定只有一個(gè)客戶端請(qǐng)求能夠創(chuàng)建成功。利用這個(gè)特性,就能很輕易的在分布式環(huán)境中進(jìn)行集群選取了。
另外,這種場(chǎng)景演化一下,就是動(dòng)態(tài)Master選舉。這就要用到EPHEMERAL_SEQUENTIAL類型節(jié)點(diǎn)的特性了。
上文中提到,所有客戶端創(chuàng)建請(qǐng)求,最終只有一個(gè)能夠創(chuàng)建成功。在這里稍微變化下,就是允許所有請(qǐng)求都能夠創(chuàng)建成功,但是得有個(gè)創(chuàng)建順序,于是所有的請(qǐng)求最終在ZK上創(chuàng)建結(jié)果的一種可能情況是這樣: /currentMaster/{sessionId}-1 ,/currentMaster/{sessionId}-2,/currentMaster/{sessionId}-3 ….. 每次選取序列號(hào)最小的那個(gè)機(jī)器作為Master,如果這個(gè)機(jī)器掛了,由于他創(chuàng)建的節(jié)點(diǎn)會(huì)馬上小時(shí),那么之后最小的那個(gè)機(jī)器就是Master了。
1. 在搜索系統(tǒng)中,如果集群中每個(gè)機(jī)器都生成一份全量索引,不僅耗時(shí),而且不能保證彼此之間索引數(shù)據(jù)一致。因此讓集群中的Master來(lái)進(jìn)行全量索引的生成,然后同步到集群中其它機(jī)器。另外,Master選舉的容災(zāi)措施是,可以隨時(shí)進(jìn)行手動(dòng)指定master,就是說(shuō)應(yīng)用在zk在無(wú)法獲取master信息時(shí),可以通過(guò)比如http方式,向一個(gè)地方獲取master。
2. 在Hbase中,也是使用ZooKeeper來(lái)實(shí)現(xiàn)動(dòng)態(tài)HMaster的選舉。在Hbase實(shí)現(xiàn)中,會(huì)在ZK上存儲(chǔ)一些ROOT表的地址和HMaster的地址,HRegionServer也會(huì)把自己以臨時(shí)節(jié)點(diǎn)(Ephemeral)的方式注冊(cè)到Zookeeper中,使得HMaster可以隨時(shí)感知到各個(gè)HRegionServer的存活狀態(tài),同時(shí),一旦HMaster出現(xiàn)問(wèn)題,會(huì)重新選舉出一個(gè)HMaster來(lái)運(yùn)行,從而避免了HMaster的單點(diǎn)問(wèn)題
分布式鎖
分布式鎖,這個(gè)主要得益于 ZooKeeper 為我們保證了數(shù)據(jù)的強(qiáng)一致性。鎖服務(wù)可以分為兩類,一個(gè)是 保持獨(dú)占,另一個(gè)是 控制時(shí)序。
1. 所謂保持獨(dú)占,就是所有試圖來(lái)獲取這個(gè)鎖的客戶端,最終只有一個(gè)可以成功獲得這把鎖。通常的做法是把 zk 上的一個(gè) znode 看作是一把鎖,通過(guò) create znode 的方式來(lái)實(shí)現(xiàn)。所有客戶端都去創(chuàng)建 /distribute_lock 節(jié)點(diǎn),最終成功創(chuàng)建的那個(gè)客戶端也即擁有了這把鎖。
2. 控制時(shí)序,就是所有視圖來(lái)獲取這個(gè)鎖的客戶端,最終都是會(huì)被安排執(zhí)行,只是有個(gè)全局時(shí)序了。做法和上面基本類似,只是這里 /distributelock 已經(jīng)預(yù)先存在,客戶端在它下面創(chuàng)建臨時(shí)有序節(jié)點(diǎn)(這個(gè)可以通過(guò)節(jié)點(diǎn)的屬性控制:CreateMode.EPHEMERALSEQUENTIAL 來(lái)指定)。Zk 的父節(jié)點(diǎn)(/distribute_lock)維持一份 sequence, 保證子節(jié)點(diǎn)創(chuàng)建的時(shí)序性,從而也形成了每個(gè)客戶端的全局時(shí)序。
Zookeeper windows環(huán)境安裝
環(huán)境要求:必須要有jdk環(huán)境,本次講課使用jdk1.8
1.安裝jdk
2.安裝Zookeeper. 在官網(wǎng)http://zookeeper.apache.org/下載zookeeper.我下載的是zookeeper-3.4.6版本。
解壓zookeeper-3.4.6至D:\machine\zookeeper-3.4.6.
在D:\machine 新建data及l(fā)og目錄。
3.ZooKeeper的安裝模式分為三種,分別為:單機(jī)模式(stand-alone)、集群模式和集群偽分布模式。ZooKeeper 單機(jī)模式的安裝相對(duì)比較簡(jiǎn)單,如果第一次接觸ZooKeeper的話,建議安裝ZooKeeper單機(jī)模式或者集群偽分布模式。
安裝單擊模式。 至D:\machine\zookeeper-3.4.6\conf 復(fù)制 zoo_sample.cfg 并粘貼到當(dāng)前目錄下,命名zoo.cfg.
Zookeeper集群環(huán)境搭建(linux)
環(huán)境要求:必須要有jdk環(huán)境,本次講課使用jdk1.8
結(jié)構(gòu)
一共三個(gè)節(jié)點(diǎn)
(zk服務(wù)器集群規(guī)模不小于3個(gè)節(jié)點(diǎn)),要求服務(wù)器之間系統(tǒng)時(shí)間保持一致。
上傳zk并且解壓
進(jìn)行解壓: tar -zxvf zookeeper-3.4.6.tar.gz
重命名: mv zookeeper-3.4.6 zookeeper
修改zookeeper環(huán)境變量
vi /etc/profileexport JAVA_HOME=/opt/jdk1.8.0_71
export ZOOKEEPER_HOME=/usr/local/zookeeper
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$PATH
source /etc/profile修改zoo_sample.cfg文件
cd /usr/local/zookeeper/conf mv zoo_sample.cfg zoo.cfg 修改conf: vi zoo.cfg 修改兩處
(1)注意同時(shí)在zookeeper創(chuàng)建data目錄
dataDir=/usr/local/zookeeper/data (2)最后面添加
server.0=bhz:2888:3888 server.1=hadoop1:2888:3888 server.2=hadoop2:2888:3888創(chuàng)建服務(wù)器標(biāo)識(shí)
服務(wù)器標(biāo)識(shí)配置:
創(chuàng)建文件夾: mkdir data
創(chuàng)建文件myid并填寫內(nèi)容為0: vi
myid (內(nèi)容為服務(wù)器標(biāo)識(shí) : 0)
復(fù)制zookeeper
進(jìn)行復(fù)制zookeeper目錄到hadoop01和hadoop02
還有/etc/profile文件
把hadoop01、 hadoop02中的myid文件里的值修改為1和2
路徑(vi /usr/local/zookeeper/data/myid)
啟動(dòng)zookeeper
啟動(dòng)zookeeper:
路徑: /usr/local/zookeeper/bin
執(zhí)行: zkServer.sh start
(注意這里3臺(tái)機(jī)器都要進(jìn)行啟動(dòng))
狀態(tài): zkServer.sh
status(在三個(gè)節(jié)點(diǎn)上檢驗(yàn)zk的mode,一個(gè)leader和倆個(gè)follower)
常用命令
zkServer.sh status 查詢狀態(tài)
Zookeeper配置文件介紹
# The number of milliseconds of each tick tickTime=2000# The number of ticks that the initial # synchronization phase can take initLimit=10 # The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=5 # the directory where the snapshot is stored. # do not use /tmp for storage, /tmp here is just # example sakes. dataDir=/home/myuser/zooA/data # the port at which the clients will connect clientPort=2181 # ZooKeeper server and its port no. # ZooKeeper ensemble should know about every other machine in the ensemble # specify server id by creating 'myid' file in the dataDir # use hostname instead of IP address for convenient maintenance server.1=127.0.0.1:2888:3888 server.2=127.0.0.1:2988:3988 server.3=127.0.0.1:2088:3088 # # Be sure to read the maintenance section of the # administrator guide before turning on autopurge. # # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance # # The number of snapshots to retain in dataDir # autopurge.snapRetainCount=3 # Purge task interval in hours # Set to "0" to disable auto purge feature <br> #autopurge.purgeInterval=1 dataLogDir=/home/myuser/zooA/log
tickTime:心跳時(shí)間,為了確保連接存在的,以毫秒為單位,最小超時(shí)時(shí)間為兩個(gè)心跳時(shí)間
initLimit:多少個(gè)心跳時(shí)間內(nèi),允許其他server連接并初始化數(shù)據(jù),如果ZooKeeper管理的數(shù)據(jù)較大,則應(yīng)相應(yīng)增大這個(gè)值
clientPort:服務(wù)的監(jiān)聽端口
dataDir:用于存放內(nèi)存數(shù)據(jù)庫(kù)快照的文件夾,同時(shí)用于集群的myid文件也存在這個(gè)文件夾里(注意:一個(gè)配置文件只能包含一個(gè)dataDir字樣,即使它被注釋掉了。)
dataLogDir:用于單獨(dú)設(shè)置transaction log的目錄,transaction log分離可以避免和普通log還有快照的競(jìng)爭(zhēng)
syncLimit:多少個(gè)tickTime內(nèi),允許follower同步,如果follower落后太多,則會(huì)被丟棄。
server.A=B:C:D:
A是一個(gè)數(shù)字,表示這個(gè)是第幾號(hào)服務(wù)器,B是這個(gè)服務(wù)器的ip地址
C第一個(gè)端口用來(lái)集群成員的信息交換,表示的是這個(gè)服務(wù)器與集群中的Leader服務(wù)器交換信息的端口
D是在leader掛掉時(shí)專門用來(lái)進(jìn)行選舉leader所用
Zookeeper客戶端
ZooKeeper命令行工具類似于Linux的shell環(huán)境,不過(guò)功能肯定不及shell啦,但是使用它我們可以簡(jiǎn)單的對(duì)ZooKeeper進(jìn)行訪問(wèn),數(shù)據(jù)創(chuàng)建,數(shù)據(jù)修改等操作.? 使用 zkCli.sh -server 127.0.0.1:2181 連接到 ZooKeeper 服務(wù),連接成功后,系統(tǒng)會(huì)輸出 ZooKeeper 的相關(guān)環(huán)境以及配置信息。
命令行工具的一些簡(jiǎn)單操作如下:
1. 顯示根目錄下、文件: ls / 使用 ls 命令來(lái)查看當(dāng)前 ZooKeeper 中所包含的內(nèi)容
2. 顯示根目錄下、文件: ls2 / 查看當(dāng)前節(jié)點(diǎn)數(shù)據(jù)并能看到更新次數(shù)等數(shù)據(jù)
3. 創(chuàng)建文件,并設(shè)置初始內(nèi)容: create /zk "test" 創(chuàng)建一個(gè)新的 znode節(jié)點(diǎn)“ zk ”以及與它關(guān)聯(lián)的字符串
4. 獲取文件內(nèi)容: get /zk 確認(rèn) znode 是否包含我們所創(chuàng)建的字符串
5. 修改文件內(nèi)容: set /zk "zkbak" 對(duì) zk 所關(guān)聯(lián)的字符串進(jìn)行設(shè)置
6. 刪除文件: delete /zk 將剛才創(chuàng)建的 znode 刪除
7. 退出客戶端: quit
8. 幫助命令: help
Java操作Zookeeper
?Zookeeper說(shuō)明
創(chuàng)建節(jié)點(diǎn)(znode)?方法:
create:
提供了兩套創(chuàng)建節(jié)點(diǎn)的方法,同步和異步創(chuàng)建節(jié)點(diǎn)方式。
同步方式:
參數(shù)1,節(jié)點(diǎn)路徑《名稱)?:?InodeName?(不允許遞歸創(chuàng)建節(jié)點(diǎn),也就是說(shuō)在父節(jié)點(diǎn)不存在
的情況下,不允許創(chuàng)建子節(jié)點(diǎn))
參數(shù)2,節(jié)點(diǎn)內(nèi)容:?要求類型是字節(jié)數(shù)組(也就是說(shuō),不支持序列化方式,如果需要實(shí)現(xiàn)序
列化,可使用java相關(guān)序列化框架,如Hessian、Kryo框架)
參數(shù)3,節(jié)點(diǎn)權(quán)限:?使用Ids.OPEN_ACL_UNSAFE開放權(quán)限即可。(這個(gè)參數(shù)一般在權(quán)展
沒有太高要求的場(chǎng)景下,沒必要關(guān)注)
參數(shù)4,節(jié)點(diǎn)類型:?創(chuàng)建節(jié)點(diǎn)的類型:?CreateMode,提供四種首點(diǎn)象型
Maven依賴信息
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.4.6</version></dependency>Zookeeper客戶端連接
package com.hongmoshui.test;import java.io.IOException; import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper;public class Test001 {// 連接地址private static final String ADDRES = "127.0.0.1:2181";// session 會(huì)話private static final int SESSION_OUTTIME = 2000;// 信號(hào)量,阻塞程序執(zhí)行,用戶等待zookeeper連接成功,發(fā)送成功信號(hào),private static final CountDownLatch countDownLatch = new CountDownLatch(1);public static void main(String[] args) throws IOException, InterruptedException, KeeperException{ZooKeeper zk = new ZooKeeper(ADDRES, SESSION_OUTTIME, new Watcher(){public void process(WatchedEvent event){// 獲取事件狀態(tài)KeeperState keeperState = event.getState();// 獲取事件類型EventType eventType = event.getType();if (KeeperState.SyncConnected == keeperState){if (EventType.None == eventType){countDownLatch.countDown();System.out.println("zk 啟動(dòng)連接...");}}}});// 進(jìn)行阻塞 countDownLatch.await();String result = zk.create("/hongmoshui_Lasting", "Lasting".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);System.out.println(result);zk.close();} }創(chuàng)建Zookeeper節(jié)點(diǎn)信息
// 1. 創(chuàng)建持久節(jié)點(diǎn),并且允許任何服務(wù)器可以操作String result = zk.create("/hongmoshui_Lasting", "Lasting".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);System.out.println("result:" + result);// 2. 創(chuàng)建臨時(shí)節(jié)點(diǎn)String result = zk.create("/hongmoshui_temp", "temp".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);System.out.println("result:" + result);Watcher
在ZooKeeper中,接口類Watcher用于表示一個(gè)標(biāo)準(zhǔn)的事件處理器,其定義了事件通知相關(guān)的邏輯,包含KeeperState和EventType兩個(gè)枚舉類,分別代表了通知狀態(tài)和事件類型,同時(shí)定義了事件的回調(diào)方法:process(WatchedEvent event)。
什么是Watcher接口
同一個(gè)事件類型在不同的通知狀態(tài)中代表的含義有所不同,表7-3列舉了常見的通知狀態(tài)和事件類型。
?
表7-3 Watcher通知狀態(tài)與事件類型一覽
| KeeperState | EventType | 觸發(fā)條件 | 說(shuō)明 |
| ? | None | 客戶端與服務(wù)端成功建立連接 | ? |
| SyncConnected | NodeCreated | Watcher監(jiān)聽的對(duì)應(yīng)數(shù)據(jù)節(jié)點(diǎn)被創(chuàng)建 | ? |
| ? | NodeDeleted | Watcher監(jiān)聽的對(duì)應(yīng)數(shù)據(jù)節(jié)點(diǎn)被刪除 | 此時(shí)客戶端和服務(wù)器處于連接狀態(tài) |
| ? | NodeDataChanged | Watcher監(jiān)聽的對(duì)應(yīng)數(shù)據(jù)節(jié)點(diǎn)的數(shù)據(jù)內(nèi)容發(fā)生變更 | ? |
| ? | NodeChildChanged | Wather監(jiān)聽的對(duì)應(yīng)數(shù)據(jù)節(jié)點(diǎn)的子節(jié)點(diǎn)列表發(fā)生變更 | ? |
| Disconnected | None | 客戶端與ZooKeeper服務(wù)器斷開連接 | 此時(shí)客戶端和服務(wù)器處于斷開連接狀態(tài) |
| Expired | Node | 會(huì)話超時(shí) | 此時(shí)客戶端會(huì)話失效,通常同時(shí)也會(huì)受到SessionExpiredException異常 |
| AuthFailed | None | 通常有兩種情況,1:使用錯(cuò)誤的schema進(jìn)行權(quán)限檢查 2:SASL權(quán)限檢查失敗 | 通常同時(shí)也會(huì)收到AuthFailedException異常 |
?表7-3中列舉了ZooKeeper中最常見的幾個(gè)通知狀態(tài)和事件類型。
回調(diào)方法process()
process方法是Watcher接口中的一個(gè)回調(diào)方法,當(dāng)ZooKeeper向客戶端發(fā)送一個(gè)Watcher事件通知時(shí),客戶端就會(huì)對(duì)相應(yīng)的process方法進(jìn)行回調(diào),從而實(shí)現(xiàn)對(duì)事件的處理。process方法的定義如下:
abstract public void process(WatchedEvent event);
這個(gè)回調(diào)方法的定義非常簡(jiǎn)單,我們重點(diǎn)看下方法的參數(shù)定義:WatchedEvent。
WatchedEvent包含了每一個(gè)事件的三個(gè)基本屬性:通知狀態(tài)(keeperState),事件類型(EventType)和節(jié)點(diǎn)路徑(path),其數(shù)據(jù)結(jié)構(gòu)如圖7-5所示。ZooKeeper使用WatchedEvent對(duì)象來(lái)封裝服務(wù)端事件并傳遞給Watcher,從而方便回調(diào)方法process對(duì)服務(wù)端事件進(jìn)行處理。
提到WatchedEvent,不得不講下WatcherEvent實(shí)體。籠統(tǒng)地講,兩者表示的是同一個(gè)事物,都是對(duì)一個(gè)服務(wù)端事件的封裝。不同的是,WatchedEvent是一個(gè)邏輯事件,用于服務(wù)端和客戶端程序執(zhí)行過(guò)程中所需的邏輯對(duì)象,而WatcherEvent因?yàn)閷?shí)現(xiàn)了序列化接口,因此可以用于網(wǎng)絡(luò)傳輸。
服務(wù)端在生成WatchedEvent事件之后,會(huì)調(diào)用getWrapper方法將自己包裝成一個(gè)可序列化的WatcherEvent事件,以便通過(guò)網(wǎng)絡(luò)傳輸?shù)娇蛻舳恕?蛻舳嗽诮邮盏椒?wù)端的這個(gè)事件對(duì)象后,首先會(huì)將WatcherEvent還原成一個(gè)WatchedEvent事件,并傳遞給process方法處理,回調(diào)方法process根據(jù)入?yún)⒕湍軌蚪馕龀鐾暾姆?wù)端事件了。
需要注意的一點(diǎn)是,無(wú)論是WatchedEvent還是WatcherEvent,其對(duì)ZooKeeper服務(wù)端事件的封裝都是機(jī)及其簡(jiǎn)單的。舉個(gè)例子來(lái)說(shuō),當(dāng)/zk-book這個(gè)節(jié)點(diǎn)的數(shù)據(jù)發(fā)生變更時(shí),服務(wù)端會(huì)發(fā)送給客戶端一個(gè)“ZNode數(shù)據(jù)內(nèi)容變更”事件,客戶端只能夠接收到如下信
Watcher代碼
package com.hongmoshui; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat;public class ZkClientWatcher implements Watcher {// 集群連接地址private static final String CONNECT_ADDRES = "192.168.110.159:2181,192.168.110.160:2181,192.168.110.162:2181";// 會(huì)話超時(shí)時(shí)間private static final int SESSIONTIME = 2000;// 信號(hào)量,讓zk在連接之前等待,連接成功后才能往下走.private static final CountDownLatch countDownLatch = new CountDownLatch(1);private static String LOG_MAIN = "【main】 ";private ZooKeeper zk;public void createConnection(String connectAddres, int sessionTimeOut){try{zk = new ZooKeeper(connectAddres, sessionTimeOut, this);System.out.println(LOG_MAIN + "zk 開始啟動(dòng)連接服務(wù)器....");countDownLatch.await();}catch (Exception e){e.printStackTrace();}}public boolean createPath(String path, String data){try{this.exists(path, true);this.zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);System.out.println(LOG_MAIN + "節(jié)點(diǎn)創(chuàng)建成功, Path:" + path + ",data:" + data);}catch (Exception e){e.printStackTrace();return false;}return true;}/*** 判斷指定節(jié)點(diǎn)是否存在* * @param path 節(jié)點(diǎn)路徑*/public Stat exists(String path, boolean needWatch){try{return this.zk.exists(path, needWatch);}catch (Exception e){e.printStackTrace();return null;}}public boolean updateNode(String path, String data) throws KeeperException, InterruptedException{exists(path, true);this.zk.setData(path, data.getBytes(), -1);return false;}public void process(WatchedEvent watchedEvent){// 獲取事件狀態(tài)KeeperState keeperState = watchedEvent.getState();// 獲取事件類型EventType eventType = watchedEvent.getType();// zk 路徑String path = watchedEvent.getPath();System.out.println("進(jìn)入到 process() keeperState:" + keeperState + ", eventType:" + eventType + ", path:" + path);// 判斷是否建立連接if (KeeperState.SyncConnected == keeperState){if (EventType.None == eventType){// 如果建立建立成功,讓后程序往下走System.out.println(LOG_MAIN + "zk 建立連接成功!");countDownLatch.countDown();}else if (EventType.NodeCreated == eventType){System.out.println(LOG_MAIN + "事件通知,新增node節(jié)點(diǎn)" + path);}else if (EventType.NodeDataChanged == eventType){System.out.println(LOG_MAIN + "事件通知,當(dāng)前node節(jié)點(diǎn)" + path + "被修改....");}else if (EventType.NodeDeleted == eventType){System.out.println(LOG_MAIN + "事件通知,當(dāng)前node節(jié)點(diǎn)" + path + "被刪除....");}}System.out.println("--------------------------------------------------------");}public static void main(String[] args) throws KeeperException, InterruptedException{ZkClientWatcher zkClientWatcher = new ZkClientWatcher();zkClientWatcher.createConnection(CONNECT_ADDRES, SESSIONTIME); // boolean createResult = zkClientWatcher.createPath("/p15", "pa-644064");zkClientWatcher.updateNode("/pa2", "7894561");}}什么是多線程
多線程為了能夠提高應(yīng)用程序的運(yùn)行效率,在一個(gè)進(jìn)程中有多條不同的執(zhí)行路徑,同時(shí)并行執(zhí)行,互不影響。
什么是線程安全
當(dāng)多個(gè)線程同時(shí)共享,同一個(gè)全局變量或靜態(tài)變量,做寫的操作時(shí),可能會(huì)發(fā)生數(shù)據(jù)沖突問(wèn)題,也就是線程安全問(wèn)題。但是做讀操作是不會(huì)發(fā)生數(shù)據(jù)沖突問(wèn)題。
解決辦法
?使用同步代碼塊或者Lock鎖機(jī)制,保證在多個(gè)線程共享同一個(gè)變量只能有一個(gè)線程進(jìn)行操作
什么是Java內(nèi)存模型
?
共享內(nèi)存模型指的就是Java內(nèi)存模型(簡(jiǎn)稱JMM),JMM決定一個(gè)線程對(duì)共享變量的寫入時(shí),能對(duì)另一個(gè)線程可見。從抽象的角度來(lái)看,JMM定義了線程和主內(nèi)存之間的抽象關(guān)系:線程之間的共享變量存儲(chǔ)在主內(nèi)存(main memory)中,每個(gè)線程都有一個(gè)私有的本地內(nèi)存(local memory),本地內(nèi)存中存儲(chǔ)了該線程以讀/寫共享變量的副本。本地內(nèi)存是JMM的一個(gè)抽象概念,并不真實(shí)存在。它涵蓋了緩存,寫緩沖區(qū),寄存器以及其他的硬件和編譯器優(yōu)化。
從上圖來(lái)看,線程A與線程B之間如要通信的話,必須要經(jīng)歷下面2個(gè)步驟:
1. 首先,線程A把本地內(nèi)存A中更新過(guò)的共享變量刷新到主內(nèi)存中去。
2. 然后,線程B到主內(nèi)存中去讀取線程A之前已更新過(guò)的共享變量。
下面通過(guò)示意圖來(lái)說(shuō)明這兩個(gè)步驟:
如上圖所示,本地內(nèi)存A和B有主內(nèi)存中共享變量x的副本。假設(shè)初始時(shí),這三個(gè)內(nèi)存中的x值都為0。線程A在執(zhí)行時(shí),把更新后的x值(假設(shè)值為1)臨時(shí)存放在自己的本地內(nèi)存A中。當(dāng)線程A和線程B需要通信時(shí),線程A首先會(huì)把自己本地內(nèi)存中修改后的x值刷新到主內(nèi)存中,此時(shí)主內(nèi)存中的x值變?yōu)榱?。隨后,線程B到主內(nèi)存中去讀取線程A更新后的x值,此時(shí)線程B的本地內(nèi)存的x值也變?yōu)榱?。
從整體來(lái)看,這兩個(gè)步驟實(shí)質(zhì)上是線程A在向線程B發(fā)送消息,而且這個(gè)通信過(guò)程必須要經(jīng)過(guò)主內(nèi)存。JMM通過(guò)控制主內(nèi)存與每個(gè)線程的本地內(nèi)存之間的交互,來(lái)為java程序員提供內(nèi)存可見性保證。
總結(jié):什么是Java內(nèi)存模型:java內(nèi)存模型簡(jiǎn)稱jmm,定義了一個(gè)線程對(duì)另一個(gè)線程可見。共享變量存放在主內(nèi)存中,每個(gè)線程都有自己的本地內(nèi)存,當(dāng)多個(gè)線程同時(shí)訪問(wèn)一個(gè)數(shù)據(jù)的時(shí)候,可能本地內(nèi)存沒有及時(shí)刷新到主內(nèi)存,所以就會(huì)發(fā)生線程安全問(wèn)題。
分布式鎖解決辦法
傳統(tǒng)方式生成訂單號(hào)ID
業(yè)務(wù)場(chǎng)景
在分布式情況,生成全局訂單號(hào)ID
生成訂單號(hào)方案
生成訂單類
package com.hongmoshui.distributed; import java.text.SimpleDateFormat; import java.util.Date;//生成訂單類 public class OrderNumGenerator {// 全局訂單idpublic static int count = 0;public String getNumber(){try{Thread.sleep(200);}catch (Exception e){}SimpleDateFormat simpt = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss");return simpt.format(new Date()) + "-" + ++count;} }使用多線程情況模擬生成訂單號(hào)
package com.hongmoshui.distributed;//使用多線程模擬生成訂單號(hào) public class OrderService implements Runnable {private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();public void run(){getNumber();}public void getNumber(){String number = orderNumGenerator.getNumber();System.out.println(Thread.currentThread().getName() + ",生成訂單ID:" + number);}public static void main(String[] args){System.out.println("####生成唯一訂單號(hào)###");for (int i = 0; i < 100; i++){new Thread(new OrderService()).start();}} }多線程生成訂單號(hào),線程安全問(wèn)題解決
使用synchronized或者loca鎖
Synchronized同步代碼塊方式
package com.hongmoshui.distributed;//使用多線程模擬生成訂單號(hào) public class OrderSynchronizedService implements Runnable {private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();public void run(){getNumber();}public void getNumber(){synchronized (this){String number = orderNumGenerator.getNumber();System.out.println(Thread.currentThread().getName() + ",生成訂單ID:" + number);}}public static void main(String[] args){System.out.println("####生成唯一訂單號(hào)###");OrderService orderService = new OrderService();for (int i = 0; i < 100; i++){new Thread(orderService).start();}} }Lock鎖方式
package com.hongmoshui.distributed;import java.util.concurrent.locks.ReentrantLock;public class OrderLockService implements Runnable {private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();// 使用lock鎖private java.util.concurrent.locks.Lock lock = new ReentrantLock();public void run(){getNumber();}public void getNumber(){try{// synchronized (this) { lock.lock();String number = orderNumGenerator.getNumber();System.out.println(Thread.currentThread().getName() + ",生成訂單ID:" + number);// } }catch (Exception e){}finally{lock.unlock();}}public static void main(String[] args){System.out.println("####生成唯一訂單號(hào)###");OrderService orderService = new OrderService();for (int i = 0; i < 100; i++){new Thread(orderService).start();}} }分布式場(chǎng)景下生成訂單ID
業(yè)務(wù)場(chǎng)景
在分布式情況,生成全局訂單號(hào)ID
產(chǎn)生問(wèn)題
在分布式(集群)環(huán)境下,每臺(tái)JVM不能實(shí)現(xiàn)同步,在分布式場(chǎng)景下使用時(shí)間戳生成訂單號(hào)可能會(huì)重復(fù)
分布式情況下,怎么解決訂單號(hào)生成不重復(fù)
使用分布式鎖生成訂單號(hào)技術(shù)
1.使用數(shù)據(jù)庫(kù)實(shí)現(xiàn)分布式鎖
缺點(diǎn):性能差、線程出現(xiàn)異常時(shí),容易出現(xiàn)死鎖
2.使用redis實(shí)現(xiàn)分布式鎖
缺點(diǎn):鎖的失效時(shí)間難控制、容易產(chǎn)生死鎖、非阻塞式、不可重入
3.使用zookeeper實(shí)現(xiàn)分布式鎖
實(shí)現(xiàn)相對(duì)簡(jiǎn)單、可靠性強(qiáng)、使用臨時(shí)節(jié)點(diǎn),失效時(shí)間容易控制
什么是分布式鎖
分布式鎖一般用在分布式系統(tǒng)或者多個(gè)應(yīng)用中,用來(lái)控制同一任務(wù)是否執(zhí)行或者任務(wù)的執(zhí)行順序。在項(xiàng)目中,部署了多個(gè)tomcat應(yīng)用,在執(zhí)行定時(shí)任務(wù)時(shí)就會(huì)遇到同一任務(wù)可能執(zhí)行多次的情況,我們可以借助分布式鎖,保證在同一時(shí)間只有一個(gè)tomcat應(yīng)用執(zhí)行了定時(shí)任務(wù)
使用Zookeeper實(shí)現(xiàn)分布式鎖
Zookeeper實(shí)現(xiàn)分布式鎖原理
使用zookeeper創(chuàng)建臨時(shí)序列節(jié)點(diǎn)來(lái)實(shí)現(xiàn)分布式鎖,適用于順序執(zhí)行的程序,大體思路就是創(chuàng)建臨時(shí)序列節(jié)點(diǎn),找出最小的序列節(jié)點(diǎn),獲取分布式鎖,程序執(zhí)行完成之后此序列節(jié)點(diǎn)消失,通過(guò)watch來(lái)監(jiān)控節(jié)點(diǎn)的變化,從剩下的節(jié)點(diǎn)的找到最小的序列節(jié)點(diǎn),獲取分布式鎖,執(zhí)行相應(yīng)處理,依次類推……
Maven依賴
<dependency><groupId>com.101tec</groupId><artifactId>zkclient</artifactId><version>0.10</version></dependency>創(chuàng)建Lock接口
package com.hongmoshui.distributed;public interface Lock {// 獲取到鎖的資源public void getLock();// 釋放鎖public void unLock(); }創(chuàng)建ZookeeperAbstractLock抽象類
package com.hongmoshui.distributed; import org.I0Itec.zkclient.ZkClient;//將重復(fù)代碼寫入子類中.. public abstract class ZookeeperAbstractLock implements Lock {// zk連接地址private static final String CONNECTSTRING = "127.0.0.1:2181";// 創(chuàng)建zk連接protected ZkClient zkClient = new ZkClient(CONNECTSTRING);protected static final String PATH = "/lock";public void getLock(){if (tryLock()){System.out.println("##獲取lock鎖的資源####");}else{// 等待 waitLock();// 重新獲取鎖資源 getLock();}}// 獲取鎖資源abstract boolean tryLock();// 等待abstract void waitLock();public void unLock(){if (zkClient != null){zkClient.close();System.out.println("釋放鎖資源...");}}}ZookeeperDistrbuteLock類
package com.hongmoshui.distributed;import java.util.concurrent.CountDownLatch;import org.I0Itec.zkclient.IZkDataListener;public class ZookeeperDistrbuteLock extends ZookeeperAbstractLock {private CountDownLatch countDownLatch = null;@Overrideboolean tryLock(){try{zkClient.createEphemeral(PATH);return true;}catch (Exception e){ // e.printStackTrace();return false;}}@Overridevoid waitLock(){IZkDataListener izkDataListener = new IZkDataListener(){public void handleDataDeleted(String path) throws Exception{// 喚醒被等待的線程if (countDownLatch != null){countDownLatch.countDown();}}public void handleDataChange(String path, Object data) throws Exception{}};// 注冊(cè)事件 zkClient.subscribeDataChanges(PATH, izkDataListener);if (zkClient.exists(PATH)){countDownLatch = new CountDownLatch(1);try{countDownLatch.await();}catch (Exception e){e.printStackTrace();}}// 刪除監(jiān)聽 zkClient.unsubscribeDataChanges(PATH, izkDataListener);}}使用Zookeeper鎖運(yùn)行效果
package com.hongmoshui.distributed; import com.hongmoshui.OrderNumGenerator;public class OrderService implements Runnable {private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();// 使用lock鎖// private// java.util.concurrent.locks.Lock// lock = new ReentrantLock();private Lock lock = new ZookeeperDistrbuteLock();public void run(){getNumber();}public void getNumber(){try{lock.getLock();String number = orderNumGenerator.getNumber();System.out.println(Thread.currentThread().getName() + ",生成訂單ID:" + number);}catch (Exception e){e.printStackTrace();}finally{lock.unLock();}}public static void main(String[] args){System.out.println("####生成唯一訂單號(hào)###"); // OrderService orderService = new OrderService();for (int i = 0; i < 100; i++){new Thread(new OrderService()).start();}} }使用Zookeeper實(shí)現(xiàn)負(fù)載均衡原理
思路
使用Zookeeper實(shí)現(xiàn)負(fù)載均衡原理,服務(wù)器端將啟動(dòng)的服務(wù)注冊(cè)到,zk注冊(cè)中心上,采用臨時(shí)節(jié)點(diǎn)。客戶端從zk節(jié)點(diǎn)上獲取最新服務(wù)節(jié)點(diǎn)信息,本地使用負(fù)載均衡算法,隨機(jī)分配服務(wù)器。
創(chuàng)建項(xiàng)目工程
Maven依賴
<dependency><groupId>com.101tec</groupId><artifactId>zkclient</artifactId><version>0.8</version></dependency>創(chuàng)建Server服務(wù)端
ZkServerScoekt服務(wù)
ServerHandler:
package com.hongmoshui.LoadBalance; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket;//ServerHandler public class ServerHandler implements Runnable {private Socket socket;public ServerHandler(Socket socket){this.socket = socket;}public void run(){BufferedReader in = null;PrintWriter out = null;try{in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));out = new PrintWriter(this.socket.getOutputStream(), true);String body = null;while (true){body = in.readLine();if (body == null)break;System.out.println("Receive : " + body);out.println("Hello, " + body);}}catch (Exception e){if (in != null){try{in.close();}catch (IOException e1){e1.printStackTrace();}}if (out != null){out.close();}if (this.socket != null){try{this.socket.close();}catch (IOException e1){e1.printStackTrace();}this.socket = null;}}} }ZkServerScoekt:
package com.hongmoshui.LoadBalance; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket;//##ServerScoekt服務(wù)端 public class ZkServerScoekt implements Runnable {private int port = 18080;public static void main(String[] args) throws IOException{int port = 18080;ZkServerScoekt server = new ZkServerScoekt(port);Thread thread = new Thread(server);thread.start();}public ZkServerScoekt(int port){this.port = port;}public void run(){ServerSocket serverSocket = null;try{serverSocket = new ServerSocket(port);System.out.println("Server start port:" + port);Socket socket = null;while (true){socket = serverSocket.accept();new Thread(new ServerHandler(socket)).start();}}catch (Exception e){e.printStackTrace();}finally{try{if (serverSocket != null){serverSocket.close();}}catch (Exception e2){}}}}ZkServerClient
package com.hongmoshui.LoadBalance; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import java.util.ArrayList; import java.util.List;public class ZkServerClient {public static List<String> listServer = new ArrayList<String>();public static void main(String[] args){initServer();ZkServerClient client = new ZkServerClient();BufferedReader console = new BufferedReader(new InputStreamReader(System.in));while (true){String name;try{name = console.readLine();if ("exit".equals(name)){System.exit(0);}client.send(name);}catch (IOException e){e.printStackTrace();}}}// 注冊(cè)所有serverpublic static void initServer(){listServer.clear();listServer.add("127.0.0.1:18080");}// 獲取當(dāng)前server信息public static String getServer(){return listServer.get(0);}public void send(String name){String server = ZkServerClient.getServer();String[] cfg = server.split(":");Socket socket = null;BufferedReader in = null;PrintWriter out = null;try{socket = new Socket(cfg[0], Integer.parseInt(cfg[1]));in = new BufferedReader(new InputStreamReader(socket.getInputStream()));out = new PrintWriter(socket.getOutputStream(), true);out.println(name);while (true){String resp = in.readLine();if (resp == null)break;else if (resp.length() > 0){System.out.println("Receive : " + resp);break;}}}catch (Exception e){e.printStackTrace();}finally{if (out != null){out.close();}if (in != null){try{in.close();}catch (IOException e){e.printStackTrace();}}if (socket != null){try{socket.close();}catch (IOException e){e.printStackTrace();}}}} }改造ZkServerScoekt
package com.hongmoshui.LoadBalance; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import org.I0Itec.zkclient.ZkClient;public class ZkServerScoekt2 implements Runnable {private static int port = 18081;public static void main(String[] args) throws IOException{ZkServerScoekt server = new ZkServerScoekt(port);Thread thread = new Thread(server);thread.start();}public ZkServerScoekt2(int port){this.port = port;}public void regServer(){// 向ZooKeeper注冊(cè)當(dāng)前服務(wù)器ZkClient client = new ZkClient("127.0.0.1:2181", 60000, 1000);String path = "/test/server" + port;if (client.exists(path))client.delete(path);client.createEphemeral(path, "127.0.0.1:" + port);}public void run(){ServerSocket serverSocket = null;try{serverSocket = new ServerSocket(port);regServer();System.out.println("Server start port:" + port);Socket socket = null;while (true){socket = serverSocket.accept();new Thread(new ServerHandler(socket)).start();}}catch (Exception e){e.printStackTrace();}finally{try{if (serverSocket != null){serverSocket.close();}}catch (Exception e2){}}}}改造ZkServerClient
package com.hongmoshui.LoadBalance; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import java.util.ArrayList; import java.util.List; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.ZkClient;public class ZkServerClient2 {public static List<String> listServer = new ArrayList<String>();public static void main(String[] args){initServer();ZkServerClient client = new ZkServerClient();BufferedReader console = new BufferedReader(new InputStreamReader(System.in));while (true){String name;try{name = console.readLine();if ("exit".equals(name)){System.exit(0);}client.send(name);}catch (IOException e){e.printStackTrace();}}}// 注冊(cè)所有serverpublic static void initServer(){final String path = "/test";final ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 1000);List<String> children = zkClient.getChildren(path);listServer.clear();for (String p : children){listServer.add((String) zkClient.readData(path + "/" + p));}// 訂閱節(jié)點(diǎn)變化事件zkClient.subscribeChildChanges("/test", new IZkChildListener(){public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception{listServer.clear();for (String p : currentChilds){listServer.add((String) zkClient.readData(path + "/" + p));}System.out.println("####handleChildChange()####listServer:" + listServer.toString());}});}// 請(qǐng)求次數(shù)private static int count = 1;// 服務(wù)數(shù)量private static int serverCount = 2;// 獲取當(dāng)前server信息public static String getServer(){String serverName = listServer.get(count % serverCount);++count;return serverName;}public void send(String name){String server = ZkServerClient.getServer();String[] cfg = server.split(":");Socket socket = null;BufferedReader in = null;PrintWriter out = null;try{socket = new Socket(cfg[0], Integer.parseInt(cfg[1]));in = new BufferedReader(new InputStreamReader(socket.getInputStream()));out = new PrintWriter(socket.getOutputStream(), true);out.println(name);while (true){String resp = in.readLine();if (resp == null)break;else if (resp.length() > 0){System.out.println("Receive : " + resp);break;}}}catch (Exception e){e.printStackTrace();}finally{if (out != null){out.close();}if (in != null){try{in.close();}catch (IOException e){e.printStackTrace();}}if (socket != null){try{socket.close();}catch (IOException e){e.printStackTrace();}}}} }使用Zookeeper實(shí)現(xiàn)選舉策略
場(chǎng)景
? 有一個(gè)向外提供的服務(wù),服務(wù)必須7*24小時(shí)提供服務(wù),不能有單點(diǎn)故障。所以采用集群的方式,采用master、slave的結(jié)構(gòu)。一臺(tái)主機(jī)多臺(tái)備機(jī)。主機(jī)向外提供服務(wù),備機(jī)負(fù)責(zé)監(jiān)聽主機(jī)的狀態(tài),一旦主機(jī)宕機(jī),備機(jī)要迅速接代主機(jī)繼續(xù)向外提供服務(wù)。從備機(jī)選擇一臺(tái)作為主機(jī),就是master選舉。
原理分析
?右邊三臺(tái)主機(jī)會(huì)嘗試創(chuàng)建master節(jié)點(diǎn),誰(shuí)創(chuàng)建成功了,就是master,向外提供。其他兩臺(tái)就是slave。
所有slave必須關(guān)注master的刪除事件(臨時(shí)節(jié)點(diǎn),如果服務(wù)器宕機(jī)了,Zookeeper會(huì)自動(dòng)把master節(jié)點(diǎn)刪除)。如果master宕機(jī)了,會(huì)進(jìn)行新一輪的master選舉。本次我們主要關(guān)注master選舉,服務(wù)注冊(cè)、發(fā)現(xiàn)先不討論。
使用Zookeeper原理
? 領(lǐng)導(dǎo)者(leader),負(fù)責(zé)進(jìn)行投票的發(fā)起和決議,更新系統(tǒng)狀態(tài)
? 學(xué)習(xí)者(learner),包括跟隨者(follower)和觀察者(observer),follower用于接受客戶端請(qǐng)求并想客戶端返回結(jié)果,在選主過(guò)程中參與投票
? Observer可以接受客戶端連接,將寫請(qǐng)求轉(zhuǎn)發(fā)給leader,但observer不參加投票過(guò)程,只同步leader的狀態(tài),observer的目的是為了擴(kuò)展系統(tǒng),提高讀取速度
? 客戶端(client),請(qǐng)求發(fā)起方
? Zookeeper的核心是原子廣播,這個(gè)機(jī)制保證了各個(gè)Server之間的同步。實(shí)現(xiàn)這個(gè)機(jī)制的協(xié)議叫做Zab協(xié)
??議。Zab協(xié)議有兩種模式,它們分別是恢復(fù)模式(選主)和廣播模式(同步)。當(dāng)服務(wù)啟動(dòng)或者在領(lǐng)導(dǎo)者
崩潰后,Zab就進(jìn)入了恢復(fù)模式,當(dāng)領(lǐng)導(dǎo)者被選舉出來(lái),且大多數(shù)Server完成了和leader的狀態(tài)同步以后
?,恢復(fù)模式就結(jié)束了。狀態(tài)同步保證了leader和Server具有相同的系統(tǒng)狀態(tài)。
? 為了保證事務(wù)的順序一致性,zookeeper采用了遞增的事務(wù)id號(hào)(zxid)來(lái)標(biāo)識(shí)事務(wù)。所有的提議(
proposal)都在被提出的時(shí)候加上了zxid。實(shí)現(xiàn)中zxid是一個(gè)64位的數(shù)字,它高32位是epoch用來(lái)標(biāo)識(shí)
??leader關(guān)系是否改變,每次一個(gè)leader被選出來(lái),它都會(huì)有一個(gè)新的epoch,標(biāo)識(shí)當(dāng)前屬于那個(gè)leader的
統(tǒng)治時(shí)期。低32位用于遞增計(jì)數(shù)。
? 每個(gè)Server在工作過(guò)程中有三種狀態(tài):
LOOKING:當(dāng)前Server不知道leader是誰(shuí),正在搜尋
LEADING:當(dāng)前Server即為選舉出來(lái)的leader
FOLLOWING:leader已經(jīng)選舉出來(lái),當(dāng)前Server與之同步
?
??
轉(zhuǎn)載于:https://www.cnblogs.com/hongmoshui/p/11031811.html
總結(jié)
以上是生活随笔為你收集整理的【学习】026 Zookeeper的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Elasticsearch数据备份与恢复
- 下一篇: django数据查询之聚合查询和分组查询