Scala消息通信之akka,akka案例
#Scala編程實戰
##一、 課程目標
###1. 目標:熟練使用Scala編寫程序
##二、 項目概述
###1. 需求
目前大多數的分布式架構底層通信都是通過RPC實現的,RPC框架非常多,比如前我們學過的Hadoop項目的RPC通信框架,但是Hadoop在設計之初就是為了運行長達數小時的批量而設計的,在某些極端的情況下,任務提交的延遲很高,所有Hadoop的RPC顯得有些笨重。
Spark 的RPC是通過Akka類庫實現的,Akka用Scala語言開發,基于Actor并發模型實現,Akka具有高可靠、高性能、可擴展等特點,使用Akka可以輕松實現分布式RPC功能。
###2. Akka簡介
Akka基于Actor模型,提供了一個用于構建可擴展的(Scalable)、彈性的(Resilient)、快速響應的(Responsive)應用程序的平臺。
**Actor模型:**在計算機科學領域,Actor模型是一個并行計算(Concurrent Computation)模型,它把actor作為并行計算的基本元素來對待:為響應一個接收到的消息,一個actor能夠自己做出一些決策,如創建更多的actor,或發送更多的消息,或者確定如何去響應接收到的下一個消息。
Actor是Akka中最核心的概念,它是一個封裝了狀態和行為的對象,Actor之間可以通過交換消息的方式進行通信,每個Actor都有自己的收件箱(Mailbox)。通過Actor能夠簡化鎖及線程管理,可以非常容易地開發出正確地并發程序和并行系統,Actor具有如下特性:
**1.**提供了一種高級抽象,能夠簡化在并發(Concurrency)/并行(Parallelism)應用場景下的編程開發
**2.**提供了異步非阻塞的、高性能的事件驅動編程模型
**3.**超級輕量級事件處理(每GB堆內存幾百萬Actor)
##案例介紹:
知識點說明:
1.Akka可以實現不同進程之間的通信
2.老大叫ActorSystem,用于創建和監控Acotr
3.真正用于通信的是Acotr
4.一個進程里面可以有多個Acotor
5.可以有多個進程
6.如果不同ActorSystem下面的Acotor要進行通信,首先AcotorSystem之間要建立連接
在akka的程序中,需要服務端和客戶端。
創建一個maven項目,其中maven項目的pom.xml內容如下:
其中服務端的代碼如下:
package cn.toto.akkaimport akka.actor.{Actor, ActorSystem, Props} import com.typesafe.config.ConfigFactory/*** Created by toto on 2017/7/2.*/ class Master extends Actor {override def receive: Receive = {//里面要有一個偏函數case "start" => {println("starting...")println("started....")}case "stop" => {println("stoping...")println("stopted...");}//master接收到worker的消息case "connect" => {println("a client connected...")//這里表示向客戶端發送一個消息sender ! "success"}case _ => println("123")} }object Master {def main(args: Array[String]): Unit = {//通過s,可以將變量通過$取到val host = "127.0.0.1"val port = "8888"val confStr =s"""|akka.actor.provider = "akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname = "$host"|akka.remote.netty.tcp.port = "$port"""".stripMargin//讀取一些默認的配置文件val conf = ConfigFactory.parseString(confStr)//ActorSystem單例的,用于創建Actor并監控actorval actorSystem = ActorSystem("MasterActorSystem",conf)//通過ActorSystem創建Actor,可以通過Ctrl + P的方式看到這個方法里面可以有哪些參數//通過Master類型,反射創建實例var master = actorSystem.actorOf(Props[Master],"Master")//接著可以發消息了。后續這里可以不是發字符串,可以發case classmaster ! "start"master ! "hello"master ! "stop"actorSystem.awaitTermination()} }另外,worker端的代碼如下:
package cn.toto.akkaimport akka.actor.{Actor, ActorSystem, Props} import com.typesafe.config.ConfigFactory/*** Created by toto on 2017/7/2.*/ class Worker extends Actor{//actor里面有聲明周期方法//preStart在構造器之后receive之前執行override def preStart(): Unit = {//首先跟Master建立連接,其中"akka.tcp://MasterActorSystem@127.0.0.1:8888"可以在Master運行之后的控制臺中找到//下面的地址:"akka.tcp://MasterActorSystem@127.0.0.1:8888"表示要先連接actorSystem,接著要和它下面的/user/Master建立通信//這樣就相當于拿到了master的代理對象val master = context.actorSelection("akka.tcp://MasterActorSystem@127.0.0.1:8888/user/Master")//通過mater的引用向Master發送消息//向master發送connect消息master ! "connect"}override def receive: Receive = {case "success" => {println("a msg form master:success")}} }object Worker {def main(args: Array[String]): Unit = {//通過s,可以將變量通過$取到//val host = args(0)//val port = args(1).toIntval host = "127.0.0.1"val port = "9999"val confStr =s"""|akka.actor.provider = "akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname = "$host"|akka.remote.netty.tcp.port = "$port"""".stripMarginval conf = ConfigFactory.parseString(confStr)//單例的ActorSystemval actorSystem = ActorSystem("WorkerActorSystem",conf)//通過actorSystem來創建actorval worker = actorSystem.actorOf(Props[Worker],"Worker")//這里是等待優雅退出actorSystem.awaitTermination()} }運行過程:
1、先啟動Master(右鍵run),運行后的效果如下:
2、接著運行worker(右鍵run),運行后的效果如下:
總結
以上是生活随笔為你收集整理的Scala消息通信之akka,akka案例的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 君越2022款配几个气囊?
- 下一篇: 江淮和悦RS压缩机怎么拿出来?