AKKA:大数据下的并发编程模型
在大數據如日中天的當今,開發中只會調用 API 是遠遠不夠的,火熱的 Spark、Flink 被越來越多的人掌握,這就驅使技術人員向技術中更深層次的知識去挖掘,今天我們就一起聊聊分布式計算和通信實現技術 AKKA,到底依靠哪些優勢被 Spark 和 Flink 所使用。
在本場 Chat 中,一萬多字中會講到如下內容:
- Akka 介紹、Actor 模型入門
- Actor 工作機制、消息傳遞、應用實例
- Akka 網絡編程:理論講解
- Akka 網絡編程:手敲代碼
- Spark 使用 Akka 實現進程通訊
適合人群: 對 Akka 有興趣及深入大數據技術的技術人員
1 Akka 介紹
Akka 是 JAVA 虛擬機 JVM 平臺上構建高并發、分布式和容錯應用的工具包和運行時,你可以理解成 Akka 是編寫并發程序的框架。
Akka 用 Scala 語言寫成,同時提供了 Scala 和 JAVA 的開發接口。
Akka 主要解決的問題是:可以輕松的寫出高效穩定的并發程序,程序員不再過多的考慮線程、鎖和資源競爭等細節。
2 Akka 中 Actor(角色) 模型
處理并發問題關鍵是要保證共享數據的一致性和正確性,因為程序是多線程時,多個線程對同一個數據進行修改,若不加同步條件,勢必會造成數據污染。但是當我們對關鍵代碼加入同步條件 synchronized 后,實際上大并發就會阻塞在這段代碼,對程序效率有很大影響。
若是用單線程處理,不會有數據一致性的問題,但是系統的性能又不能保證。
Actor 模型的出現解決了這個問題,簡化并發編程,提升程序性能。 你可以這里理解:Actor 模型是一種處理并發問題的解決方案,很牛!
3 Akka 中 Actor 模型
對上面的 Actor 模型做了說明和小結
Akka 處理并發的方法基于 Actor 模型(示意圖)
在基于 Actor 的系統里,所有的事物都是 Actor,就好像在面向對象設計里面所有的事物都是對象一樣。
Actor 模型是作為一個并發模型設計和架構的。Actor 與 Actor 之間只能通過消息通信[消息的發送必須通過 ActorRef 發送],如圖的信封
Actor 與 Actor 之間只能用消息進行通信,當一個 Actor 給另外一個 Actor 發消息,消息是有順序的(消息隊列),只需要將消息投寄的相應的郵箱即可
怎么處理消息是由接收消息的 Actor 決定的,發送消息 Actor 可以等待回復,也可以異步處理[ajax]
ActorSystem 的職責是負責創建并管理其創建的 Actor, ActorSystem 是單例的[工廠模式],一個 JVM 進程中有一個即可,而 Actor 是可以有多個的
Actor 模型是對并發模型進行了更高的抽象
Actor 模型是異步、非阻塞、高性能的事件驅動編程模型 [案例:說明什么是異步、非阻塞,最經典的案例就是 ajax 異步請求處理 ]
Actor 模型是輕量級事件處理(1GB 內存可容納百萬級別個 Actor),因此處理大并發性能高
4 Actor 模型工作機制說明
4.1 示意圖
4.2 Actor 模型工作機制說明(對照工作機制示意圖理解)
ActorySystem 創建 Actor
ActorRef:可以理解成是 Actor 的代理或者引用。消息是通過 ActorRef 來發送,而不能通過 Actor 發送消息,通過哪個 ActorRef 發消息,就表示把該消息發給哪個 Actor
消息發送到 Dispatcher Message (消息分發器),它得到消息后,會將消息進行分發到對應的 MailBox。(注: Dispatcher Message 可以理解成是一個線程池, MailBox 可以理解成是消息隊列,可以緩沖多個消息,遵守 FIFO)
Actor 可以通過 receive 方法來獲取消息,然后進行處理。
4.3 Actor 間傳遞消息機制(對照工作機制示意圖理解)
每一個消息就是一個 Message 對象。Message 繼承了 Runable, 因為 Message 就是線程類。
從 Actor 模型工作機制看上去很麻煩,但是程序員編程時只需要編寫 Actor 就可以了,其它的交給 Actor 模型完成即可。
A Actor 要給 B Actor 發送消息,那么 A Actor 要先拿到(也稱為持有) B Actor 的 代理對象 ActorRef 才能發送消息
5 Actor 模型快速入門
5.1 應用實例需求
編寫一個 Actor,比如 SayHelloActor
SayHelloActor 可以給自己發送消息,如圖
要求使用 Maven 的方式來構建項目,這樣可以很好的解決項目開發包的依賴關系。(AKKA 版本需要和 Scala 版本對應,使用 Maven 可以解決問題)
代碼實現和說明
5.2 對上面的代碼進行小結和說明
6 Actor 模型應用實例-- Actor 間通訊
6.1 應用實例需求
編寫 2 個 Actor ,分別是 AActor 和 BActor
AActor 和 BActor 之間可以相互發送消息,
加強對 Actor 傳遞消息機制的理解
6.2 兩個 Actor 的通訊機制原理圖和思路分析
6.3 代碼實現
//AActor.scalapackage com.test.akka.actorsimport akka.actor.{Actor, ActorRef}//AActor 先出招class AActor(iBActorRef:ActorRef) extends Actor{ val bActorRef = iBActorRef var count = 0 override def receive: Receive = { case "start" => { println("AActor 啟動") println("stark ok") println("我打") //發給 BActor bActorRef ! "我打" } case "我打" => { count += 1 println(s"AActor(黃飛鴻) 挺猛 看我佛山無影腳 第${count}腳") Thread.sleep(1000) bActorRef ! "我打" } }} //BActor.scalapackage com.test.akka.actorsimport akka.actor.Actorclass BActor extends Actor{ var count = 0 override def receive:Receive = { case "我打" => { count += 1 println(s"BActor(喬峰) 厲害 看我降龍十八掌 第${count}掌") Thread.sleep(1000) sender() ! "我打" } }} //ActorGame.scalapackage com.test.akka.actorsimport akka.actor.{ActorRef, ActorSystem, Props}object ActorGame extends App{ //1. ActorSystme val actorfactory = ActorSystem("actorfactory") val bActorRef: ActorRef = actorfactory.actorOf(Props[BActor],"BActor") val aActorRef: ActorRef = actorfactory.actorOf(Props(new AActor(bActorRef)), "AActor") //做一個要求:當 100 招,就退出.. aActorRef ! "start"}8 Akka 網絡編程基本介紹
Akka 支持面向大并發后端服務程序,網絡通信這塊是服務端程序重要的一部分。
8.1 網絡編程有兩種:
TCP socket 編程,是網絡編程的主流。之所以叫 Tcp socket 編程,是因為底層是基于 tcp/ip 協議 的. 比如: QQ 聊天
b/s 結構的 http 編程,我們使用瀏覽器去訪問服務器時,使用的就是 http 協議,而 http 底層依舊是用 tcp socket 實現的。 比如: 京東商城 【屬于 web 編程范疇,核心的協議是 http,底層是 tcp/ip 協議 (協議簇)】
8.2 OSI 與 tcp/ip 參考模型 (推薦 tcp/ip 協議 3 卷)
推薦一部書,《tcp/ip 協議》和《Unix 高級編程》
8.3 ip 地址
概述:每個 internet 上的主機和路由器都有一個 ip 地址,它包括網絡號和主機號,ip 地址有 ipv4(32 位) 或者 ipv6(128 位),可以通過 ipconfig 來查看。
8.4 端口 (port)--介紹
我們這里所指的端口不是指物理意義上的端口,而是特指 TCP/IP 協議中的端口,是邏輯意義上的端口。如果把 IP 地址比作一間房子,端口就是出入這間房子的門。真正的房子只有幾個門,但是一個 IP 地址的端口 可以有 65535(即:256×256-1)個之多!端口是通過端口號來標記的。(端口號 0:保留 Reserved)
8.5 端口(port)--分類
0 號是保留端口
1-1024 是固定端口 [有名端口/ 名花有主 ],即被某些程序固定使用,一般程序員不使用。
22: SSH 遠程登錄協議 23: telnet 使用 21: ftp 使用 25: smtp 服務使用 80: iis 使用 7: echo 服務
1025-65535 是動態端口 [純凈版,關閉不需要端口,sshd [改一個] ]
這些端口,程序員可以使用 netstat -anb
8.6 端口(port)-使用注意
在計算機(尤其是做服務器)要盡可能的少開端口
一個端口只能被一個程序監聽(80 但是一個端口可以連接多個客戶端)
如果使用 netstat –an 可以查看本機有哪些端口在監聽
可以使用 netstat –anb 來查看監聽端口的 pid,在結合任務管理器關閉不安全的端口
8.7 網絡拓撲
以下我們將 tcp socket 編程,簡稱 socket 編程。
下圖為 socket 編程中客戶端和服務器的網絡分布
9 Akka 網絡編程--小黃雞客服
9.1 需求分析
服務端進行監聽(9999)
客戶端可以通過鍵盤輸入,發送咨詢問題給小黃雞客服(服務端)
小黃雞(服務端) 回答客戶的問題
9.2 界面設計
9.3 程序的框架圖
9.4 代碼實現
MessageProtocol.scala
package com.test.akka.yellowchickenserver.common1. mes 會稱為 樣例類的只讀屬性//ClientMessage 客戶端發送給服務器的協議數據(對象)case class ClientMessage(mes:String)//ServerMessage 服務端會送給客戶端的協議數據(對象)case class ServerMessage(mes:String)YellowChickenServer.scala
package com.test.akka.yellowchickenserver.serverimport akka.actor.{Actor, ActorRef, ActorSystem, Props}import com.test.akka.yellowchickenserver.common.{ClientMessage, ServerMessage}import com.typesafe.config.ConfigFactoryclass YellowChickenServer extends Actor{ override def receive:Receive = { case "start" => { println("小黃 開始監聽程序,可以咨詢問題~~") } case ClientMessage(mes) => { //怎么匹配他的內容 println("客戶咨詢的問題是" + mes) mes match { case "大數據學費" => { sender() ! ServerMessage("20000RMB") } case "地址" => { sender() ! ServerMessage("北京昌平 XX 樓 111 號") } case "課程" => { sender() ! ServerMessage("JavaEE Python 前端 大數據") } case _ => { sender() ! ServerMessage("你說的啥子~~") } } } }}object YellowChickenServer extends App{ //創建 ActorSystem //因為這時,我們需要監聽網絡,所以使用如下方法創建工廠 //Config 就是我們的網絡配置 ip , port.. //def apply(name: String, config: Config): ActorSystem = apply(name, Option(config), None, None) val host = "127.0.0.1" //ip4 val port = 9999 //Config 就是我們的網絡配置 ip , port.. // val config = ConfigFactory.parseString( s""" |akka.actor.provider="akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname=$host |akka.remote.netty.tcp.port=$port """.stripMargin) val serverActorSystem = ActorSystem("Server",config) val yellowChickenServerRef: ActorRef = serverActorSystem.actorOf(Props[YellowChickenServer],"YellowChickenServer") //akka.tcp://Server@127.0.0.1:9999 就是 Actor 路徑 yellowChickenServerRef ! "start"}CustomerActor.scala
package com.test.akka.yellowchickenserver.clientimport akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}import com.test.akka.yellowchickenserver.common.{ClientMessage, ServerMessage}import com.typesafe.config.ConfigFactoryimport scala.io.StdInclass CustomerActor extends Actor { //我們這里需要持有 Server 的 Ref var yellowChickenServerRef: ActorSelection = _ //preStart , 在啟動 Actor 之前會先運行,因此變量,初始化寫入 preStart override def preStart(): Unit = { //println("preStart") //說明 //1. 在 AKKA 的 Actor 模型中, 認為 每個 Actor 都是一個資源(角色),通過一個 Path 來定位一個 actor //2. path 的組成 akka.tcp://Server 的 actorfactory 名字@ServerIp:Server 的 port/user/ServerActor 名字 yellowChickenServerRef = context.actorSelection("akka.tcp://Server@127.0.0.1:9999/user/YellowChickenServer") } override def receive: Receive = { case "start" => { println("客戶端啟動,可以咨詢問題~~") } case mes: String => { //將 mes 發送給 Server yellowChickenServerRef ! ClientMessage(mes) } case ServerMessage(mes) => { println("收到小黃雞客服回復的消息: " + mes) } }}object CustomerActor extends App { //編寫必要的配置信息 val serverHost = "127.0.0.1" val serverPort = 9999 val clientHost = "127.0.0.1" val clientPort = 10000 val config = ConfigFactory.parseString( s""" |akka.actor.provider="akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname=$clientHost |akka.remote.netty.tcp.port=$clientPort """.stripMargin) //創建 CustomerActor val clientActorSystem = ActorSystem("Client", config) val customerActorRef: ActorRef = clientActorSystem.actorOf(Props[CustomerActor], "CustomerActor") customerActorRef ! "start" println("可以咨詢問題了") while (true) { val mes = StdIn.readLine() customerActorRef ! mes //先發給自己,然后讓 CustomerActor 發 }}10 Spark Master Worker 進程通訊項目
10.1 項目意義
深入理解 Spark 的 Master 和 Worker 的通訊機制
為了方便同學們看 Spark 的底層源碼,命名的方式和源碼保持一致。(如:通訊消息類命名就是一樣的)
加深對主從服務心跳檢測機制(HeartBeat)的理解,方便以后 Spark 源碼二次開發。
10.2 項目需求分析
worker 注冊到 Master,Master 完成注冊,并回復 worker 注冊成功(注冊功能)
worker 定時發送心跳(3),并在 Master 接收到
Master 接收到 worker 心跳后,要更新該 worker 的最近一次發送心跳的時間
給 Master 啟動定時任務,定時檢測注冊的 worker 有哪些沒有更新心跳,并將其從 hashmap 中刪除
master worker 進行分布式部署(Linux 系統)->如何給 maven 項目打包->上傳 linux 并運行
10.3 實現功能 1-Worker 完成注冊
功能說明
worker 注冊到 Master,Master 完成注冊,并回復 worker 注冊成功
思路分析
代碼實現
10.4 實現功能 2-Worker 定時發送心跳
功能說明
worker 定時發送心跳給 Master,Master 能夠接收到,并更新 worker 上一次心跳時間
代碼實現
10.5 實現功能 3-Master 啟動定時任務,定時檢測注冊的 worker
功能說明
功能要求:Master 啟動定時任務(10秒),定時檢測注冊的 worker 有哪些沒有更新心跳,已經超時的 worker(6 秒),將其從 hashmap 中刪除掉
思路分析
代碼實現
10.6 實現功能 4-Master,Worker 的啟動參數運行時指定
功能說明
功能要求:Master,Worker 的啟動參數運行時指定,而不是固定寫在程序中的
代碼實現
10.7 對開發的 SparkMaster 和 SparkWorker 打包.jar , 部署到不同的 Linux 服務器,并運行
我這里直接在 windows 演示,同學們可以上傳到自己的 3 臺 linux 并并行
打包的步驟
修改 pom.xml 指定主類
com.test.akka.sparkmasterworker.master.SparkMaster
出 maven 的打包的界面
找到 lifecycle,雙擊 package 即可
打包后,到 target 去找 jar 即可
給 SparkWorker 打包的流程和前面完全一樣,但是需要先 clean
測試和指令java -jar SparkWorker.java 127.0.0.1 10000 SparkMaster01 127.0.0.1 10001 SparkWorker01
閱讀全文: http://gitbook.cn/gitchat/activity/5de88c119a74cc327f167a48
您還可以下載 CSDN 旗下精品原創內容社區 GitChat App , GitChat 專享技術內容哦。
總結
以上是生活随笔為你收集整理的AKKA:大数据下的并发编程模型的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 单元测试中测试用例的设计方法
- 下一篇: 数据分析实战项目练习——餐厅订单数据