使用Apache Storm和Kite SDK Morphlines的可配置ETL处理
從我擔任軟件工程師的第一天起,我總是聽到很多方面的相同要求:
“ 我們希望所有內容都可配置,我們希望在運行時更改所有內容,我們希望有一個可視化工具來應用所有這些邏輯,以便非開發人員使用和配置我們的應用程序。 ”
我也喜歡這種通用范圍,但是眾所周知,軟件系統的適應性不強,客戶的需求也不穩定。
在過去的幾年中,我們已經使用傳統的框架/技術(JMX,分布式緩存,Spring或JEE等)構建了此類可配置應用程序(并非100%可配置)。
近年來,我們的體系結構中還必須包含一個附加概念,這就是大數據 (或3V或4V或任何更合適的詞)的概念。 這個新概念淘汰了我們熟悉并在舊的3層應用程序中應用的各種解決方案或變通方法。
有趣的是,我很多次都和十年前一樣。 這是軟件開發的規則,它永遠不會結束,因此個人才能和新冒險也永遠不會結束:-)
主要問題仍然是相同的,即如何構建可配置的ETL分布式應用程序 。
因此,我建立了一個小型的適應性強的解決方案,該解決方案在許多用例中可能會有所幫助。 我在大數據世界中使用了3種常用工具: Java , Apache Storm和Kite SDK Morplines 。 Java是主要的編程語言, Apache Storm是分布式流處理引擎,而Kite SDK Morphlines是可配置的ETL引擎。
風箏SDK Morplines
從其描述復制而來: Morphlines是一個開源框架,它減少了構建和更改Hadoop ETL流處理應用程序所需的時間和精力,該應用程序可將數據提取,轉換并加載到Apache Solr,HBase,HDFS,Enterprise Data Warehouse或Analytic Online Dashboards中。 morphline是一個豐富的配置文件,可以輕松定義一個轉換鏈,該轉換鏈可以使用來自任何類型數據源的任何類型的數據,處理數據并將結果加載到Hadoop組件中。 它用簡單的配置步驟代替了Java編程,并相應地減少了與開發和維護定制ETL項目相關的成本和集成工作。
除了內置命令外 ,您還可以輕松實現自己的命令 ,并在嗎啉配置文件中使用它。
示例Morphline配置讀取一個JSON字符串,解析它,然后只記錄一個特定的JSON元素:
morphlines : [{id : json_terminal_logimportCommands : ["org.kitesdk.**"]commands : [# read the JSON blob{ readJson: {} }# extract JSON objects into head fields{ extractJsonPaths {flatten: truepaths: {name: /nameage: /age}} }# log data{ logInfo {format : "name: {}, record: {}"args : ["@{name}", "@{}"]}}] }]風暴變身螺栓
為了在Storm中使用Morphlines,我實現了一個自定義MorphlinesBolt 。 該螺栓的主要職責是:
- 通過配置文件初始化Morphlines處理程序
- 初始化映射說明:
a)從元組到嗎啉輸入,以及
b)從Morphline輸出到新的輸出元組 - 使用已初始化的Morplines上下文處理每個傳入事件
- 如果Bolt不是Terminal ,則使用提供的Mapper (類型“ b”),使用Morphline執行的輸出發出一個新的Tuple。
簡單的可配置ETL拓撲
為了測試自定義MorphlinesBolt ,我編寫了2個簡單的測試。 在這些測試中,您可以看到MorphlinesBolt是如何初始化的,然后是每次執行的結果。 作為輸入,我使用了一個自定義的Spout(RandomJsonTestSpout),它僅每100毫秒發出一次新的JSON字符串(可配置)。
DummyJsonTerminalLogTopology
一個簡單的拓撲 ,該拓撲通過配置文件和每個傳入的元組的執行Morphline處理程序來配置Morphline上下文。 在此拓撲上, MorphlinesBolt被配置為終端螺栓,這意味著對于每個輸入Tuple不會發出新的Tuple。
public class DummyJsonTerminalLogTopology {public static void main(String[] args) throws Exception {Config config = new Config();RandomJsonTestSpout spout = new RandomJsonTestSpout().withComplexJson(false);String2ByteArrayTupleMapper tuppleMapper = new String2ByteArrayTupleMapper();tuppleMapper.configure(CmnStormCons.TUPLE_FIELD_MSG);MorphlinesBolt morphBolt = new MorphlinesBolt().withTupleMapper(tuppleMapper).withMorphlineId("json_terminal_log").withMorphlineConfFile("target/test-classes/morphline_confs/json_terminal_log.conf");TopologyBuilder builder = new TopologyBuilder();builder.setSpout("WORD_SPOUT", spout, 1);builder.setBolt("MORPH_BOLT", morphBolt, 1).shuffleGrouping("WORD_SPOUT");if (args.length == 0) {LocalCluster cluster = new LocalCluster();cluster.submitTopology("MyDummyJsonTerminalLogTopology", config, builder.createTopology());Thread.sleep(10000);cluster.killTopology("MyDummyJsonTerminalLogTopology");cluster.shutdown();System.exit(0);} else if (args.length == 1) {StormSubmitter.submitTopology(args[0], config, builder.createTopology());} else {System.out.println("Usage: DummyJsonTerminalLogTopology <topology_name>");}} }DummyJson2StringTopology
一個簡單的拓撲 ,該拓撲通過配置文件和每個傳入的元組的執行Morphline處理程序來配置Morphline上下文。 在此拓撲上, MorphlinesBolt被配置為普通螺栓,這意味著對于每個輸入Tuple,它都會發出一個新的Tuple。
public class DummyJson2StringTopology {public static void main(String[] args) throws Exception {Config config = new Config();RandomJsonTestSpout spout = new RandomJsonTestSpout().withComplexJson(false);String2ByteArrayTupleMapper tuppleMapper = new String2ByteArrayTupleMapper();tuppleMapper.configure(CmnStormCons.TUPLE_FIELD_MSG);MorphlinesBolt morphBolt = new MorphlinesBolt().withTupleMapper(tuppleMapper).withMorphlineId("json2string").withMorphlineConfFile("target/test-classes/morphline_confs/json2string.conf")//.withOutputProcessors(Arrays.asList(resultRecordHandlers));.withOutputFields(CmnStormCons.TUPLE_FIELD_MSG).withRecordMapper(RecordHandlerFactory.genDefaultRecordHandler(String.class, new JsonNode2StringResultMapper()));LoggingBolt printBolt = new LoggingBolt().withFields(CmnStormCons.TUPLE_FIELD_MSG);TopologyBuilder builder = new TopologyBuilder();builder.setSpout("WORD_SPOUT", spout, 1);builder.setBolt("MORPH_BOLT", morphBolt, 1).shuffleGrouping("WORD_SPOUT");builder.setBolt("PRINT_BOLT", printBolt, 1).shuffleGrouping("MORPH_BOLT");if (args.length == 0) {LocalCluster cluster = new LocalCluster();cluster.submitTopology("MyDummyJson2StringTopology", config, builder.createTopology());Thread.sleep(10000);cluster.killTopology("MyDummyJson2StringTopology");cluster.shutdown();System.exit(0);} else if (args.length == 1) {StormSubmitter.submitTopology(args[0], config, builder.createTopology());} else {System.out.println("Usage: DummyJson2StringTopology <topology_name>");}} }最后的想法
MorphlinesBolt可以用作任何可配置ETL“解決方案”的一部分(作為單處理Bolt,作為終端Bolt,作為復雜管道的一部分,等等)。
在github中的示例項目集中,源代碼作為Maven模塊( sv-etl-storm-morphlines )提供。
最好的組合是將MorphlinesBolt與Flux一起使用。 這可能會為您提供完全可配置的ETL拓撲!!!
我還沒有添加為選項,以便保持較少的依賴關系(我可以添加范圍“ test”)。
該模塊不是最終模塊,我將嘗試對其進行改進,因此許多人會在第一個實現中發現各種錯誤。
對于任何其他想法或說明,請寫評論:)
這是我2016年的第一篇文章! 希望您身體健康,思想和行動更好。 一切的第一項美德/價值是人類以及對我們所生活的環境(社會,地球,動物,植物等)的尊重。 所有其他都是次要優先事項,不應破壞優先事項所隱含的內容。 始終牢記最重要的美德,并在您采取的任何行動或思想中考慮它們。
翻譯自: https://www.javacodegeeks.com/2016/01/configurable-etl-processing-using-apache-storm-kite-sdk-morphlines.html
總結
以上是生活随笔為你收集整理的使用Apache Storm和Kite SDK Morphlines的可配置ETL处理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 气宜鼓荡
- 下一篇: 消息称Meta计划明年初开发全新的大型语