Scala基于Akka模拟Spark Master Worker进程间通信(一):Worker向Master注册
生活随笔
收集整理的這篇文章主要介紹了
Scala基于Akka模拟Spark Master Worker进程间通信(一):Worker向Master注册
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
最終效果
master:
worker:
思路分析
Master代碼
package cn.zxl.spark.master

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import cn.zxl.spark.common.{RegisterWorkerInfo, RegisteredWorkerInfo, WorkerInfo}
import com.typesafe.config.ConfigFactory

import scala.collection.mutable

/**
 * Master actor: keeps a registry of workers and acknowledges new registrations.
 *
 * Author: zhangxueliang. Created: 2021-05-29 16:37. Version: 1.0.
 */
class SparkMaster extends Actor {

  // Registry of known workers, keyed by worker id.
  private val workers: mutable.Map[String, WorkerInfo] = mutable.Map.empty[String, WorkerInfo]

  override def receive: Receive = {
    case "start" =>
      println("Spark Master服務器啟動了!")

    case RegisterWorkerInfo(workerId, cores, memory) =>
      // A worker id that is already registered is silently ignored (no reply is sent).
      if (!workers.contains(workerId)) {
        registerWorker(workerId, cores, memory)
      }
  }

  /** Stores a new WorkerInfo, prints the registry and acknowledges the sender. */
  private def registerWorker(workerId: String, cores: Int, memory: Int): Unit = {
    workers(workerId) = new WorkerInfo(workerId, cores, memory)
    println("服務器當前的workers=" + workers)
    sender() ! RegisteredWorkerInfo
  }
}

/** Launcher: starts the "SparkMaster" actor system on 127.0.0.1:10005 and the master actor in it. */
object SparkMaster {

  def main(args: Array[String]): Unit = {
    // Remoting settings for the master's actor system.
    val remotingConfig = ConfigFactory.parseString(
      s"""
         |akka.actor.provider="akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname=127.0.0.1
         |akka.remote.netty.tcp.port=10005
       """.stripMargin)

    val system: ActorSystem = ActorSystem("SparkMaster", remotingConfig)

    // Workers look this actor up under /user/SparkMaster-01.
    val master: ActorRef = system.actorOf(Props[SparkMaster], "SparkMaster-01")

    // Kick the master off.
    master ! "start"
  }
}

// Article note: after starting the master, check the port number to verify that it started correctly.
查看端口號:
Worker代碼
package cn.zxl.spark.worker

import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import cn.zxl.spark.common.{RegisterWorkerInfo, RegisteredWorkerInfo}
import com.typesafe.config.ConfigFactory

/**
 * Worker actor: on "start" it sends a registration request to the master and
 * prints a confirmation once the master acknowledges it.
 *
 * Author: zhangxueliang. Created: 2021-05-29 16:47. Version: 1.0.
 *
 * @param masterHost host name of the master's actor system
 * @param masterPort port of the master's actor system
 */
class SparkWorker(masterHost: String, masterPort: Int) extends Actor {

  // Proxy/reference to the master actor; assigned in preStart().
  var masterProxy: ActorSelection = _

  // Randomly generated id that identifies this worker to the master.
  private val id: String = java.util.UUID.randomUUID().toString

  override def preStart(): Unit = {
    println("preStart()調用")
    // Resolve the master by its remote actor path.
    masterProxy = context.actorSelection(
      s"akka.tcp://SparkMaster@${masterHost}:${masterPort}/user/SparkMaster-01")
    println("masterProxy=" + masterProxy)
  }

  override def receive: Receive = {
    case "start" =>
      println("worker啟動了")
      // Advertised resources are fixed demo values (presumably 16 cores and 16 * 1024 MB of RAM).
      masterProxy ! RegisterWorkerInfo(id, 16, 16 * 1024)

    case RegisteredWorkerInfo =>
      println("workerid=" + id + "注冊成功")
  }
}

/** Launcher: starts the "SparkWorker" actor system and a worker actor pointing at the master. */
object SparkWorker {

  def main(args: Array[String]): Unit = {
    // Address this worker's actor system binds to. These values are now the single
    // source of truth for the remoting config below (they were previously declared
    // but unused, while the config hard-coded its own host and port).
    val workerHost = "127.0.0.1"
    val workerPort = 10002
    val masterHost = "127.0.0.1"
    val masterPort = 10005

    val config = ConfigFactory.parseString(
      s"""
         |akka.actor.provider="akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname=$workerHost
         |akka.remote.netty.tcp.port=$workerPort""".stripMargin)

    // Create the ActorSystem.
    val sparkWorkerSystem: ActorSystem = ActorSystem("SparkWorker", config)

    // Create the SparkWorker actor and obtain its reference.
    val sparkWorkerRef: ActorRef =
      sparkWorkerSystem.actorOf(Props(new SparkWorker(masterHost, masterPort)), "SparkWorker-01")

    sparkWorkerRef ! "start"
  }
}

// Article note: the next section shows the RegisterWorkerInfo case class.
package cn.zxl.spark.common

/*
 * Messages and bookkeeping types shared by the master and the worker.
 *
 * Author: zhangxueliang. Created: 2021-05-29 17:12. Version: 1.0.
 */

/**
 * Registration request carrying a worker's identity and resources.
 *
 * @param id  unique worker id
 * @param cpu CPU figure reported by the worker (presumably a core count)
 * @param ram memory figure reported by the worker (unit not stated here; presumably MB)
 */
case class RegisterWorkerInfo(id: String, cpu: Int, ram: Int)

/**
 * Record of a worker, intended to be stored in the master's worker map.
 *
 * Deliberately a plain class rather than a case class: it is expected to be
 * extended later (for example with the time of the worker's last heartbeat).
 *
 * @param id  unique worker id
 * @param cpu CPU figure reported at registration
 * @param ram memory figure reported at registration
 */
class WorkerInfo(val id: String, val cpu: Int, val ram: Int) {

  /** Readable form, so that printing a collection of workers shows their fields instead of object hashes. */
  override def toString: String = s"WorkerInfo(id=$id, cpu=$cpu, ram=$ram)"
}

/** Acknowledgement returned by the server when a worker has registered successfully. */
case object RegisteredWorkerInfo

// Article note: the next section shows the pom file.
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.zxl</groupId>
  <artifactId>scala-demo</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>scala-demo</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.12</scala.version>
    <scala.compat.version>2.11</scala.compat.version>
    <akka.version>2.5.12</akka.version>
    <scala.actors.version>2.10.0-M6</scala.actors.version>
  </properties>

  <dependencies>
    <!-- scala-actors is deprecated as of Scala 2.11 and can no longer be used -->
    <!--<dependency><groupId>org.scala-lang</groupId><artifactId>scala-actors</artifactId><version>${scala.actors.version}</version></dependency>-->

    <!-- Akka actor dependency -->
    <dependency>
      <groupId>com.typesafe.akka</groupId>
      <artifactId>akka-actor_${scala.compat.version}</artifactId>
      <version>${akka.version}</version>
    </dependency>

    <!-- Actor communication between separate processes -->
    <dependency>
      <groupId>com.typesafe.akka</groupId>
      <artifactId>akka-remote_${scala.compat.version}</artifactId>
      <version>${akka.version}</version>
    </dependency>
  </dependencies>

  <build>
    <!-- Locations of the main and test source directories -->
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>

    <plugins>
      <!-- This plugin compiles Scala sources into class files -->
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <!-- Declares the goals bound to Maven's compile phase -->
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
            <configuration>
              <args>
                <arg>-dependencyfile</arg>
                <arg>${project.build.directory}/.scala_depencencies</arg>
              </args>
            </configuration>
          </execution>
        </executions>
      </plugin>

      <!-- Maven packaging plugin -->
      <!-- Both maven-assembly-plugin and maven-shade-plugin are packaging plugins.
           For files with the same name, assembly overwrites while shade appends,
           so the shade plugin is chosen here. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <!-- Appends every reference.conf found on the classpath into one file -->
                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                  <resource>reference.conf</resource>
                </transformer>
                <!-- Specifies the main class; "xxx" is a placeholder to be replaced with
                     the fully qualified name of the class whose main method should run -->
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>xxx</mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
<!-- Article note: the next section is the summary. -->
以上是生活随笔為你收集整理的Scala基于Akka模拟Spark Master Worker进程间通信(一):Worker向Master注册的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Scala Akka网络编程:Clien
- 下一篇: Scala基于Akka模拟Spark M