Akka(9): 分布式运算:Remoting-远程构建式
? ?上篇我們討論了Akka-Remoting。我們說Akka-Remoting是一種點(diǎn)對(duì)點(diǎn)的通訊方式,能使兩個(gè)不同JVM上Akka-ActorSystem上的兩個(gè)Actor之間可以相互溝通。Akka-Remoting還沒有實(shí)現(xiàn)完全的Actor位置透明(location transparency),因?yàn)橐粋€(gè)Actor還必須在獲得對(duì)方Actor確切地址信息后才能啟動(dòng)與之溝通過程。Akka-Remoting支持“遠(yuǎn)程查找”和“遠(yuǎn)程構(gòu)建”兩種溝通方式。由于篇幅所限,我們只介紹了“遠(yuǎn)程查找”。在這一篇里我們將會(huì)討論“遠(yuǎn)程構(gòu)建”方式。
同樣,我們先通過項(xiàng)目結(jié)構(gòu)來分析:
lazy val local = (project in file(".")).settings(commonSettings).settings(name := "remoteCreateDemo").aggregate(calculator,remote).dependsOn(calculator)lazy val calculator = (project in file("calculator")).settings(commonSettings).settings(name := "calculator")lazy val remote = (project in file("remote")).settings(commonSettings).settings(name := "remoteSystem").aggregate(calculator).dependsOn(calculator)遠(yuǎn)程構(gòu)建的過程大致是這樣的:由local通知remote啟動(dòng)構(gòu)建Actor;remote從本地庫(kù)中查找Actor的類定義(class)并把它載入內(nèi)存。由于驅(qū)動(dòng)、使用遠(yuǎn)程Actor是在local進(jìn)行的,所以local,remote項(xiàng)目還必須共享Calculator,包括Calculator的功能消息。這項(xiàng)要求我們?cè)?sbt中用aggregate(calculator)來協(xié)同編譯。
我們把Calculator的監(jiān)管supervisor也包括在這個(gè)源碼文件里。現(xiàn)在這個(gè)calculator是個(gè)包括監(jiān)管、功能、消息的完整項(xiàng)目了。Calculator源代碼如下:
package remoteCreation.calculatorimport akka.actor._ import scala.concurrent.duration._object Calcultor {sealed trait MathOpscase class Num(dnum: Double) extends MathOpscase class Add(dnum: Double) extends MathOpscase class Sub(dnum: Double) extends MathOpscase class Mul(dnum: Double) extends MathOpscase class Div(dnum: Double) extends MathOpssealed trait CalcOpscase object Clear extends CalcOpscase object GetResult extends CalcOpsdef props = Props(new Calcultor)def supervisorProps = Props(new SupervisorActor) }class Calcultor extends Actor with ActorLogging {import Calcultor._var result: Double = 0.0 //internal stateoverride def receive: Receive = {case Num(d) => result = dcase Add(d) => result += dcase Sub(d) => result -= dcase Mul(d) => result *= dcase Div(d) =>val _ = result.toInt / d.toInt //yield ArithmeticExceptionresult /= dcase Clear => result = 0.0case GetResult =>sender() ! s"Result of calculation is: $result"}override def preRestart(reason: Throwable, message: Option[Any]): Unit = {log.info(s"Restarting calculator: ${reason.getMessage}")super.preRestart(reason, message)} }class SupervisorActor extends Actor {def decider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {case _: ArithmeticException => SupervisorStrategy.Resume}override def supervisorStrategy: SupervisorStrategy =OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds){decider.orElse(SupervisorStrategy.defaultDecider)}val calcActor = context.actorOf(Calcultor.props,"calculator")override def receive: Receive = {case msg@ _ => calcActor.forward(msg)}}與上一個(gè)例子的”遠(yuǎn)程查找式“相同,remote需要為Remoting公開一個(gè)端口。我們可以照搬.conf配置文件內(nèi)容:remote/src/main/resources/application.conf
akka {actor {provider = remote}remote {enabled-transports = ["akka.remote.netty.tcp"]netty.tcp {hostname = "127.0.0.1"port = 2552}log-sent-messages = onlog-received-messages = on} }由于遠(yuǎn)程構(gòu)建和使用是在local上進(jìn)行的,在remote上我們只需要啟動(dòng)ActorSystem就行了:
import com.typesafe.config.ConfigFactory import akka.actor._object CalculatorRunner extends App {val remoteSystem = ActorSystem("remoteSystem",ConfigFactory.load("application"))println("Remote system started.")scala.io.StdIn.readLine()remoteSystem.terminate()}Calculator的構(gòu)建是在localSystem上啟動(dòng)的,我們需要在配置文件中描述遠(yuǎn)程構(gòu)建標(biāo)的(還是未能實(shí)現(xiàn)位置透明):local/src/main/resources/application.conf?
akka {actor {provider = remote,deployment {"/calculator" {remote = "akka.tcp://remoteSystem@127.0.0.1:2552"}}}remote {netty.tcp {hostname = "127.0.0.1",port=2554}} }注意:上面這個(gè)/calculator設(shè)置實(shí)際上指的是SupervisorActor。
現(xiàn)在我們可以在local上開始構(gòu)建calculator,然后使用它來運(yùn)算了:
import akka.actor._ import remoteCreation.calculator.Calcultor._ import scala.concurrent.duration._ import akka.pattern._object RemotingCreate extends App {val localSystem = ActorSystem("localSystem")val calcActor = localSystem.actorOf(props,name = "calculator") //created SupervisorActorimport localSystem.dispatchercalcActor ! ClearcalcActor ! Num(13.0)calcActor ! Mul(1.5)implicit val timeout = akka.util.Timeout(1 second)((calcActor ? GetResult).mapTo[String]) foreach printlnscala.io.StdIn.readLine()calcActor ! Div(0.0)calcActor ! Div(1.5)calcActor ! Add(100.0)((calcActor ? GetResult).mapTo[String]) foreach printlnscala.io.StdIn.readLine()localSystem.terminate()}從代碼上看構(gòu)建calculator(SupervisorActor)過程與普通的Actor構(gòu)建沒分別,所有細(xì)節(jié)都放在配置文件里了。但是,要注意actorOf的name必須與配置文檔中的設(shè)置匹配。
試運(yùn)行結(jié)果與上一個(gè)例子相同。值得注意的是實(shí)際遠(yuǎn)程構(gòu)建的是一個(gè)SupervisorActor。Calculator的構(gòu)建是SupervisorActor構(gòu)建的其中一部分。從運(yùn)算結(jié)果看:這個(gè)SupervisorActor也實(shí)現(xiàn)了它的功能。
下面是這次示范的源代碼:
?local/build.sbt
azy val commonSettings = seq (name := "RemoteCreateDemo",version := "1.0",scalaVersion := "2.11.8",libraryDependencies := Seq("com.typesafe.akka" %% "akka-actor" % "2.5.2","com.typesafe.akka" %% "akka-remote" % "2.5.2") )lazy val local = (project in file(".")).settings(commonSettings).settings(name := "remoteCreateDemo").aggregate(calculator).dependsOn(calculator)lazy val calculator = (project in file("calculator")).settings(commonSettings).settings(name := "calculator")lazy val remote = (project in file("remote")).settings(commonSettings).settings(name := "remoteSystem").aggregate(calculator).dependsOn(calculator)calculator/calculator.scala
package remoteCreation.calculatorimport akka.actor._ import scala.concurrent.duration._object Calcultor {sealed trait MathOpscase class Num(dnum: Double) extends MathOpscase class Add(dnum: Double) extends MathOpscase class Sub(dnum: Double) extends MathOpscase class Mul(dnum: Double) extends MathOpscase class Div(dnum: Double) extends MathOpssealed trait CalcOpscase object Clear extends CalcOpscase object GetResult extends CalcOpsdef props = Props(new Calcultor)def supervisorProps = Props(new SupervisorActor) }class Calcultor extends Actor with ActorLogging {import Calcultor._var result: Double = 0.0 //internal stateoverride def receive: Receive = {case Num(d) => result = dcase Add(d) => result += dcase Sub(d) => result -= dcase Mul(d) => result *= dcase Div(d) =>val _ = result.toInt / d.toInt //yield ArithmeticExceptionresult /= dcase Clear => result = 0.0case GetResult =>sender() ! s"Result of calculation is: $result"}override def preRestart(reason: Throwable, message: Option[Any]): Unit = {log.info(s"Restarting calculator: ${reason.getMessage}")super.preRestart(reason, message)} }class SupervisorActor extends Actor {def decider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {case _: ArithmeticException => SupervisorStrategy.Resume}override def supervisorStrategy: SupervisorStrategy =OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds){decider.orElse(SupervisorStrategy.defaultDecider)}val calcActor = context.actorOf(Calcultor.props,"calculator")override def receive: Receive = {case msg@ _ => calcActor.forward(msg)}}remote/src/main/resources/application.conf
akka {actor {provider = remote}remote {enabled-transports = ["akka.remote.netty.tcp"]netty.tcp {hostname = "127.0.0.1"port = 2552}log-sent-messages = onlog-received-messages = on} }remote/CalculatorRunner.scala
package remoteCreation.remote import com.typesafe.config.ConfigFactory import akka.actor._object CalculatorRunner extends App {val remoteSystem = ActorSystem("remoteSystem",ConfigFactory.load("application"))println("Remote system started.")scala.io.StdIn.readLine()remoteSystem.terminate()}local/src/main/resources/application.conf
akka {actor {provider = remote,deployment {"/calculator" {remote = "akka.tcp://remoteSystem@127.0.0.1:2552"}}}remote {netty.tcp {hostname = "127.0.0.1",port=2554}} }local/RemotingCreation.scala
import akka.actor._ import remoteCreation.calculator.Calcultor._ import scala.concurrent.duration._ import akka.pattern._object RemotingCreate extends App {val localSystem = ActorSystem("localSystem")val calcActor = localSystem.actorOf(props,name = "calculator") //created SupervisorActor import localSystem.dispatchercalcActor ! ClearcalcActor ! Num(13.0)calcActor ! Mul(1.5)implicit val timeout = akka.util.Timeout(1 second)((calcActor ? GetResult).mapTo[String]) foreach printlnscala.io.StdIn.readLine()calcActor ! Div(0.0)calcActor ! Div(1.5)calcActor ! Add(100.0)((calcActor ? GetResult).mapTo[String]) foreach printlnscala.io.StdIn.readLine()localSystem.terminate()}?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
轉(zhuǎn)載于:https://www.cnblogs.com/tiger-xc/p/7063301.html
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)總結(jié)
以上是生活随笔為你收集整理的Akka(9): 分布式运算:Remoting-远程构建式的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: android Dialog提示框。单选
- 下一篇: 极域电子教室常见问题解决方案