Writing an RPC Framework with Akka: A Case Simulating Multiple Workers Connecting to a Master
This article uses Akka to write an RPC framework that implements communication between a Master and multiple Workers. The flow diagram is as follows:
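The diagram itself is not reproduced here. Reading the flow off the code later in this article: a Worker sends RegisterWorker to the Master, the Master records it and replies with RegisteredWorker, the Worker then sends Heartbeat at a fixed interval, and the Master periodically drops Workers whose last heartbeat is too old. The following is a minimal sketch of the Master-side bookkeeping with Akka removed, so it runs as plain Scala. `FlowSketch`, `MasterState`, `handle`, and the explicit `now` parameter are hypothetical names introduced for illustration, and starting the clock at registration time is an assumption of this sketch.

```scala
// Plain-Scala sketch of the Master-side bookkeeping (no Akka involved).
// FlowSketch, MasterState, handle and `now` are illustrative assumptions.
object FlowSketch {
  sealed trait Msg
  case class RegisterWorker(id: String, cores: Int, memory: Int) extends Msg
  case class Heartbeat(id: String) extends Msg
  case object CheckTimeOutWorker extends Msg

  // Maps worker id -> time of the last heartbeat, in milliseconds.
  case class MasterState(lastSeen: Map[String, Long] = Map.empty)

  // Same value as CHECK_INTERVAL in the article's Master.
  val TimeoutMs = 15000L

  // Returns the state after processing one message at time `now`.
  def handle(state: MasterState, msg: Msg, now: Long): MasterState = msg match {
    case RegisterWorker(id, _, _) =>
      // Ignore duplicate registrations; otherwise start the clock at `now`.
      if (state.lastSeen.contains(id)) state
      else MasterState(state.lastSeen + (id -> now))
    case Heartbeat(id) =>
      // Heartbeats from unknown workers are ignored.
      if (state.lastSeen.contains(id)) MasterState(state.lastSeen + (id -> now))
      else state
    case CheckTimeOutWorker =>
      // Keep only workers heard from within the timeout window.
      MasterState(state.lastSeen.filter { case (_, t) => now - t <= TimeoutMs })
  }

  def main(args: Array[String]): Unit = {
    val s1 = handle(MasterState(), RegisterWorker("w1", 4, 1024), 0L)
    val s2 = handle(s1, Heartbeat("w1"), 10000L)
    val s3 = handle(s2, CheckTimeOutWorker, 20000L) // 10 s since last heartbeat
    val s4 = handle(s3, CheckTimeOutWorker, 30000L) // 20 s since last heartbeat
    println("alive at 20s: " + s3.lastSeen.size + ", alive at 30s: " + s4.lastSeen.size)
  }
}
```

Keeping the state immutable and passing the time in explicitly makes the timeout rule easy to check without waiting for a real scheduler.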
Write the POM file. Its contents are as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.toto.akka</groupId>
    <artifactId>MyRPC</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.10.6</scala.version>
        <scala.compat.version>2.10</scala.compat.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_2.10</artifactId>
            <version>2.3.14</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-remote_2.10</artifactId>
            <version>2.3.14</version>
        </dependency>
    </dependencies>

    <build>
        <!-- Location of the source directories -->
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <!-- Plugin dedicated to compiling Scala -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-make:transitive</arg>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- Plugin used for packaging -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <!-- Exclude the following files from the packaged jar -->
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>cn.toto.akka.Worker</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

The Message code is as follows:
package cn.toto.akka

/**
 * Created by toto on 2017/7/3.
 */
trait Message extends Serializable

// Worker -> Master: the Worker sends its registration info to the Master
case class RegisterWorker(id: String, cores: Int, memory: Int) extends Message

// Master -> Worker: the Master replies to the Worker after registration
case class RegisteredWorker(masterUrl: String) extends Message

// Worker -> Master
case class Heartbeat(id: String) extends Message

// Worker internal message
case object SendHeartbeat

// Master internal message
case object CheckTimeOutWorker

The WorkerInfo code is as follows:
package cn.toto.akka

/**
 * Created by toto on 2017/7/3.
 */
class WorkerInfo(val id: String, val cores: Int, val memory: Int) {
  //TODO
  // Time of the last heartbeat received from this Worker, in milliseconds
  var lastHeartbeatTime: Long = _
}

The Master code is as follows:
package cn.toto.akka

import akka.actor.{Actor, ActorSystem, Props}
import akka.actor.Actor.Receive
import com.typesafe.config.ConfigFactory

import scala.collection.mutable
// Needed for duration units such as millis
import scala.concurrent.duration._

/**
 * Created by toto on 2017/7/3.
 */
class Master(val host: String, val port: Int) extends Actor {

  // Mapping from worker ID to WorkerInfo
  val idToWorker = new mutable.HashMap[String, WorkerInfo]()
  // All registered WorkerInfo instances
  val workers = new mutable.HashSet[WorkerInfo]()

  // Interval between timeout checks, also used as the timeout threshold (ms)
  val CHECK_INTERVAL = 15000

  override def preStart(): Unit = {
    // Import the implicit ExecutionContext required by the scheduler
    import context.dispatcher
    context.system.scheduler.schedule(0.millis, CHECK_INTERVAL.millis, self, CheckTimeOutWorker)
  }

  override def receive: Receive = {
    // Registration message sent by a Worker to the Master
    case RegisterWorker(workerId, cores, memory) => {
      if (!idToWorker.contains(workerId)) {
        // Wrap the information sent by the Worker
        val workerInfo = new WorkerInfo(workerId, cores, memory)
        // Start the heartbeat clock at registration; otherwise lastHeartbeatTime is 0
        // and a timeout check arriving before the first heartbeat would drop the Worker
        workerInfo.lastHeartbeatTime = System.currentTimeMillis()
        // Save the WorkerInfo
        idToWorker(workerId) = workerInfo
        workers += workerInfo
        // Tell the Worker that registration succeeded
        sender ! RegisteredWorker(s"akka.tcp://${Master.MASTER_SYSTEM}@$host:$port/user/${Master.MASTER_NAME}")
      }
    }

    // Heartbeat sent by a Worker to the Master
    case Heartbeat(workerId) => {
      if (idToWorker.contains(workerId)) {
        val workerInfo = idToWorker(workerId)
        val currentTime = System.currentTimeMillis()
        // Update the time of the last heartbeat
        workerInfo.lastHeartbeatTime = currentTime
      }
    }

    // Detect Workers that have timed out
    case CheckTimeOutWorker => {
      val currentTime = System.currentTimeMillis()
      val deadWorkers: mutable.HashSet[WorkerInfo] =
        workers.filter(w => currentTime - w.lastHeartbeatTime > CHECK_INTERVAL)
      // Equivalent for-loop form:
      // for (w <- deadWorkers) {
      //   idToWorker -= w.id
      //   workers -= w
      // }
      deadWorkers.foreach(w => {
        idToWorker -= w.id
        workers -= w
      })
      println("alive worker size : " + workers.size)
    }
  }
}

object Master {

  val MASTER_SYSTEM = "MaterActorSystem"
  val MASTER_NAME = "Master"

  def main(args: Array[String]): Unit = {
    val host = args(0)
    val port = args(1).toInt
    val confStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    val conf = ConfigFactory.parseString(confStr)
    // The ActorSystem is a singleton, used to create and supervise actors
    val actorSystem = ActorSystem(MASTER_SYSTEM, conf)
    // Create the Master actor through the ActorSystem
    actorSystem.actorOf(Props(new Master(host, port)), MASTER_NAME)
    actorSystem.awaitTermination()
  }
}

The Worker code is as follows:
package cn.toto.akka

import java.util.UUID

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import scala.concurrent.duration._

/**
 * Created by toto on 2017/7/3.
 */
class Worker(val cores: Int, val memory: Int, val masterHost: String, val masterPort: Int) extends Actor {

  // Reference to the Master
  var master: ActorSelection = _
  // ID of this Worker
  val workerId = UUID.randomUUID().toString
  // URL of the Master, filled in after registration
  var masterUrl: String = _

  // Interval between heartbeats (ms)
  val HEARTBEAT_INTERVAL = 10000

  // preStart runs after the constructor and before receive
  override def preStart(): Unit = {
    // First, connect to the Master
    master = context.actorSelection(s"akka.tcp://${Master.MASTER_SYSTEM}@$masterHost:$masterPort/user/${Master.MASTER_NAME}")
    // Send the registration message through the Master reference
    master ! RegisterWorker(workerId, cores, memory)
  }

  override def receive: Receive = {
    // Registration-succeeded message sent by the Master to the Worker
    case RegisteredWorker(masterUrl) => {
      this.masterUrl = masterUrl
      // Start a scheduled task that sends heartbeats to the Master
      // Import the implicit ExecutionContext required by the scheduler
      import context.dispatcher
      context.system.scheduler.schedule(0.millis, HEARTBEAT_INTERVAL.millis, self, SendHeartbeat)
    }

    case SendHeartbeat => {
      // Send a heartbeat to the Master
      master ! Heartbeat(workerId)
    }
  }
}

object Worker {

  def main(args: Array[String]): Unit = {
    // Host and port of this Worker
    val host = args(0)
    val port = args(1).toInt
    val cores = args(2).toInt
    val memory = args(3).toInt
    // Host and port of the Master
    val masterHost = args(4)
    val masterPort = args(5).toInt
    val confStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    val conf = ConfigFactory.parseString(confStr)
    // The singleton ActorSystem
    val actorSystem = ActorSystem("WorkerActorSystem", conf)
    // Create the Worker actor through the ActorSystem
    val worker = actorSystem.actorOf(Props(new Worker(cores, memory, masterHost, masterPort)), "Worker")
    actorSystem.awaitTermination()
  }
}

Running the Master code:
Preparation:
Next, supply the program arguments:
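The screenshot with the argument values is not reproduced here. According to `Master.main`, the program takes two arguments: the host and the port to bind to. As a rough sketch, this is how those two values become the Akka configuration string and the Master's actor path. `MasterArgsSketch` is a hypothetical name, and `127.0.0.1` and `8888` are placeholder values chosen for illustration, not the values used in the original run.

```scala
// Sketch of how Master's two arguments are used. The object name and the
// example values 127.0.0.1 / 8888 are assumptions made for illustration.
object MasterArgsSketch {
  val MASTER_SYSTEM = "MaterActorSystem" // spelled as in the article's Master object
  val MASTER_NAME = "Master"

  // Same three settings that Master.main passes to ConfigFactory.parseString.
  def confStr(host: String, port: Int): String =
    s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
       |akka.remote.netty.tcp.hostname = "$host"
       |akka.remote.netty.tcp.port = "$port"
       |""".stripMargin

  // The actor path that Workers use to look up the Master.
  def masterUrl(host: String, port: Int): String =
    s"akka.tcp://${MASTER_SYSTEM}@${host}:${port}/user/${MASTER_NAME}"

  def main(args: Array[String]): Unit = {
    // Fall back to the placeholder values when no arguments are given.
    val Array(host, port) = if (args.length == 2) args else Array("127.0.0.1", "8888")
    println(confStr(host, port.toInt))
    println(masterUrl(host, port.toInt))
  }
}
```

Whatever values are chosen here must be repeated as the last two arguments of each Worker, since the Worker builds the same actor path from them.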
Then right-click and choose Run Master. The result is as follows:
Running the Worker program
Supply the program arguments:
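The screenshot with the Worker's argument values is not reproduced here either. According to `Worker.main`, the six arguments are, in order: the Worker's host, the Worker's port, cores, memory, the Master's host, and the Master's port. The sketch below parses them with explicit error reporting instead of indexing into `args` directly. `WorkerArgsSketch`, `WorkerArgs`, and `parse` are hypothetical names, and the example values are placeholders rather than the ones used in the original run.

```scala
// Sketch of parsing the Worker's six positional arguments.
// WorkerArgsSketch, WorkerArgs and parse are illustrative assumptions.
object WorkerArgsSketch {
  case class WorkerArgs(host: String, port: Int, cores: Int, memory: Int,
                        masterHost: String, masterPort: Int)

  // Returns Right(parsed arguments) or Left(error message).
  def parse(args: Array[String]): Either[String, WorkerArgs] = args match {
    case Array(host, port, cores, memory, masterHost, masterPort) =>
      try {
        Right(WorkerArgs(host, port.toInt, cores.toInt, memory.toInt,
                         masterHost, masterPort.toInt))
      } catch {
        case e: NumberFormatException => Left("not a number: " + e.getMessage)
      }
    case _ =>
      Left("usage: Worker <host> <port> <cores> <memory> <masterHost> <masterPort>")
  }

  def main(args: Array[String]): Unit = {
    // Placeholder values: a Worker on port 9999 pointing at a Master on port 8888.
    println(parse(Array("127.0.0.1", "9999", "4", "1024", "127.0.0.1", "8888")))
    // Too few arguments yields the usage message instead of an exception.
    println(parse(Array("127.0.0.1")))
  }
}
```

To simulate several Workers on one machine, each one presumably needs its own port while the last two arguments stay the same.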
Right-click and run the Worker code. The result is as follows: