使用Apache Storm和Apache Ignite进行复杂的事件处理(CEP)
在本文中, “使用Apache Ignite進行高性能內(nèi)存計算”一書的作者將討論使用Apache Strom和Apache Ignite進行復(fù)雜的事件處理。 本文的一部分摘自
書 。
術(shù)語“復(fù)雜事件處理”或CEP沒有廣泛或高度接受的定義。 Wikipedia的以下引用可以簡要描述什么是復(fù)雜事件處理:
“復(fù)雜事件處理(CEP)主要是一個事件處理概念,用于處理多個事件,目的是識別事件云中有意義的事件。 CEP采用的技術(shù)包括檢測許多事件的復(fù)雜模式,事件相關(guān)性和抽象性,事件層次結(jié)構(gòu)以及事件之間的因果關(guān)系,成員資格和時間關(guān)系以及事件驅(qū)動過程。
為簡單起見,復(fù)雜事件處理(CEP)是一種用于在真實世界中永不停止或流式傳輸事件數(shù)據(jù)的低延遲過濾,聚合和計算的技術(shù)。 在IT環(huán)境中,原始基礎(chǔ)結(jié)構(gòu)和業(yè)務(wù)事件的數(shù)量和速度都呈指數(shù)增長。 此外,移動設(shè)備的爆炸式增長和高速連接的普遍性加劇了移動數(shù)據(jù)的爆炸式增長。 同時,對業(yè)務(wù)流程敏捷性和執(zhí)行力的需求僅在增長。 這兩個趨勢給組織施加了壓力,要求它們提高其能力以支持事件驅(qū)動的實施架構(gòu)模式。 實時事件處理需要基礎(chǔ)架構(gòu)和應(yīng)用程序開發(fā)環(huán)境來執(zhí)行事件處理要求。 這些要求通常包括從日常使用案例擴展到極高的速度或各種數(shù)據(jù)和事件吞吐量的需求,潛在的延遲時間以微秒為單位,而不是響應(yīng)時間的秒數(shù)。
Apache Ignite允許在內(nèi)存中以可伸縮和容錯的方式處理連續(xù)不斷的數(shù)據(jù)流,而不是在數(shù)據(jù)到達數(shù)據(jù)庫后對其進行分析。 這不僅使您能夠關(guān)聯(lián)關(guān)系并從大量數(shù)據(jù)中檢測有意義的模式,還可以更快,更高效地完成此操作。 事件歷史記錄可以在內(nèi)存中保留任何時間長度(對于長時間運行的事件序列至關(guān)重要),也可以作為事務(wù)記錄在已存儲的數(shù)據(jù)庫中。
Apache Ignite CEP可以在眾多行業(yè)中使用,以下是一些一流的用例:
在其他一些工業(yè)或功能領(lǐng)域,您可以使用Apache Ignite處理流事件數(shù)據(jù),例如保險,運輸和公共部門。 復(fù)雜事件處理或CEP包含其過程的三個主要部分:
如上圖所示,數(shù)據(jù)是從不同來源獲取的。 源可以是任何傳感器(IoT),Web應(yīng)用程序或行業(yè)應(yīng)用程序。 可以直接在Ignite群集上以收集方式并發(fā)處理流數(shù)據(jù)。 此外,可以從其他來源豐富數(shù)據(jù)或過濾掉數(shù)據(jù)。 在計算數(shù)據(jù)之后,可以將計算或匯總的數(shù)據(jù)導(dǎo)出到其他系統(tǒng)以進行可視化或采取措施。
Apache Ignite Storm Streamer模塊提供了通過Storm到Ignite緩存的流傳輸。 在開始使用Ignite流媒體之前,讓我們看一下Apache Storm,以獲取有關(guān)Apache Storm的一些基礎(chǔ)知識。
Apache Storm是一個分布式容錯實時計算系統(tǒng)。 在短時間內(nèi),Apache Storm成為分布式實時處理系統(tǒng)的標(biāo)準,該系統(tǒng)使您可以處理大量數(shù)據(jù)。 Apache Storm項目是開源的,用Java和Clojure編寫。 它成為實時分析的首選。 Apache Ignite Storm流媒體模塊提供了一種方便的方法,可通過Storm將數(shù)據(jù)流傳輸?shù)絀gnite緩存。
關(guān)鍵概念:
Apache Storm從一端讀取??原始數(shù)據(jù)流,并將其通過一系列小型處理單元,然后在另一端輸出處理后的信息。 讓我們詳細了解Apache Storm的主要組件–
元組 –它是Storm的主要數(shù)據(jù)結(jié)構(gòu)。 這是元素的有序列表。 通常,元組支持所有基本數(shù)據(jù)類型。
流 –這是一個無約束且無序的元組序列。
嘴 -流的來源,簡單來說,壺嘴從拓撲中的源讀取數(shù)據(jù)。 壺嘴可以是可靠的或不可靠的。 噴口可以與隊列,Web日志,事件數(shù)據(jù)等對話。
螺栓 –螺栓是邏輯處理單元,它負責(zé)處理數(shù)據(jù)和創(chuàng)建新的流。 螺栓可以執(zhí)行過濾,聚合,聯(lián)接,與文件/數(shù)據(jù)庫交互等操作。 螺栓從噴嘴接收數(shù)據(jù),然后發(fā)射到一個或多個螺栓。
拓撲 -拓撲是噴口和螺栓的有向圖,該圖的每個節(jié)點都包含數(shù)據(jù)處理邏輯(螺栓),而連接邊定義數(shù)據(jù)(流)的流。
與Hadoop不同,Storm可使拓撲永久運行直到您將其殺死。 一個簡單的拓撲結(jié)構(gòu)從噴口開始,從源頭發(fā)射流到螺栓以處理數(shù)據(jù)。 Apache Storm的主要工作是運行拓撲,并將在給定的時間運行任意數(shù)量的拓撲。
開箱即用的Ignite提供了Storm Bolt(StormStreamer)的實現(xiàn),以將計算的數(shù)據(jù)流式傳輸?shù)絀gnite緩存中。 另一方面,您可以記下自定義的Strom Bolt,以將流數(shù)據(jù)提取到Ignite中。 要開發(fā)自定義的Storm Bolt,只需實現(xiàn)* BaseBasicBolt *或* IRichBolt * Storm接口。 但是,如果決定使用StormStreamer,則必須配置一些屬性才能正確運行Ignite Bolt。 所有必填屬性如下所示:
| 1個 | 快取名稱 | 將在其中存儲數(shù)據(jù)的Ignite緩存的緩存名稱。 |
| 2 | IgniteTupleField | 命名“點燃元組”字段,通過它在拓撲中獲取元組數(shù)據(jù)。 默認情況下,該值為ignite。 |
| 3 | IgniteConfigFile | 此屬性將設(shè)置Ignite彈簧配置 文件。 允許您向和發(fā)送消息和使用消息 從點燃主題。 |
| 4 | 允許覆蓋 | 它將啟用覆蓋緩存中的現(xiàn)有值,默認值為false。 |
| 5 | 自動刷新頻率 | 自動刷新頻率(以毫秒為單位)。 從本質(zhì)上講,這是拖纜將在 嘗試將到目前為止添加的所有數(shù)據(jù)提交到遠程 節(jié)點。 默認值為10秒。 |
掌握了基礎(chǔ)知識之后,我們來構(gòu)建一些有用的工具來檢查Ignite StormStreamer的工作方式。 該應(yīng)用程序的基本思想是設(shè)計噴嘴和螺栓的一種拓撲,該拓撲可以處理來自交通日志文件的大量數(shù)據(jù),并在特定值超過預(yù)定義閾值時觸發(fā)警報。 使用拓撲,可以逐行讀取日志文件,并且該拓撲旨在監(jiān)視傳入的數(shù)據(jù)。 在我們的例子中,日志文件將包含數(shù)據(jù),例如,車輛注冊號,速度和來自高速公路交通攝像頭的高速公路名稱。 如果車輛超過了速度限制(例如120km / h),Storm拓撲會將數(shù)據(jù)發(fā)送到Ignite緩存。
接下來的清單將顯示我們將在示例中使用的CSV文件類型,其中包含車輛數(shù)據(jù)信息,例如車輛注冊號,車輛行駛的速度和高速公路的位置。
AB 123, 160, North city BC 123, 170, South city CD 234, 40, South city DE 123, 40, East city EF 123, 190, South city GH 123, 150, West city XY 123, 110, North city GF 123, 100, South city PO 234, 140, South city XX 123, 110, East city YY 123, 120, South city ZQ 123, 100, West city 以上示例的思想取自Dobbs博士的期刊。 由于這本書不是為了研究Apache Storm,所以我將使示例盡可能簡單。 另外,我還添加了著名的Storm單詞計數(shù)示例,該示例通過StormStreamer模塊將單詞計數(shù)值提取到Ignite緩存中。 如果您對代碼感到好奇,請訪問以下網(wǎng)址
Chapter-cep / storm 。 上面的CSV文件將成為Storm拓撲的來源。
如上圖所示, FileSourceSpout接受輸入的CSV日志文件,逐行讀取數(shù)據(jù),并將數(shù)據(jù)發(fā)送到SpeedLimitBolt以進行進一步的閾值處理。 處理完成后,如果發(fā)現(xiàn)有任何汽車超過了速度限制,則數(shù)據(jù)將發(fā)送到Ignite StormStreamer螺栓,然后在此處將其提取到緩存中。 讓我們深入了解Storm拓撲。
步驟1:
因為這是一個Storm拓撲,所以必須在maven項目中添加Storm和Ignite StormStreamer依賴項。
<dependency><groupId>org.apache.ignite</groupId><artifactId>ignite-storm</artifactId><version>1.6.0</version> </dependency> <dependency><groupId>org.apache.ignite</groupId><artifactId>ignite-core</artifactId><version>1.6.0</version> </dependency> <dependency><groupId>org.apache.ignite</groupId><artifactId>ignite-spring</artifactId><version>1.6.0</version> </dependency> <dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><version>0.10.0</version><exclusions><exclusion><groupId>log4j</groupId><artifactId>log4j</artifactId></exclusion><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion><exclusion><groupId>commons-logging</groupId><artifactId>commons-logging</artifactId></exclusion><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId></exclusion><exclusion><groupId>org.slf4j</groupId><artifactId>log4j-over-slf4j</artifactId></exclusion><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions> </dependency>在撰寫本書時,僅支持Apache Storm版本0.10.0。 請注意,您不需要任何Kafka模塊即可運行或執(zhí)行Ignite文檔中所述的示例。
第2步:
創(chuàng)建的Ignite配置文件(見例如,ignite.xml文件/chapter-cep/storm/src/resources/example-ignite.xml ),并確保它是可以從類路徑。 Ignite配置的內(nèi)容與本章的上一部分相同。
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:util="http://www.springframework.org/schema/util"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/utilhttp://www.springframework.org/schema/util/spring-util.xsd"><bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration"><!-- Enable client mode. --><property name="clientMode" value="true"/><!-- Cache accessed from IgniteSink. --><property name="cacheConfiguration"><list><!-- Partitioned cache example configuration with configurations adjusted to server nodes'. --><bean class="org.apache.ignite.configuration.CacheConfiguration"><property name="atomicityMode" value="ATOMIC"/><property name="name" value="testCache"/></bean></list></property><!-- Enable cache events. --><property name="includeEventTypes"><list><!-- Cache events (only EVT_CACHE_OBJECT_PUT for tests). --><util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/></list></property><!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. --><property name="discoverySpi"><bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi"><property name="ipFinder"><bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder"><property name="addresses"><list><value>127.0.0.1:47500</value></list></property></bean></property></bean></property></bean> </beans>第三步:
創(chuàng)建一個ignite-storm.properties文件,以添加緩存名稱,元組名稱和Ignite配置的名稱,如下所示。
cache.name=testCache tuple.name=ignite ignite.spring.xml=example-ignite.xml第四步:
接下來,創(chuàng)建FileSourceSpout Java類,如下所示,
public class FileSourceSpout extends BaseRichSpout {private static final Logger LOGGER = LogManager.getLogger(FileSourceSpout.class);private SpoutOutputCollector outputCollector;@Overridepublic void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {this.outputCollector = spoutOutputCollector;} @Overridepublic void nextTuple() {try {Path filePath = Paths.get(this.getClass().getClassLoader().getResource("source.csv").toURI());try(Stream<String> lines = Files.lines(filePath)){lines.forEach(line ->{outputCollector.emit(new Values(line));});} catch(IOException e){LOGGER.error(e.getMessage());}} catch (URISyntaxException e) {LOGGER.error(e.getMessage());}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare(new Fields("trafficLog"));} }FileSourceSpout代碼具有三種重要方法
- open():此方法將在spout的開頭被調(diào)用,并為您提供上下文信息。
- nextTuple():此方法允許您一次將一個元組傳遞給Storm拓撲進行處理,在此方法中,我逐行讀取CSV文件,并將該行作為元組發(fā)出給螺栓。
- defineOutputFields():此方法聲明輸出元組的名稱,在本例中,名稱應(yīng)為trafficLog。
步驟5:
現(xiàn)在創(chuàng)建實現(xiàn)BaseBasicBolt接口的SpeedLimitBolt.java類。
public class SpeedLimitBolt extends BaseBasicBolt {private static final String IGNITE_FIELD = "ignite";private static final int SPEED_THRESHOLD = 120;private static final Logger LOGGER = LogManager.getLogger(SpeedLimitBolt.class);@Overridepublic void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {String line = (String)tuple.getValue(0);if(!line.isEmpty()){String[] elements = line.split(",");// we are interested in speed and the car registration numberint speed = Integer.valueOf((elements[1]).trim());String car = elements[0];if(speed > SPEED_THRESHOLD){TreeMap<String, Integer> carValue = new TreeMap<String, Integer>();carValue.put(car, speed);basicOutputCollector.emit(new Values(carValue));LOGGER.info("Speed violation found:"+ car + " speed:" + speed);}}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare(new Fields(IGNITE_FIELD));} }讓我們再次逐行進行。
- execute():這是實現(xiàn)螺栓的業(yè)務(wù)邏輯的方法,在這種情況下,我用逗號分隔行并檢查汽車的速度限制。 如果給定汽車的速度限制高于閾值,則我們將從該元組創(chuàng)建新的樹圖數(shù)據(jù)類型,并將該元組發(fā)送到下一個螺栓,在本例中,下一個螺栓將是StormStreamer。
- defineOutputFields():此方法類似于FileSourceSpout中的clarifyOutputFields()方法,它聲明將返回Ignite元組以進行進一步處理。
請注意,元組名稱IGNITE在這里很重要, StormStreamer將僅處理名稱為Ignite的元組。
步驟6:
現(xiàn)在是創(chuàng)建拓撲以運行示例的時候了。 拓撲將噴口和螺栓綁在一張圖中,該圖定義了數(shù)據(jù)如何在組件之間流動。 它還提供了Storm在集群中創(chuàng)建組件實例時使用的并行提示。 要實現(xiàn)拓撲,請在src \ main \ java \ com \ blu \ imdg \ storm \ topology目錄中創(chuàng)建一個名為SpeedViolationTopology.java的新文件。 使用以下內(nèi)容作為文件的內(nèi)容:
public class SpeedViolationTopology {private static final int STORM_EXECUTORS = 2;public static void main(String[] args) throws Exception {if (getProperties() == null || getProperties().isEmpty()) {System.out.println("Property file <ignite-storm.property> is not found or empty");return;}// Ignite Stream Iboltfinal StormStreamer<String, String> stormStreamer = new StormStreamer<>();stormStreamer.setAutoFlushFrequency(10L);stormStreamer.setAllowOverwrite(true);stormStreamer.setCacheName(getProperties().getProperty("cache.name"));stormStreamer.setIgniteTupleField(getProperties().getProperty("tuple.name"));stormStreamer.setIgniteConfigFile(getProperties().getProperty("ignite.spring.xml"));TopologyBuilder builder = new TopologyBuilder();builder.setSpout("spout", new FileSourceSpout(), 1);builder.setBolt("limit", new SpeedLimitBolt(), 1).fieldsGrouping("spout", new Fields("trafficLog"));// set ignite boltbuilder.setBolt("ignite-bolt", stormStreamer, STORM_EXECUTORS).shuffleGrouping("limit");Config conf = new Config();conf.setDebug(false);conf.setMaxTaskParallelism(1);LocalCluster cluster = new LocalCluster();cluster.submitTopology("speed-violation", conf, builder.createTopology());Thread.sleep(10000);cluster.shutdown();}private static Properties getProperties() {Properties properties = new Properties();InputStream ins = SpeedViolationTopology.class.getClassLoader().getResourceAsStream("ignite-storm.properties");try {properties.load(ins);} catch (IOException e) {e.printStackTrace();properties = null;}return properties;} }讓我們再次逐行進行。 首先,我們閱讀ignite-strom.properties文件以獲取所有必要的參數(shù),然后再配置StormStreamer螺栓。 風(fēng)暴拓撲基本上是一個Thrift結(jié)構(gòu)。 TopologyBuilder類提供了一種簡單而優(yōu)雅的方法來構(gòu)建復(fù)雜的Storm拓撲。 TopologyBuilder類具有setSpout和setBolt方法。 接下來,我們使用“拓撲”構(gòu)建器構(gòu)建Storm拓撲,并添加了帶有名稱spout和并行度提示為1執(zhí)行程序的spout。
我們還將SpeedLimitBolt定義為具有1個執(zhí)行程序的并行提示的拓撲。 接下來,我們使用shufflegrouping設(shè)置StormStreamer螺栓,該shufflegrouping訂閱該螺栓,并在StormStreamer螺栓的各個實例之間平均分配元組(限制)。
出于開發(fā)目的,我們使用LocalCluster實例創(chuàng)建本地集群,并使用SubmitTopology方法提交拓撲。 將拓撲提交到集群后,我們將等待10秒鐘,等待集群計算提交的拓撲,然后使用LocalCluster的 shutdown方法關(guān)閉集群。
步驟7:
接下來,首先運行Apache Ignite或集群的本地節(jié)點。 構(gòu)建maven項目后,使用以下命令在本地運行拓撲。
mvn compile exec:java -Dstorm.topology=com.blu.imdg.storm.topology.SpeedViolationTopology該應(yīng)用程序?qū)a(chǎn)生很多系統(tǒng)日志,如下所示。
現(xiàn)在,如果我們通過ignitevisior驗證了Ignite緩存,我們應(yīng)該將以下輸出輸入到控制臺中。
輸出顯示結(jié)果,這是我們期望的。 從我們的source.csv日志文件中,只有五輛車超過了120 km / h的速度限制。
這幾乎是對Ignite Storm Streamer的實用概述的總結(jié)。 如果您對Ignite Camel或Ignite Flume Streamer感到好奇,請參閱“使用Apache Ignite進行高性能內(nèi)存計算”一書 。 您也可以與作者聯(lián)系以獲取該書的免費副本,該書可以免費分發(fā)給學(xué)生和教師。
翻譯自: https://www.javacodegeeks.com/2016/10/complex-event-processing-cep-apache-storm-apache-ignite.html
總結(jié)
以上是生活随笔為你收集整理的使用Apache Storm和Apache Ignite进行复杂的事件处理(CEP)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 电脑开机密码忘记了如何破解如何破译电脑开
- 下一篇: 怎么查自家wifi有多少人在用如何查询家