使用Oracle Data Integrator(和Kafka / MapR流)完善Lambda体系结构
“ Lambda體系結構是一種數據處理體系結構,旨在通過利用批處理和流處理方法來處理大量數據。 這種體系結構方法試圖通過使用批處理提供批處理數據的全面而準確的視圖,同時使用實時流處理提供在線數據的視圖來平衡延遲 , 吞吐量和容錯能力 。 在演示之前,可以將兩個視圖輸出合并。 lambda體系結構的興起與大數據的增長,實時分析以及減輕地圖縮減延遲的驅動力有關。” –維基百科
以前,我已經寫了一些博客,涉及許多用例,這些用例是使用Oracle Data Integrator(ODI)在MapR分發之上進行批處理,以及使用Oracle GoldenGate(OGG)將事務數據流式傳輸到MapR Streams和其他Hadoop組件中。 最新的ODI(12.2.1.2.6)結合了這兩種產品以完全適合lambda架構,同時具有許多新的強大功能,包括能夠將Kafka流作為ODI本身的源和目標進行處理。 通過簡化我們在一種產品下以相同邏輯設計處理和處理批處理和快速數據的方式,此功能對已經擁有或計劃擁有lambda架構的任何人都具有巨大的優勢。 現在,如果我們將OGG流傳輸功能和ODI批處理/流傳輸功能結合在一起,則可能性是無限的。
在本博客中,我將向您展示如何使用Spark Streaming在Oracle Data Integrator上配置MapR流(aka Kafka)以創建真正的lambda體系結構:補充批處理和服務層的快速層。
在本文中,我將跳過ODI的“贊揚和稱贊”部分,但我只想強調一點:自從ODI首次發布以來,為該博客設計的映射,就像您將設計的所有其他映射一樣,都是您可以直接在Hadoop / Spark集群上以100%的本機代碼運行,而無需編寫零行代碼,也不必擔心如何以及在何處編碼。
我已經在MapR上完成了此操作,因此我可以制作“兩只鳥一塊石頭”。 向您展示MapR Streams步驟和Kafka。 由于兩者在概念或API實現上并沒有太大差異,因此如果您使用的是Kafka,則可以輕松地應用相同的步驟。
如果您不熟悉MapR Streams和/或Kafka概念,建議您花一些時間來閱讀它們。 以下內容假定您知道什么是MapR Streams和Kafka(當然還有ODI)。 否則,您仍然會對可能的功能有個好主意。
準備工作
MapR Streams(aka Kafka)相關的準備工作
顯然,我們需要創建MapR Streams路徑和主題。 與Kafka不同,MapR通過“ maprcli”命令行實用程序使用其自己的API來創建和定義主題。 因此,如果您使用商品Kafka,則此步驟將略有不同。 Web上有很多有關如何創建和配置Kafka主題和服務器的示例,因此您并不孤單。
為了進行此演示,我創建了一個路徑和該路徑下的兩個主題。 我們將讓ODI從其中一個主題(注冊)進行消費,并生成另一個主題(registrations2)。 這樣,您將看到它如何通過ODI起作用。
創建一個名為“ users-stream”的MapR Streams路徑和一個名為“ registrations”的主題:
在我之前定義的相同路徑上創建第二個主題“ registrations2”:
Hadoop相關準備
由于我使用的是已安裝并正在運行MapR的個人預配置VM,因此此處沒有很多準備工作。 但是,需要一些步驟才能成功完成ODI映射。 如果您想知道我如何使ODI可以用于MapR發行版,則可以參考此博客文章 。
- Spark:我已經在Spark 1.6.1上進行了測試,您也應該這樣做。 至少不要轉到任何較低版本。 此外,您需要針對Spark構建具有特定的標簽版本。 我從標簽1605(這是MapR發布約定)開始測試,但是我的工作失敗了。 究其原因,我發現PySpark庫不是MapR Streams API的最新版本。 他們可以使用商品Kafka,但不能使用MapR。 這是我使用過的RPM的鏈接 。
- Spark日志記錄:在spark路徑下,有一個“ config”文件夾,其中包含不同的配置文件。 如果需要的話,我們只對其中一項感興趣。 文件名為“ log4j.properties”。 您需要確保將“ rootCategory”參數設置為INFO,否則,當您運行提交到Spark的任何ODI映射時,都會出現異常:
- Hadoop憑證存儲:在提交的任何作業中需要某些密碼時,ODI都將引用Hadoop憑證存儲。 這樣,我們就不會在參數/屬性文件或代碼本身中包含任何明確的密碼。 在此演示中,我們將在某個時候使用MySQL,因此我需要創建一個存儲并為MySQL密碼添加別名。 首先,您需要確保在core-site.xml中有一個用于憑證存儲的條目,然后實際上為密碼值創建一個別名:
上一張圖片是我的“ site-core.xml”的摘要,向您顯示了我添加的憑據存儲。 下一步將是驗證商店是否存在,然后為密碼值創建別名:
更改之后,即使在編輯core-site.xml之后,也無需重新啟動任何hadoop組件。
注意:如果您遇到“操作系統異常”(例如137),請確保您有足夠的可用內存。
ODI相關準備
您將在ODI中進行的常規準備工作。 我將在此博客中顯示相關內容。
Hadoop數據服務器
以下配置特定于MapR。 如果使用其他發行版,則需要輸入相關的端口號和路徑:
Spark-Python數據服務器
在此ODI版本12.2.1.2.6中,如果要使用Spark Streaming和常規Spark服務器/群集,則需要創建多個Spark數據服務器。 在此演示中,我僅創建了Spark Streaming服務器,并將其稱為Spark-Async。
您需要將“主群集”值更改為實際使用的值:yarn-client或yarn-cluster,然后選擇我們先前創建的Hadoop DataServer。
現在,這里配置的有趣部分是Spark-Async數據服務器的屬性:
我已經強調了您需要注意的最重要的方面。 之所以使用ASYNC,是因為我們將使用Spark Streaming。 其余屬性與性能有關。
Kafka數據服務器
在這里,我們將定義MapR Streams數據服務器:
元數據代理具有一個“虛擬”地址,僅符合Kafka API。 MapR Streams客戶端將為您提供連接到MapR Streams所需的服務。 您可能無法在此處測試數據服務器,因為在MapR上沒有運行這樣的Kafka服務器。 因此,請安全地忽略此處的測試連接,因為它將失敗(這樣就可以了)。
對于屬性,您需要定義以下內容:
您需要手動定義“ key.deserializer”和“ value.deserializer”。 MapR Streams都需要這兩者,如果未定義作業,作業將失敗。
ODI映射設計
我已經在這里進行了測試,涵蓋了五個用例。 但是,我將只完整介紹一個,并突出顯示其他內容,以免您閱讀多余和常識性的步驟。
1)MapR Streams(Kafka)=> Spark Streaming => MapR Streams(Kafka):
在此映射中,我們將從先前創建的主題中讀取流數據,應用一些功能(簡單的功能),然后將結果生成到另一個主題。 這是映射的邏輯設計:
我通過復制已經為MySQL反向工程設計的模型之一(結構相同)定義了MapR_Streams_Registrations1模型,但是在這種情況下,當然選擇的技術是Kafka。 您將能夠選擇流數據的格式:Avro,JSON,Parquet或Delimited:
物理設計如下所示:
- SOURCE_GROUP:這是我們的MapR Streams主題“注冊”
- TRANS_GROUP:這是我們的Spark異步服務器
- TARGET_GROUP:這是我們的MapR Streams主題“ registrations2”
物理實現的屬性為:
您需要選擇暫存位置作為Spark Async并啟用“流式傳輸”。
要將主題注冊中的流數據加載到Spark流中,我們需要選擇合適的LKM,即LKM Kafka到Spark:
然后從Spark Streaming加載到MapR Stream目標主題registrations2,我們需要選擇LKM Spark到Kafka:
2)MapR-FS(HDFS)=> Spark Streaming => MapR Streams(Kafka):
除了使用的知識模塊之外,我在這里不會向您展示太多。 要將MapR-FS(HDFS)加載到Spark Streaming,我使用了LKM File來Spark:
為了從Spark Streaming加載到MapR Streams,我像以前的映射一樣使用LKM Spark到Kafka。
注意:LKM File to Spark將充當一個流,一個文件流(顯然)。 ODI將僅接收任何更新/新文件,而不是靜態文件。
3)MapR Streams(Kafka)=> Spark Streaming => MySQL:
要將MapR Streams(Kafka)加載到Spark Streaming,就像在第一個映射中一樣,我使用了LKM Kafka到Spark。 然后從Spark Streaming加載到MySQL,我使用了LKM Spark到SQL:
4)MapR流(Kafka)=> Spark流=> MapR-FS(HDFS)
為了從MapR流加載到Spark流,我像以前一樣使用LKM Kafka到Spark,然后從Spark Stream加載到MapR-FS(HDFS),我已經使用LKM Spark到File:
5)MapR Streams(Kafka)和Oracle DB => Spark Streaming => MySQL
這是另一個有趣的用例,您實際上可以在現場將Kafka流與SQL源一起加入。 這僅(當前)適用于查找組件:
請注意,驅動程序源必須是Kafka(在我們的示例中為MapR流),而查找源必須是SQL數據庫。 我使用了與以前的映射幾乎相同的LKM:從LKM SQL到Spark,從LKM Kafka到Spark和從LKM Spark到SQL。
行刑
我將僅向您展示第一個用例的執行步驟,即MapR Streams(Kafka)=> Spark Streaming => MapR Streams(Kafka)。 為了模擬這種情況,我創建了一個Kafka生產者控制臺和另一個Kafka消費者控制臺,以便可以監視結果。 查看下面的生產者,我粘貼了一些記錄:
我已經突出顯示了其中一個URL,以確保您注意到它是小寫的。 等待幾秒鐘,Spark將處理這些消息并將其發送到目標MapR Streams主題:
請注意,所有URL均大寫。 成功!
通過映射,結果與預期的一樣。 因為它們很簡單,所以我不會為它們顯示測試步驟。 這里的想法是向您展示如何使用MapR Streams(Kafka)配置ODI。
最后的話
值得一提的是,在執行任何映射時,您都可以鉆取日志并查看正在發生的事情(生成的代碼等)。 此外,您將獲得指向工作歷史URL的鏈接以在Spark UI上訪問它:
打開鏈接將帶我們到Spark UI:
如果要控制流作業可以生存多長時間,則需要增加Spark-Async數據服務器的“ spark.streaming.timeout”屬性或從映射配置本身覆蓋它。 您可能還需要創建一個ODI程序包,該程序包具有一個循環和其他有用的組件來滿足您的業務需求。
結論
ODI可以處理lambda架構中的兩個層:批處理層和快速層。 這不僅是ODI在其非常長的綜合功能列表中添加的一項重要功能,而且還將提高從一個統一,易于使用的界面設計數據管道的生產率和效率。 顯然,ODI可以像使用商品Kafka一樣輕松地與MapR Streams一起使用,這要感謝MapR的二進制文件與Kafka API兼容,以及ODI不需要依賴于一個框架。 這可以確保您ODI是真正的開放式模塊化E-LT工具,與其他工具不同。
其他一些相關職位:
- Oracle Data Integrator和MapR融合數據平臺:請檢查!
- 使用Oracle GoldenGate將事務數據流式傳輸到MapR流中
- 使用Oracle GoldenGate進行MapR-FS實時事務數據提取
- 帶有ODI的逆向工程師MapR-DB
免責聲明
這里表達的思想,實踐和觀點僅是作者的觀點,不一定反映Oracle的觀點。
翻譯自: https://www.javacodegeeks.com/2017/02/perfecting-lambda-architecture-oracle-data-integrator-kafka-mapr-streams.html
總結
以上是生活随笔為你收集整理的使用Oracle Data Integrator(和Kafka / MapR流)完善Lambda体系结构的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 电脑系统怎么清理流氓软件 删除垃圾软件的
- 下一篇: 将Gatling集成到Gradle构建中