Akka并发编程——第六节:Actor模型(五)
本章主要內(nèi)容:
1. !消息發(fā)送,Fire-and-Forget消息模型
2. ?消息發(fā)送,Send-And-Receive-Future消息模型
Akka提供了兩種消息模型:fire-and-forget和Send-And-Receive-Future。fire-and-forget是一種單向消息發(fā)送模型,指的是異步發(fā)送消息,通過異步發(fā)送消息且消息發(fā)送后可以立即返回,Akka中使用?方法進(jìn)行fire-and-forget消息發(fā)送,如stringActor!”Creating Actors with implicit val context”,它的意思是當(dāng)前發(fā)送方Aactor向stringActor發(fā)送字符串消息”Creating Actors with implicit val context”,發(fā)送完該消息后立即返回,而無需等待stringActor的返回,!還有個(gè)重載的方法tell;Send-And-Receive-Future指的是異步發(fā)送消息則是一種雙向消息發(fā)送模型,向目標(biāo)Actor發(fā)送完消息后,然后返回一個(gè)Future作為后期可能的返回,當(dāng)前發(fā)送方Actor將等待目標(biāo)Actor的返回,Akka中使用?方法進(jìn)行Send-And-Receive-Future消息的發(fā)送,它也同樣有一個(gè)重載的方法ask
1. !消息發(fā)送,Fire-and-Forget消息模型
/*** 消息處理:!(Fire-Forget)*/ object Example12 extends App{import akka.actor.Actorimport akka.actor.Propsimport akka.event.Loggingimport akka.actor.ActorSystem//定義幾種不同的消息case class Start(var msg:String)case class Run(var msg:String)case class Stop(var msg:String)class ExampleActor extends Actor {val other = context.actorOf(Props[OtherActor], "OtherActor")val log = Logging(context.system, this)def receive={//使用fire-and-forget消息模型向OtherActor發(fā)送消息,隱式地傳遞sendercase Start(msg) => other ! msg//使用fire-and-forget消息模型向OtherActor發(fā)送消息,直接調(diào)用tell方法,顯式指定sendercase Run(msg) => other.tell(msg, sender)}}class OtherActor extends Actor{val log = Logging(context.system, this)def receive ={case s:String=>log.info("received message:\n"+s)case _ ? log.info("received unknown message")}}//創(chuàng)建ActorSystem,ActorSystem為創(chuàng)建和查找Actor的入口//ActorSystem管理的Actor共享配置信息如分發(fā)器(dispatchers)、部署(deployments)等val system = ActorSystem("MessageProcessingSystem")//創(chuàng)建ContextActorval exampleActor = system.actorOf(Props[ExampleActor],name="ExampleActor")//使用fire-and-forget消息模型向exampleActor發(fā)送消息exampleActor!Run("Running")exampleActor!Start("Starting")//關(guān)閉ActorSystemsystem.shutdown() }代碼運(yùn)行結(jié)果如下:
[INFO] [03/20/2016 20:57:43.665] [MessageProcessingSystem-akka.actor.default-dispatcher-5] [akka://MessageProcessingSystem/user/ExampleActor/OtherActor] received message: Running [INFO] [03/20/2016 20:57:43.672] [MessageProcessingSystem-akka.actor.default-dispatcher-5] [akka://MessageProcessingSystem/user/ExampleActor/OtherActor] received message: Starting在ExampleActor中,通過隱式變量context創(chuàng)建了OtherActor實(shí)例:val other = context.actorOf(Props[OtherActor], “OtherActor”),在ExampleActor的receive方法中,處理兩種不同類型的消息例如:
//使用fire-and-forget消息模型向OtherActor發(fā)送消息,隱式地傳遞sendercase Start(msg) => other ! msg//使用fire-and-forget消息模型向OtherActor發(fā)送消息,直接調(diào)用tell方法,顯式指定sendercase Run(msg) => other.tell(msg, sender)- 1
處理Start類型的消息時(shí),直接使用!進(jìn)行消息發(fā)送,而處理Run類型的消息時(shí),使用的是tell方法,可以看到使用tell方法需要顯式地指定其sender,而使用!進(jìn)行消息發(fā)送則不需要,事實(shí)上!方法通過隱式值傳入需要的Sender,對比!與tell方法的定義便很容易理解
//!方法的定義 def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit //tell方法的定義 final def tell(msg: Any, sender: ActorRef): Unit = this.!(msg)(sender)可以看到,tell方法的實(shí)現(xiàn)依賴于!方法。如果在一個(gè)Actor當(dāng)中使用!方法時(shí),例如ExampleActor中使用的other ! msg向OtherActor發(fā)送消息,則sender隱式為ExampleActor,如果不是在Actor中使用則默認(rèn)為Actor.noSender,即sender為null。
2. ?消息發(fā)送,Send-And-Receive-Future消息模型
理解了Fire-And-Forget消息模型后,接著對Send-And-Receive-Future消息模型進(jìn)行介紹,下面的代碼給出了其使用示例。
/*** 消息處理:?(Send-And-Receive-Future)*/ object Example13 extends App{import akka.actor.Actorimport akka.actor.Propsimport akka.event.Loggingimport akka.actor.ActorSystemimport scala.concurrent.Futureimport akka.pattern.askimport akka.util.Timeoutimport scala.concurrent.duration._import akka.pattern.pipeimport scala.concurrent.ExecutionContext.Implicits.global//消息:個(gè)人基礎(chǔ)信息case class BasicInfo(id:Int,val name:String, age:Int)//消息:個(gè)人興趣信息case class InterestInfo(id:Int,val interest:String)//消息: 完整個(gè)人信息case class Person(basicInfo: BasicInfo,interestInfo: InterestInfo)//基礎(chǔ)信息對應(yīng)Actorclass BasicInfoActor extends Actor{val log = Logging(context.system, this)def receive = {//處理送而來的用戶ID,然后將結(jié)果發(fā)送給sender(本例中對應(yīng)CombineActor)case id:Int ?log.info("id="+id);sender!new BasicInfo(id,"John",19)case _ ? log.info("received unknown message")}}//興趣愛好對應(yīng)Actorclass InterestInfoActor extends Actor{val log = Logging(context.system, this)def receive = {//處理發(fā)送而來的用戶ID,然后將結(jié)果發(fā)送給sender(本例中對應(yīng)CombineActor)case id:Int ?log.info("id="+id);sender!new InterestInfo(id,"足球")case _ ? log.info("received unknown message")}}//Person完整信息對應(yīng)Actorclass PersonActor extends Actor{val log = Logging(context.system, this)def receive = {case person: Person =>log.info("Person="+person)case _ ? log.info("received unknown message")}}class CombineActor extends Actor{implicit val timeout = Timeout(5 seconds)val basicInfoActor = context.actorOf(Props[BasicInfoActor],name="BasicInfoActor")val interestInfoActor = context.actorOf(Props[InterestInfoActor],name="InterestInfoActor")val personActor = context.actorOf(Props[PersonActor],name="PersonActor")def receive = {case id: Int =>val combineResult: Future[Person] =for {//向basicInfoActor發(fā)送Send-And-Receive-Future消息,mapTo方法將返回結(jié)果映射為BasicInfo類型basicInfo <- ask(basicInfoActor, id).mapTo[BasicInfo]//向interestInfoActor發(fā)送Send-And-Receive-Future消息,mapTo方法將返回結(jié)果映射為InterestInfo類型interestInfo <- ask(interestInfoActor, id).mapTo[InterestInfo]} yield Person(basicInfo, interestInfo)//將Future結(jié)果發(fā)送給PersonActorpipe(combineResult).to(personActor)}}val _system = ActorSystem("Send-And-Receive-Future")val combineActor = _system.actorOf(Props[CombineActor],name="CombineActor")combineActor ! 12345Thread.sleep(5000)_system.shutdown}代碼運(yùn)行結(jié)果如下:
[INFO] [03/20/2016 22:55:11.208] [Send-And-Receive-Future-akka.actor.default-dispatcher-3] [akka://Send-And-Receive-Future/user/CombineActor/BasicInfoActor] id=12345 [INFO] [03/20/2016 22:55:11.220] [Send-And-Receive-Future-akka.actor.default-dispatcher-2] [akka://Send-And-Receive-Future/user/CombineActor/InterestInfoActor] id=12345 [INFO] [03/20/2016 22:55:11.223] [Send-And-Receive-Future-akka.actor.default-dispatcher-4] [akka://Send-And-Receive-Future/user/CombineActor/PersonActor] Person=Person(BasicInfo(12345,John,19),InterestInfo(12345,足球))- 1
代碼中定義了3種類型的消息,分別是個(gè)人基礎(chǔ)信息case class BasicInfo(id:Int,val name:String, age:Int)、個(gè)人興趣信息case class InterestInfo(id:Int,val interest:String)以及完整個(gè)人信息case class Person(basicInfo: BasicInfo,interestInfo: InterestInfo),然后為這3種類型的消息定義了相應(yīng)的Actor即BasicInfoActor、InterestInfoActor和PersonActor,在CombineActor分別創(chuàng)建相應(yīng)Actor的實(shí)例,receive方法中使用ask向BasicInfoActor、InterestInfoActor發(fā)送Send-And-Receive-Future模型消息,BasicInfoActor、InterestInfoActor中的receive方法接收到發(fā)送來的Int類型消息并分別使用!向CombineActor發(fā)送BasicInfo、InterestInfo消息,將結(jié)果保存在Future[Person]中,然后通過代碼pipe(combineResult).to(personActor)將結(jié)果發(fā)送給PersonActor。
總結(jié)
以上是生活随笔為你收集整理的Akka并发编程——第六节:Actor模型(五)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Akka并发编程——第五节:Actor模
- 下一篇: Akka并发编程——第七节:Actor模