使用Storm实现实时大数据分析!
簡單和明了,Storm讓大數(shù)據(jù)分析變得輕松加愉快。
當(dāng)今世界,公司的日常運營經(jīng)常會生成TB級別的數(shù)據(jù)。數(shù)據(jù)來源囊括了互聯(lián)網(wǎng)裝置可以捕獲的任何類型數(shù)據(jù),網(wǎng)站、社交媒體、交易型商業(yè)數(shù)據(jù)以及其它商業(yè)環(huán)境中創(chuàng)建的數(shù)據(jù)。考慮到數(shù)據(jù)的生成量,實時處理成為了許多機構(gòu)需要面對的首要挑戰(zhàn)。我們經(jīng)常用的一個非常有效的開源實時計算工具就是Storm —— Twitter開發(fā),通常被比作“實時的Hadoop”。然而Storm遠比Hadoop來的簡單,因為用它處理大數(shù)據(jù)不會帶來新老技術(shù)的交替。
Shruthi Kumar、Siddharth Patankar共同效力于Infosys,分別從事技術(shù)分析和研發(fā)工作。本文詳述了Storm的使用方法,例子中的項目名稱為“超速報警系統(tǒng)(Speeding Alert System)”。我們想實現(xiàn)的功能是:實時分析過往車輛的數(shù)據(jù),一旦車輛數(shù)據(jù)超過預(yù)設(shè)的臨界值 —— 便觸發(fā)一個trigger并把相關(guān)的數(shù)據(jù)存入數(shù)據(jù)庫。
Storm
對比Hadoop的批處理,Storm是個實時的、分布式以及具備高容錯的計算系統(tǒng)。同Hadoop一樣Storm也可以處理大批量的數(shù)據(jù),然而Storm在保證高可靠性的前提下還可以讓處理進行的更加實時;也就是說,所有的信息都會被處理。Storm同樣還具備容錯和分布計算這些特性,這就讓Storm可以擴展到不同的機器上進行大批量的數(shù)據(jù)處理。他同樣還有以下的這些特性:
- 易于擴展。對于擴展,你只需要添加機器和改變對應(yīng)的topology(拓撲)設(shè)置。Storm使用Hadoop Zookeeper進行集群協(xié)調(diào),這樣可以充分的保證大型集群的良好運行。
- 每條信息的處理都可以得到保證。
- Storm集群管理簡易。
- Storm的容錯機能:一旦topology遞交,Storm會一直運行它直到topology被廢除或者被關(guān)閉。而在執(zhí)行中出現(xiàn)錯誤時,也會由Storm重新分配任務(wù)。
- 盡管通常使用Java,Storm中的topology可以用任何語言設(shè)計。
當(dāng)然為了更好的理解文章,你首先需要安裝和設(shè)置Storm。需要通過以下幾個簡單的步驟:
- 從Storm官方下載Storm安裝文件
- 將bin/directory解壓到你的PATH上,并保證bin/storm腳本是可執(zhí)行的。
Storm組件
Storm集群主要由一個主節(jié)點和一群工作節(jié)點(worker node)組成,通過 Zookeeper進行協(xié)調(diào)。
主節(jié)點:
主節(jié)點通常運行一個后臺程序 —— Nimbus,用于響應(yīng)分布在集群中的節(jié)點,分配任務(wù)和監(jiān)測故障。這個很類似于Hadoop中的Job Tracker。
工作節(jié)點:
工作節(jié)點同樣會運行一個后臺程序 —— Supervisor,用于收聽工作指派并基于要求運行工作進程。每個工作節(jié)點都是topology中一個子集的實現(xiàn)。而Nimbus和Supervisor之間的協(xié)調(diào)則通過Zookeeper系統(tǒng)或者集群。
Zookeeper
Zookeeper是完成Supervisor和Nimbus之間協(xié)調(diào)的服務(wù)。而應(yīng)用程序?qū)崿F(xiàn)實時的邏輯則被封裝進Storm中的“topology”。topology則是一組由Spouts(數(shù)據(jù)源)和Bolts(數(shù)據(jù)操作)通過Stream Groupings進行連接的圖。下面對出現(xiàn)的術(shù)語進行更深刻的解析。
Spout:
簡而言之,Spout從來源處讀取數(shù)據(jù)并放入topology。Spout分成可靠和不可靠兩種;當(dāng)Storm接收失敗時,可靠的Spout會對tuple(元組,數(shù)據(jù)項組成的列表)進行重發(fā);而不可靠的Spout不會考慮接收成功與否只發(fā)射一次。而Spout中最主要的方法就是nextTuple(),該方法會發(fā)射一個新的tuple到topology,如果沒有新tuple發(fā)射則會簡單的返回。
Bolt:
Topology中所有的處理都由Bolt完成。Bolt可以完成任何事,比如:連接的過濾、聚合、訪問文件/數(shù)據(jù)庫、等等。Bolt從Spout中接收數(shù)據(jù)并進行處理,如果遇到復(fù)雜流的處理也可能將tuple發(fā)送給另一個Bolt進行處理。而Bolt中最重要的方法是execute(),以新的tuple作為參數(shù)接收。不管是Spout還是Bolt,如果將tuple發(fā)射成多個流,這些流都可以通過declareStream()來聲明。
Stream Groupings:
Stream Grouping定義了一個流在Bolt任務(wù)間該如何被切分。這里有Storm提供的6個Stream Grouping類型:
1. 隨機分組(Shuffle grouping):隨機分發(fā)tuple到Bolt的任務(wù),保證每個任務(wù)獲得相等數(shù)量的tuple。
2. 字段分組(Fields grouping):根據(jù)指定字段分割數(shù)據(jù)流,并分組。例如,根據(jù)“user-id”字段,相同“user-id”的元組總是分發(fā)到同一個任務(wù),不同“user-id”的元組可能分發(fā)到不同的任務(wù)。
3. 全部分組(All grouping):tuple被復(fù)制到bolt的所有任務(wù)。這種類型需要謹慎使用。
4. 全局分組(Global grouping):全部流都分配到bolt的同一個任務(wù)。明確地說,是分配給ID最小的那個task。
5. 無分組(None grouping):你不需要關(guān)心流是如何分組。目前,無分組等效于隨機分組。但最終,Storm將把無分組的Bolts放到Bolts或Spouts訂閱它們的同一線程去執(zhí)行(如果可能)。
6. 直接分組(Direct grouping):這是一個特別的分組類型。元組生產(chǎn)者決定tuple由哪個元組處理者任務(wù)接收。
當(dāng)然還可以實現(xiàn)CustomStreamGroupimg接口來定制自己需要的分組。
項目實施
當(dāng)下情況我們需要給Spout和Bolt設(shè)計一種能夠處理大量數(shù)據(jù)(日志文件)的topology,當(dāng)一個特定數(shù)據(jù)值超過預(yù)設(shè)的臨界值時促發(fā)警報。使用Storm的topology,逐行讀入日志文件并且監(jiān)視輸入數(shù)據(jù)。在Storm組件方面,Spout負責(zé)讀入輸入數(shù)據(jù)。它不僅從現(xiàn)有的文件中讀入數(shù)據(jù),同時還監(jiān)視著新文件。文件一旦被修改Spout會讀入新的版本并且覆蓋之前的tuple(可以被Bolt讀入的格式),將tuple發(fā)射給Bolt進行臨界分析,這樣就可以發(fā)現(xiàn)所有可能超臨界的記錄。
下一節(jié)將對用例進行詳細介紹。
臨界分析
這一節(jié),將主要聚焦于臨界值的兩種分析類型:瞬間臨界(instant thershold)和時間序列臨界(time series threshold)。
- 瞬間臨界值監(jiān)測:一個字段的值在那個瞬間超過了預(yù)設(shè)的臨界值,如果條件符合的話則觸發(fā)一個trigger。舉個例子當(dāng)車輛超越80公里每小時,則觸發(fā)trigger。
- 時間序列臨界監(jiān)測:字段的值在一個給定的時間段內(nèi)超過了預(yù)設(shè)的臨界值,如果條件符合則觸發(fā)一個觸發(fā)器。比如:在5分鐘類,時速超過80KM兩次及以上的車輛。
Listing One顯示了我們將使用的一個類型日志,其中包含的車輛數(shù)據(jù)信息有:車牌號、車輛行駛的速度以及數(shù)據(jù)獲取的位置。
| AB 123 | 60 | North city |
| BC 123 | 70 | South city |
| CD 234 | 40 | South city |
| DE 123 | 40 | East ?city |
| EF 123 | 90 | South city |
| GH 123 | 50 | West? city |
這里將創(chuàng)建一個對應(yīng)的XML文件,這將包含引入數(shù)據(jù)的模式。這個XML將用于日志文件的解析。XML的設(shè)計模式和對應(yīng)的說明請見下表。
XML文件和日志文件都存放在Spout可以隨時監(jiān)測的目錄下,用以關(guān)注文件的實時更新。而這個用例中的topology請見下圖。
Figure 1:Storm中建立的topology,用以實現(xiàn)數(shù)據(jù)實時處理
如圖所示:FilelistenerSpout接收輸入日志并進行逐行的讀入,接著將數(shù)據(jù)發(fā)射給ThresoldCalculatorBolt進行更深一步的臨界值處理。一旦處理完成,被計算行的數(shù)據(jù)將發(fā)送給DBWriterBolt,然后由DBWriterBolt存入給數(shù)據(jù)庫。下面將對這個過程的實現(xiàn)進行詳細的解析。
Spout的實現(xiàn)
Spout以日志文件和XML描述文件作為接收對象。XML文件包含了與日志一致的設(shè)計模式。不妨設(shè)想一下一個示例日志文件,包含了車輛的車牌號、行駛速度、以及數(shù)據(jù)的捕獲位置。(看下圖)
Figure2:數(shù)據(jù)從日志文件到Spout的流程圖
Listing Two顯示了tuple對應(yīng)的XML,其中指定了字段、將日志文件切割成字段的定界符以及字段的類型。XML文件以及數(shù)據(jù)都被保存到Spout指定的路徑。
Listing Two:用以描述日志文件的XML文件。
通過構(gòu)造函數(shù)及它的參數(shù)Directory、PathSpout和TupleInfo對象創(chuàng)建Spout對象。TupleInfo儲存了日志文件的字段、定界符、字段的類型這些很必要的信息。這個對象通過XSTream序列化XML時建立。
Spout的實現(xiàn)步驟:
- 對文件的改變進行分開的監(jiān)聽,并監(jiān)視目錄下有無新日志文件添加。
- 在數(shù)據(jù)得到了字段的說明后,將其轉(zhuǎn)換成tuple。
- 聲明Spout和Bolt之間的分組,并決定tuple發(fā)送給Bolt的途徑。
Spout的具體編碼在Listing Three中顯示。
Listing Three:Spout中open、nextTuple和delcareOutputFields方法的邏輯。????
declareOutputFileds()決定了tuple發(fā)射的格式,這樣的話Bolt就可以用類似的方法將tuple譯碼。Spout持續(xù)對日志文件的數(shù)據(jù)的變更進行監(jiān)聽,一旦有添加Spout就會進行讀入并且發(fā)送給Bolt進行處理。
Bolt的實現(xiàn)
Spout的輸出結(jié)果將給予Bolt進行更深一步的處理。經(jīng)過對用例的思考,我們的topology中需要如Figure 3中的兩個Bolt。
Figure 3:Spout到Bolt的數(shù)據(jù)流程。
ThresholdCalculatorBolt
Spout將tuple發(fā)出,由ThresholdCalculatorBolt接收并進行臨界值處理。在這里,它將接收好幾項輸入進行檢查;分別是:
臨界值檢查
- 臨界值欄數(shù)檢查(拆分成字段的數(shù)目)
- 臨界值數(shù)據(jù)類型(拆分后字段的類型)
- 臨界值出現(xiàn)的頻數(shù)
- 臨界值時間段檢查
Listing Four中的類,定義用來保存這些值。
Listing Four:ThresholdInfo類
基于字段中提供的值,臨界值檢查將被Listing Five中的execute()方法執(zhí)行。代碼大部分的功能是解析和接收值的檢測。
Listing Five:臨界值檢測代碼段
public void execute(Tuple tuple, BasicOutputCollector collector) { if(tuple!=null) { List<Object> inputTupleList = (List<Object>) tuple.getValues(); int thresholdColNum = thresholdInfo.getThresholdColNumber(); Object thresholdValue = thresholdInfo.getThresholdValue(); String thresholdDataType = tupleInfo.getFieldList().get(thresholdColNum-1).getColumnType(); Integer timeWindow = thresholdInfo.getTimeWindow(); int frequency = thresholdInfo.getFrequencyOfOccurence(); if(thresholdDataType.equalsIgnoreCase("string")) { String valueToCheck = inputTupleList.get(thresholdColNum-1).toString(); String frequencyChkOp = thresholdInfo.getAction(); if(timeWindow!=null) { long curTime = System.currentTimeMillis(); long diffInMinutes = (curTime-startTime)/(1000); if(diffInMinutes>=timeWindow) { if(frequencyChkOp.equals("==")) { if(valueToCheck.equalsIgnoreCase(thresholdValue.toString())) { count.incrementAndGet(); if(count.get() > frequency) splitAndEmit(inputTupleList,collector); } } else if(frequencyChkOp.equals("!=")) { if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString())) { count.incrementAndGet(); if(count.get() > frequency) splitAndEmit(inputTupleList,collector); } } else System.out.println("Operator not supported"); } } else { if(frequencyChkOp.equals("==")) { if(valueToCheck.equalsIgnoreCase(thresholdValue.toString())) { count.incrementAndGet(); if(count.get() > frequency) splitAndEmit(inputTupleList,collector); } } else if(frequencyChkOp.equals("!=")) { if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString())) { count.incrementAndGet(); if(count.get() > frequency) splitAndEmit(inputTupleList,collector); } } } } else if(thresholdDataType.equalsIgnoreCase("int") || thresholdDataType.equalsIgnoreCase("double") || thresholdDataType.equalsIgnoreCase("float") || thresholdDataType.equalsIgnoreCase("long") || thresholdDataType.equalsIgnoreCase("short")) { String frequencyChkOp = thresholdInfo.getAction(); if(timeWindow!=null) { long valueToCheck = Long.parseLong(inputTupleList.get(thresholdColNum-1).toString()); long curTime = System.currentTimeMillis(); long diffInMinutes = (curTime-startTime)/(1000); System.out.println("Difference in minutes="+diffInMinutes); if(diffInMinutes>=timeWindow) { if(frequencyChkOp.equals("<")) { if(valueToCheck < Double.parseDouble(thresholdValue.toString())) { count.incrementAndGet(); if(count.get() > frequency) splitAndEmit(inputTupleList,collector); } } else if(frequencyChkOp.equals(">")) { if(valueToCheck > Double.parseDouble(thresholdValue.toString())) { count.incrementAndGet(); if(count.get() > frequency) splitAndEmit(inputTupleList,collector); } } else if(frequencyChkOp.equals("==")) { if(valueToCheck == Double.parseDouble(thresholdValue.toString())) { count.incrementAndGet(); if(count.get() > frequency) splitAndEmit(inputTupleList,collector); } } else if(frequencyChkOp.equals("!=")) { . . . } } } else splitAndEmit(null,collector); } else { System.err.println("Emitting null in bolt"); splitAndEmit(null,collector); } }經(jīng)由Bolt發(fā)送的的tuple將會傳遞到下一個對應(yīng)的Bolt,在我們的用例中是DBWriterBolt。
DBWriterBolt
經(jīng)過處理的tuple必須被持久化以便于觸發(fā)tigger或者更深層次的使用。DBWiterBolt做了這個持久化的工作并把tuple存入了數(shù)據(jù)庫。表的建立由prepare()函數(shù)完成,這也將是topology調(diào)用的第一個方法。方法的編碼如Listing Six所示。
Listing Six:建表編碼。
public void prepare( Map StormConf, TopologyContext context ) { try { Class.forName(dbClass); } catch (ClassNotFoundException e) { System.out.println("Driver not found"); e.printStackTrace(); } try { connection driverManager.getConnection( "jdbc:mysql://"+databaseIP+":"+databasePort+"/"+databaseName, userName, pwd); connection.prepareStatement("DROP TABLE IF EXISTS "+tableName).execute(); StringBuilder createQuery = new StringBuilder( "CREATE TABLE IF NOT EXISTS "+tableName+"("); for(Field fields : tupleInfo.getFieldList()) { if(fields.getColumnType().equalsIgnoreCase("String")) createQuery.append(fields.getColumnName()+" VARCHAR(500),"); else createQuery.append(fields.getColumnName()+" "+fields.getColumnType()+","); } createQuery.append("thresholdTimeStamp timestamp)"); connection.prepareStatement(createQuery.toString()).execute(); // Insert Query StringBuilder insertQuery = new StringBuilder("INSERT INTO "+tableName+"("); String tempCreateQuery = new String(); for(Field fields : tupleInfo.getFieldList()) { insertQuery.append(fields.getColumnName()+","); } insertQuery.append("thresholdTimeStamp").append(") values ("); for(Field fields : tupleInfo.getFieldList()) { insertQuery.append("?,"); } insertQuery.append("?)"); prepStatement = connection.prepareStatement(insertQuery.toString()); } catch (SQLException e) { e.printStackTrace(); } }數(shù)據(jù)分批次的插入數(shù)據(jù)庫。插入的邏輯由Listting Seven中的execute()方法提供。大部分的編碼都是用來實現(xiàn)可能存在不同類型輸入的解析。
Listing Seven:數(shù)據(jù)插入的代碼部分。
public void execute(Tuple tuple, BasicOutputCollector collector) { batchExecuted=false; if(tuple!=null) { List<Object> inputTupleList = (List<Object>) tuple.getValues(); int dbIndex=0; for(int i=0;i<tupleInfo.getFieldList().size();i++) { Field field = tupleInfo.getFieldList().get(i); try { dbIndex = i+1; if(field.getColumnType().equalsIgnoreCase("String")) prepStatement.setString(dbIndex, inputTupleList.get(i).toString()); else if(field.getColumnType().equalsIgnoreCase("int")) prepStatement.setInt(dbIndex, Integer.parseInt(inputTupleList.get(i).toString())); else if(field.getColumnType().equalsIgnoreCase("long")) prepStatement.setLong(dbIndex, Long.parseLong(inputTupleList.get(i).toString())); else if(field.getColumnType().equalsIgnoreCase("float")) prepStatement.setFloat(dbIndex, Float.parseFloat(inputTupleList.get(i).toString())); else if(field.getColumnType().equalsIgnoreCase("double")) prepStatement.setDouble(dbIndex, Double.parseDouble(inputTupleList.get(i).toString())); else if(field.getColumnType().equalsIgnoreCase("short")) prepStatement.setShort(dbIndex, Short.parseShort(inputTupleList.get(i).toString())); else if(field.getColumnType().equalsIgnoreCase("boolean")) prepStatement.setBoolean(dbIndex, Boolean.parseBoolean(inputTupleList.get(i).toString())); else if(field.getColumnType().equalsIgnoreCase("byte")) prepStatement.setByte(dbIndex, Byte.parseByte(inputTupleList.get(i).toString())); else if(field.getColumnType().equalsIgnoreCase("Date")) { Date dateToAdd=null; if (!(inputTupleList.get(i) instanceof Date)) { DateFormat df = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss"); try { dateToAdd = df.parse(inputTupleList.get(i).toString()); } catch (ParseException e) { System.err.println("Data type not valid"); } } else { dateToAdd = (Date)inputTupleList.get(i); java.sql.Date sqlDate = new java.sql.Date(dateToAdd.getTime()); prepStatement.setDate(dbIndex, sqlDate); } } catch (SQLException e) { e.printStackTrace(); } } Date now = new Date(); try { prepStatement.setTimestamp(dbIndex+1, new java.sql.Timestamp(now.getTime())); prepStatement.addBatch(); counter.incrementAndGet(); if (counter.get()== batchSize) executeBatch(); } catch (SQLException e1) { e1.printStackTrace(); } } else { long curTime = System.currentTimeMillis(); long diffInSeconds = (curTime-startTime)/(60*1000); if(counter.get()<batchSize && diffInSeconds>batchTimeWindowInSeconds) { try { executeBatch(); startTime = System.currentTimeMillis(); } catch (SQLException e) { e.printStackTrace(); } } } } public void executeBatch() throws SQLException { batchExecuted=true; prepStatement.executeBatch(); counter = new AtomicInteger(0); }一旦Spout和Bolt準(zhǔn)備就緒(等待被執(zhí)行),topology生成器將會建立topology并準(zhǔn)備執(zhí)行。下面就來看一下執(zhí)行步驟。
在本地集群上運行和測試topology
- 通過TopologyBuilder建立topology。
- 使用Storm Submitter,將topology遞交給集群。以topology的名字、配置和topology的對象作為參數(shù)。
- 提交topology。
Listing Eight:建立和執(zhí)行topology。
public class StormMain { public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException { ParallelFileSpout parallelFileSpout = new ParallelFileSpout(); ThresholdBolt thresholdBolt = new ThresholdBolt(); DBWriterBolt dbWriterBolt = new DBWriterBolt(); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", parallelFileSpout, 1); builder.setBolt("thresholdBolt", thresholdBolt,1).shuffleGrouping("spout"); builder.setBolt("dbWriterBolt",dbWriterBolt,1).shuffleGrouping("thresholdBolt"); if(this.argsMain!=null && this.argsMain.length > 0) { conf.setNumWorkers(1); StormSubmitter.submitTopology( this.argsMain[0], conf, builder.createTopology()); } else { Config conf = new Config(); conf.setDebug(true); conf.setMaxTaskParallelism(3); LocalCluster cluster = new LocalCluster(); cluster.submitTopology( "Threshold_Test", conf, builder.createTopology()); } } }topology被建立后將被提交到本地集群。一旦topology被提交,除非被取締或者集群關(guān)閉,它將一直保持運行不需要做任何的修改。這也是Storm的另一大特色之一。
這個簡單的例子體現(xiàn)了當(dāng)你掌握了topology、spout和bolt的概念,將可以輕松的使用Storm進行實時處理。如果你既想處理大數(shù)據(jù)又不想遍歷Hadoop的話,不難發(fā)現(xiàn)使用Storm將是個很好的選擇。
原文鏈接:Easy, Real-Time Big Data Analysis Using Storm?(編譯/仲浩 王旭東/審校)
總結(jié)
以上是生活随笔為你收集整理的使用Storm实现实时大数据分析!的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 同名游戏改编电影《无主之地》明年 8 月
- 下一篇: 英特尔确认搁置云端 GPU 服务 End