【转】Kettle集群
Kettle集群
?
Kettle是一款開源的ETL工具,以其高效和可擴展性而聞名于業內。其高效的一個重要原因就是其多線程和集群功能。
Kettle的多線程采用的是一種流水線并發的機制,我們在另外的文章中專門有介紹。這里主要介紹的是kettle的集群。
???集群允許轉換以及轉換中的步驟在多個服務器上并發執行。在使用kettle集群時,首先需要定義的是Cluster schema。所謂的Cluster schema就是一系列的子服務器的集合。在一個集群中,它包含一個主服務器(Master)和多個從屬服務器服務器(slave)。如下圖所示。
?
????子服務器(Slave servers)允許你在遠程服務器上執行轉換。建立一個子服務器需要你在遠程服務器上建立一個叫做“Carte”的?web?服務器,該服務器可以從Spoon(遠程或者集群執行)或者轉換任務中接受輸入。
在以后的描述中,如果我們提到的是子服務器,則包括集群中的主服務器和從屬服務器;否則我們會以主服務器和從屬服務器來進行特別指定。
?
設計
要讓轉換是以集群方式執行,首先需要在Spoon中進行圖形化的設計工作。定義一個以集群方式運行的轉換,主要包括定義cluster schema和定義轉換兩個步驟。
?
定義cluster schema
創建子服務器
?
服務tab?選項
| 選項 | 描述 |
| 服務器名稱 | 子服務器的名稱 |
| 主機名稱或IP地址 | 用作子服務器的機器的地址 |
| 端口號 | 與遠程服務通信的端口號 |
| 用戶名 | 獲取遠程服務器的用戶名 |
| 密碼 | 獲取遠程服務器的密碼 |
| 是主服務器嗎 | 在轉換以集群形式執行時,該子服務器將作為主服務器 |
注意:?在集群環境下執行轉化時,你必須有一個子服務器作為主服務器(master server)而其余所有的子服務器都作為從屬服務器(slave)
?
Proxy tab options
| 選項 | 描述 |
| 代理服務器主機名 | 設置你要通過代理進行連接的主機名 |
| 代理服務器端口 | 設置與代理進行連接時所需的端口號 |
| ? | ? |
| Ignore proxy for hosts: regexp|separated | 指定哪些服務器不需要通過代理來進行連接。該選項支持你使用正則表達式來制定多個服務器,多個服務器之間以' | '?字符來進行分割?? |
?
創建cluster schema
?
選項描述
| 選項 | 描述 |
| Schema?名稱 | 集群schema的名稱 |
| 端口號 | 這里定義的端口號是指從哪一個端口號開始分配給子服務器。每一個在子服務器中執行的步驟都要消耗一個端口號。 注意:?確保沒有別的網絡協議會使用你定義的范圍之類的端口,否則會引起問題 |
| Sockets緩存大小 | TCP內部緩存的大小 |
| Sockets刷新間隔(rows) | 當TCP的內部緩存通過網絡完全發送出去并且被清空時處理的行數 |
| Sockets數據是否壓縮 | 如果該選項被選中,則所有的數據都會使用Gzip壓縮算法進行壓縮以減輕網絡傳輸量 |
| Dynamic Cluster | 動態集群指的是在運行的時候才能獲知從屬服務器的信息。這種情形適用于主機可以自動增加或者去除的情形,例如云計算。 主服務器的設置不變,但是它可以接受從屬服務器的注冊。一旦接受了某個從屬服務器的注冊,則每隔30秒去監視該從屬服務器是否還處于有效狀態 |
| 子服務器 | 這里是一個要在集群中使用的服務器列表。這個列表中包含一個主服務器和任意數目的從屬服務器。 在dynamic Cluster的情況下,只需要選擇主服務器即可 |
?
?
定義轉換
定義完了cluster schema后,下一步就是定義在集群環境下執行的轉換。我們這里展現的只是一個最簡單的例子,完全是為了演示而用。現實情況中的集群有可能非常復雜。
首先你像平時一樣創建轉換,以hop連接連個兩個步驟。然后你指定第二個步驟將在集群下執行
?
?
?
然后選擇需要使用的集群。轉換如圖一樣顯示在GUI中。
?
???注意?Cx4顯示這個步驟將在集群中運行,而這個集群中有4個從屬服務器。假設我們將計算結果再次存入到數據表中
?
?
????這個轉換雖然定義了集群,但是我們同樣可以讓它在單機環境下執行,而且可以得到相同的結果。這意味著你可以使用普通的本地模式來測試它。
執行轉換
要想以集群方式來運行轉換或者作業,首先需要啟動在Cluster??schema中定義的主服務器和從屬服務器,然后再運行轉換或者作業。
啟動子服務器
子服務器其實是一個嵌入式的名為Carte的小web server。要進行集群轉換,首先需要啟動cluster schema中的子服務器
?
?
腳本啟動
kettle提供了carte.bat和carte.sh(linux)批處理腳本來啟動子服務器,這種啟動方式分為兩種
使用主機號和端口號
?????Carte 127.0.0.1 8080
?????Carte 192.168.1.221 8081
使用配置文件
Carte??/foo/bar/carte-config.xml
Carte?http://www.example.com/carte-config.xml
如果cluster schema中定義了Dynamic cluster選項,則必須使用配置文件來進行啟動,當這個子服務器啟動時,它需要向配置文件中“masters”中列出的主服務器列表中匯報其運行狀態(通過調用主服務器的registerSlave服務),已達到動態地設置子服務器的目的。配置文件格式
??<slave_config>
? <masters>
??? <slaveserver>
????? <name>master1</name>
????? <hostname>localhost</hostname>
????? <port>8080</port>
????? <username>cluster</username>
????? <password>cluster</password>
????? <master>Y</master>
??? </slaveserver>
? </masters>
?
? <report_to_masters>Y</report_to_masters>
?
? <slaveserver>
??? <name>slave4-8084</name>
??? <hostname>localhost</hostname>
??? <port>8084</port>
??? <username>cluster</username>
??? <password>cluster</password>
??? <master>N</master>
? </slaveserver>
</slave_config>
?
這個配置文件主要包括以下幾個節點
????????masters:?這里列出來的服務器是當前子服務器需要向其匯報狀態的主服務器。如果當前這個子服務器是主服務器,則它將連接其它的主服務器來獲得這個集群中的所有子服務器。
????????report_to_masters?:?如果為Y,則表示需要向定義的主服務器發送消息以表明該從屬服務器存在
????????slaveserver?:?這里定義的就是當前carte實例運行時需要的子服務器的配置情況
這里定義的username和password在向主服務器調用Register服務時連接主服務器時提供的安全設置。在?<slaveserver>部分,你可以使用<network_interface>?參數,這個參數的優先級高于<hostname>參數 ,如果你的機器中安裝有多個網卡,這個設置可以起作用。
程序啟動
???Kettle提供了org.pentaho.di.www.Carte類,你可以通過該類提供的函數來啟動或者停止子服務器。
??????????啟動子服務器
SlaveServerConfig config = new SlaveServerConfig(hostname, port, false);
Carte. runCarte(config);
?
??????????停止子服務器
carte.getWebServer().stopServer();
?
子服務器內幕
????我們前面提到過子服務器實際上就是一個web server,該web server是基于Jetty這個嵌入式的開源servlet容器。
這個web server主要是提供轉換運行的環境,另外一個重要的功能通過提供servlet來在客戶端、主服務器和從屬服務器之間進行通訊和控制。主服務器和從屬服務器之間是通過httpClient來進行通訊的,通訊時傳遞的數據是xml格式。通過提供的servlet,可以實現啟動、停止、暫停轉換或者作業、獲得轉換或者作業的狀態、注冊子服務器、獲得子服務器的列表等等
Kettle主要提供了以下的幾種基于servlet的服務
??????????GetRootServlet:獲得Carte的根目錄
??????????GetStatusServlet:獲得在服務器上運行的所有的轉換和作業的狀態
??????????GetTransStatusServlet:獲得在服務器上運行的某個指定的轉換的每個步驟的運行狀態。
??????????PrepareExecutionTransServlet:讓服務器上的某個指定的轉換做好運行的準備。
??????????StartTransServlet:執行服務器上的某個指定的轉換
??????????PauseTransServlet:暫停或者重新運行某一個轉換
??????????StopTransServlet:停止正在運行的轉換
??????????CleanupTransServlet:清理運行轉換時的環境
??????????AddTransServlet:向子服務器中增加某個轉換。如果服務器中有正在運行或者準備運行的相同名字的轉換,則拋出異常。
??????????AllocateServerSocketServlet:分配一個新的socket端口號。這個端口號是基于你在定義cluster schema中設置的端口號,依次加1
??????????StartJobServlet:執行服務器上某個指定的作業
??????????StopJobServlet:停止正在運行的作業
??????????GetJobStatusServlet:獲得某個指定作業的狀態
??????????AddJobServlet:向當前的子服務器中添加某個作業。
??????????RegisterSlaveServlet:注冊某個服務器的信息。服務器信息包括子服務器是否活動、最新活動的時間、最新不活動的時間。這個在dynamic cluster中需要用到,由從屬服務器向主服務器匯報當前狀態。
??????????GetSlavesServlet:獲得集群中子服務器的信息
??????????AddExportServlet:以zip文件的形式向caret服務器傳遞作業或者轉換信息,并將信息加入到服務器中。
?
運行轉換
在spoon中運行
???在kettle的集成設計環境spoon中,你可以選擇轉換中的“運行”菜單項,或者按F9快捷鍵,彈出以下的窗口
?
這里有三個選項來決定轉換是以什么方式來執行
????????本地執行:?轉換或者作業將在你現在使用的JVM中運行。
????????遠程執行:?允許你指定一個想運行轉換的遠程服務器。這需要你在遠程服務器上安裝Pentaho Data Integration(Kettle)并且運行Carte子服務器。
????????集群方式執行:?允許你在集群環境下執行作業或者轉換
?
當你選擇“集群方式執行”選項是,你可以選擇以下的選項
????????提交轉換:?分解轉換并且將轉換提交到不同的主服務器和從屬服務器。
????????準備執行:?它將在主服務器和從屬服務器上執行轉換的初始化階段。
????????開始執行:?它將在主服務器和從屬服務器中執行實際的轉換任務。
????????顯示轉換:?顯示將要在集群上執行的生成的轉換(可以參看下面的分析).
編程運行
?????你也可以通過使用Kettle提供的API通過編程來以集群的方式運行轉換。
TransMeta transMeta = new TransMeta("cluster.ktr");
???//設置執行模式
TransExecutionConfiguration config = new TransExecutionConfiguration();
config.setExecutingClustered(true);
config.setExecutingLocally(false);
config.setExecutingRemotely(false);
config.setClusterPosting(true);
config.setClusterPreparing(true);
config.setClusterStarting(true);
config.setLogLevel(LogWriter.LOG_LEVEL_BASIC);
TransSplitter transSplitter = Trans.executeClustered(transMeta, config);
long nrErrors = Trans.monitorClusteredTransformation("cluster??test", transSplitter, null, 1);
?
需要注意的是這段代碼可以在一個獨立的JVM中執行,而不必要在主服務器中執行。
?
運行內幕
?????當以集群方式來運行轉換時,Kettle主要執行以下幾個步驟來執行分布式的處理
??分解轉換
在定義轉換的時候,如果在某個步驟中定義使用集群,那么這個步驟其實是在從屬服務器(slave server)上執行的,例如我們在前面定義轉換的modify javascript value步驟中,我們定義了使用集群,那么這個步驟將在從屬服務器中執行;而Cx4表示這個步驟是在4個從屬服務器上執行。如果步驟中沒有定義集群,則表示該步驟是在主服務器(master server)上執行。如果前一步驟在主服務器上執行,而后一步驟需要在從屬服務器上執行,或者相反,則這時需要分別在前一步驟和后一步驟之間建立一個remoteStep步驟,前面的remoteStep建立socketWriter進程,它負責從上一步驟中取出數據然后通過socket傳輸到對應的子服務器的remoteStep中。而后一步驟所在的子服務器的remoteStep步驟則建立一個socketReader,負責從socket中獲取數據,并將數據將數據傳輸到后一步驟中,以供后一步驟來進行后續處理。
所以在以集群方式執行轉換時,首要的任務是將轉換分解成可以在各個子服務器上執行的轉換。
我們還是以上面建立的轉換來進行分析描述:
?
?
上圖是在主服務器上建立的轉換
And 4 slaves transformations:
?
上圖是在4個從屬服務器上建立的轉換,我們可以注意到這四個從屬服務器上的轉換是一樣的,除了端口號不一樣。另外我們還注意到在前述Cluster Schema?定義中我們指定了端口號為4000,則為每一個建立的socket連接就是端口號4000開始,依次加1。另外,還可以看到數據是通過使用socket??Writer和socket Reader的remoteStep步驟通過TCP/IP的socket來傳遞數據的。
?
??????????提交轉換
對于第一步驟生成的子轉換,將調用每個子服務器提供的AddTransServlet服務將轉換的信息增加到每個子服務器中(包括主服務器和從屬服務器)。
??????????準備轉換
調用每個子服務器的PrepareExecutionTransServlet服務來準備轉換
??????????啟動轉換
調用每個子服務器的StartExecutionTransServlet服務來啟動轉換。
??????????監控轉換
在各服務器的轉換都啟動后,調用Trans.monitorClusteredTransformation來監控各個服務器的運行狀態(使用各子服務器提供的GetTransStatusServlet服務來獲得每個子服務器的狀態)。
?
例子
目的
做一個轉換(表輸入---à排序--à表輸出)
然后在兩臺pc機器上實驗。把集群放到排序插件上。
配置兩臺子服務器
創建子服務器
在主對象下的轉換下的子服務器右鍵單擊新建。
?
右鍵單擊子服務器新建
?
填寫相關的配置,用戶名和密碼為cluster,如果要修改得修改kettle默認路徑下的pwd下的kettle.pwd文件里的用戶名密碼。
?
這個是從屬服務器。
配置schemas
???新建schemas
?
在選擇子服務器中選擇這兩個服務器。
開啟兩臺機器的carte服務
??在10.2.4.81機器和10.2.4.188機器的控制臺開啟carte服務。
C:/pdi-open-3.1.0-826是保存kettle的文件夾
?
188機器也和他一樣。
?
在轉換中添加集群
?
右擊字段選擇選擇集群。
?
點擊確定。
出現cx1代表成功。
然后運行,就OK了。
?
---------------------------------以下內容是我自己添加的----------------------------------------
更多資料:
http://forums.pentaho.com/showthread.php?60500-clustering-in-Kettle
http://forums.pentaho.com/archive/index.php/t-54398.html
本文轉自秋楓博客園博客,原文鏈接:http://www.cnblogs.com/rwxwsblog/p/4530633.html,如需轉載請自行聯系原作者
總結
以上是生活随笔為你收集整理的【转】Kettle集群的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: node.js调试
- 下一篇: 外企面试官们爱提的十个问题