Akka 指南 之「Actors」
溫馨提示:Akka 中文指南的 GitHub 地址為「akka-guide」,歡迎大家Star、Fork,糾錯。
文章目錄
- Actors
- 依賴
- 簡介
- 創建 Actors
- 定義 Actor 類
- Props
- 危險的變體
- 推薦實踐
- 使用 Props 創建 Actors
- 依賴注入
- 收件箱
- Actor API
- Actor 生命周期
- 生命周期監控,或稱為 DeathWatch
- Start 鉤子
- Restart 鉤子
- Stop 鉤子
- 通過 Actor Selection 識別 Actor
- 信息和不變性
- 發送消息
- Tell: Fire-forget
- Ask: Send-And-Receive-Future
- 轉發消息
- 接收消息
- 回復消息
- 接收超時
- 定時器和調度消息
- 停止 Actor
- PoisonPill
- 殺死一個 Actor
- 優雅的停止
- 協調關閉
- Become/Unbecome
- 升級
- 對 Scala Actor 嵌套接收進行編碼,而不會意外泄漏內存
- Stash
- Actor 和異常
- 消息發生了什么
- 郵箱發生了什么
- Actor 發現了什么
- 初始化模式
- 通過構造函數初始化
- 通過 preStart 初始化
- 通過消息傳遞初始化
Actors
依賴
為了使用 Actors,你必須在項目中添加如下依賴:
<!-- Maven --> <dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-actor_2.12</artifactId><version>2.5.21</version> </dependency><!-- Gradle --> dependencies {compile group: 'com.typesafe.akka', name: 'akka-actor_2.12', version: '2.5.21' }<!-- sbt --> libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.5.21"簡介
「Actor Model」為編寫并發和分布式系統提供了更高級別的抽象。它減少了開發人員必須處理顯式鎖和線程管理的問題,使編寫正確的并發和并行系統變得更容易。1973 年卡爾·休伊特(Carl Hewitt)在論文中定義了 Actors,然后通過 Erlang 語言所普及,并且在愛立信(Ericsson)成功地建立了高度并發和可靠的電信系統。
Akka 的 Actors API 類似于 Scala Actors,它從 Erlang 中借用了一些語法。
創建 Actors
由于 Akka 實施父級(parental)監督,每個 Actor 都受到其父級的監督并且監督其子級,因此建議你熟悉「Actor Systems」和「Supervision」,它還可能有助于閱讀「Actor References, Paths and Addresses」。
定義 Actor 類
Actor 類是通過繼承AbstractActor類并在createReceive方法中設置“初始行為”來實現的。
createReceive方法沒有參數,并返回AbstractActor.Receive。它定義了 Actor 可以處理哪些消息,以及如何處理消息的實現。可以使用名為ReceiveBuilder的生成器來構建此類行為。在AbstractActor中,有一個名為receiveBuilder的方便的工廠方法。
下面是一個例子:
import akka.actor.AbstractActor; import akka.event.Logging; import akka.event.LoggingAdapter;public class MyActor extends AbstractActor {private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);@Overridepublic Receive createReceive() {return receiveBuilder().match(String.class,s -> {log.info("Received String message: {}", s);}).matchAny(o -> log.info("received unknown message")).build();} }請注意,Akka Actor 消息循環是徹底的(exhaustive),與 Erlang 和后 Scala Actors 相比,它是不同的。這意味著你需要為它可以接受的所有消息提供一個模式匹配,如果你希望能夠處理未知消息,那么你需要有一個默認情況,如上例所示。否則,akka.actor.UnhandledMessage(message, sender, recipient)將發布到ActorSystem的EventStream。
請進一步注意,上面定義的行為的返回類型是Unit;如果 Actor 應回復收到的消息,則必須按照下面的說明顯式完成。
createReceive方法的結果是AbstractActor.Receive,它是圍繞部分 Scala 函數對象的包裝。它作為其“初始行為”存儲在 Actor 中,有關在 Actor 構造后更改其行為的詳細信息,請參見「Become/Unbecome」。
Props
Props是一個配置類,用于指定創建 Actor 的選項,將其視為不可變的,因此可以自由共享用于創建 Actor 的方法,包括關聯的部署信息(例如,要使用哪個調度程序,請參閱下面的更多內容)。下面是一些如何創建Props實例的示例。
import akka.actor.Props; Props props1 = Props.create(MyActor.class); Props props2 =Props.create(ActorWithArgs.class, () -> new ActorWithArgs("arg")); // careful, see below Props props3 = Props.create(ActorWithArgs.class, "arg");第二個變量演示了如何將構造函數參數傳遞給正在創建的 Actor,但它只能在 Actor 之外使用,如下所述。
最后一行顯示了傳遞構造函數參數的可能性,而不管它在哪個上下文中使用。在Props對象的構造過程中,會驗證是否存在匹配的構造函數,如果未找到匹配的構造函數或找到多個匹配的構造函數,則會導致IllegalArgumentException。
危險的變體
// NOT RECOMMENDED within another actor: // encourages to close over enclosing class Props props7 = Props.create(ActorWithArgs.class, () -> new ActorWithArgs("arg"));不建議在另一個 Actor 中使用此方法,因為它鼓勵關閉封閉范圍,從而導致不可序列化的屬性和可能的競態條件(破壞 Actor 封裝)。另一方面,在 Actor 的同伴對象(companion object)中的Props工廠中使用這個變體是完全正確的,如下面的“推薦實踐”中所述。
這些方法有兩個用例:將構造函數參數傳遞給由新引入的Props.create(clazz, args)方法或下面的推薦實踐解決的 Actor,并將 Actor “就地(on the spot)”創建為匿名類。后者應該通過將這些 Actor 命名為類來解決(如果它們沒有在頂級object中聲明,則需要將封閉實例的this引用作為第一個參數傳遞)。
- 警告:在另一個 Actor 中聲明一個 Actor 是非常危險的,并且會破壞 Actor 的封裝。千萬不要把 Actor 的this引用傳給Props!
推薦實踐
為每個 Actor 提供靜態工廠方法是一個好主意,這有助于使合適的Props創建盡可能接近 Actor 的定義。這也避免了與使用Props.create(...)方法相關聯的陷阱,該方法將參數作為構造函數參數,因為在靜態方法中,給定的代碼塊不會保留對其封閉范圍的引用:
static class DemoActor extends AbstractActor {/*** Create Props for an actor of this type.** @param magicNumber The magic number to be passed to this actor’s constructor.* @return a Props for creating this actor, which can then be further configured (e.g. calling* `.withDispatcher()` on it)*/static Props props(Integer magicNumber) {// You need to specify the actual type of the returned actor// since Java 8 lambdas have some runtime type information erasedreturn Props.create(DemoActor.class, () -> new DemoActor(magicNumber));}private final Integer magicNumber;public DemoActor(Integer magicNumber) {this.magicNumber = magicNumber;}@Overridepublic Receive createReceive() {return receiveBuilder().match(Integer.class,i -> {getSender().tell(i + magicNumber, getSelf());}).build();} }static class SomeOtherActor extends AbstractActor {// Props(new DemoActor(42)) would not be safeActorRef demoActor = getContext().actorOf(DemoActor.props(42), "demo");// ... }另一個好的做法是聲明 Actor 可以接收的消息盡可能接近 Actor 的定義(例如,作為 Actor 內部的靜態類或使用其他合適的類),這使得更容易知道它可以接收到什么:
static class DemoMessagesActor extends AbstractLoggingActor {public static class Greeting {private final String from;public Greeting(String from) {this.from = from;}public String getGreeter() {return from;}}@Overridepublic Receive createReceive() {return receiveBuilder().match(Greeting.class,g -> {log().info("I was greeted by {}", g.getGreeter());}).build();} }使用 Props 創建 Actors
Actors 是通過將Props實例傳遞到actorOf工廠方法來創建的,該方法在ActorSystem和ActorContext上可用。
import akka.actor.ActorRef; import akka.actor.ActorSystem;使用ActorSystem將創建頂級 Actor,由ActorSystem提供的守護者 Actor 進行監督,而使用ActorContext將創建子 Actor。
static class FirstActor extends AbstractActor {final ActorRef child = getContext().actorOf(Props.create(MyActor.class), "myChild");@Overridepublic Receive createReceive() {return receiveBuilder().matchAny(x -> getSender().tell(x, getSelf())).build();} }建議創建一個子級、孫子(grand-children)級這樣的層次結構,以便它適合應用程序的邏輯故障處理結構,詳見「Actor Systems」。
對actorOf的調用返回ActorRef的實例。這是 Actor 實例的句柄,也是與之交互的唯一方法。ActorRef是不可變的,并且與它所表示的 Actor 有一對一的關系。ActorRef也是可序列化的,并且具有網絡意識(network-aware)。這意味著你可以序列化它,通過網絡發送它,并在遠程主機上使用它,并且它仍然在網絡上表示原始節點上的同一個 Actor。
name參數是可選的,但你最好為 Actor 命名,因為它用于日志消息和標識 Actor。名稱不能為空或以$開頭,但可以包含 URL 編碼字符(例如,空格為%20)。如果給定的名稱已被同一父級的另一個子級使用,則會引發InvalidActorNameException。
Actor 在創建時自動異步啟動。
依賴注入
如果你的 Actor 有一個接受參數的構造函數,那么這些參數也需要成為Props的一部分,如上所述。但在某些情況下,必須使用工廠方法,例如,當依賴項注入(dependency injection)框架確定實際的構造函數參數時。
import akka.actor.Actor; import akka.actor.IndirectActorProducer; class DependencyInjector implements IndirectActorProducer {final Object applicationContext;final String beanName;public DependencyInjector(Object applicationContext, String beanName) {this.applicationContext = applicationContext;this.beanName = beanName;}@Overridepublic Class<? extends Actor> actorClass() {return TheActor.class;}@Overridepublic TheActor produce() {TheActor result;result = new TheActor((String) applicationContext);return result;} }final ActorRef myActor =getContext().actorOf(Props.create(DependencyInjector.class, applicationContext, "TheActor"), "TheActor");- 警告:有時,你可能會試圖提供始終返回同一實例的IndirectActorProducer,例如使用靜態字段。這是不支持的,因為它違背了 Actor 重新啟動的含義,這里描述了:「重新啟動意味著什么」。
當使用依賴注入框架時,Actor bean不能有singleton作用域。
在「Using Akka with Dependency Injection」指南和「Akka Java Spring」教程中,有關于依賴注入的更深層次的描述。
收件箱
當在與 Actor 通信的 Actor 外部編寫代碼時,ask模式可以是一個解決方案(見下文),但它不能做兩件事:接收多個回復(例如,通過向通知服務訂閱ActorRef)和觀察其他 Actor 的生命周期。出于這些目的,有一個Inbox類:
final Inbox inbox = Inbox.create(system); inbox.send(target, "hello"); try {assert inbox.receive(Duration.ofSeconds(1)).equals("world"); } catch (java.util.concurrent.TimeoutException e) {// timeout }send方法包裝了一個普通的tell,并將內部 Actor 的引用作為發送者提供。這允許在最后一行接收回復。監視(Watching)Actor 也很簡單:
final Inbox inbox = Inbox.create(system); inbox.watch(target); target.tell(PoisonPill.getInstance(), ActorRef.noSender()); try {assert inbox.receive(Duration.ofSeconds(1)) instanceof Terminated; } catch (java.util.concurrent.TimeoutException e) {// timeout }Actor API
AbstractActor類定義了一個名為createReceive的方法,該方法用于設置 Actor 的“初始行為”。
如果當前的 Actor 行為與接收到的消息不匹配,則調用unhandled,默認情況下,它在Actor 系統的事件流上發布akka.actor.UnhandledMessage(message, sender, recipient)(將配置項akka.actor.debug.unhandled設置為on,以便將其轉換為實際的Debug消息)。
此外,它還提供:
- getSelf(),對 Actor 的ActorRef的引用
- getSender(),前一次接收到的消息的發送方 Actor 的引用
- supervisorStrategy(),用戶可重寫定義用于監視子 Actor 的策略
該策略通常在 Actor 內部聲明,以便訪問決策函數中 Actor 的內部狀態:由于故障作為消息發送給其監督者并像其他消息一樣進行處理(盡管不屬于正常行為),因此 Actor 內的所有值和變量都可用,就像sender引用一樣(報告失敗的是直接子級;如果原始失敗發生在一個遙遠的后代中,則每次仍向上一級報告)。
- getContext()公開 Actor 和當前消息的上下文信息,例如:
- 創建子 Actor 的工廠方法(actorOf)
- Actor 所屬的系統
- 父級監督者
- 受監督的子級
- 生命周期監控
- 如Become/Unbecome中所述的熱交換行為棧(hotswap behavior stack)
其余可見的方法是用戶可重寫的生命周期鉤子方法,如下所述:
public void preStart() {}public void preRestart(Throwable reason, Optional<Object> message) {for (ActorRef each : getContext().getChildren()) {getContext().unwatch(each);getContext().stop(each);}postStop(); }public void postRestart(Throwable reason) {preStart(); }public void postStop() {}上面顯示的實現是AbstractActor類提供的默認值。
Actor 生命周期
Actor 系統中的一條路徑表示一個“地方”,可能被一個活著的 Actor 占據。最初(除了系統初始化的 Actor 之外)路徑是空的,當調用actorOf()時,它將通過傳遞的Props描述的 Actor 的化身(incarnation)分配給給定的路徑,Actor 的化身由路徑(path)和UID標識。
值得注意的是:
- restart
- stop,然后重新創建 Actor
如下所述。
重新啟動(restart)只交換由Props定義的Actor實例,因此UID保持不變。只要化身是相同的,你可以繼續使用相同的ActorRef。重啟是通過 Actor 的父 Actor 的「Supervision Strategy」來處理的,關于重啟的含義還有「更多的討論」。
當 Actor 停止時,化身的生命周期就結束了。此時將調用適當的生命周期事件,并將終止通知觀察 Actor。當化身停止后,可以通過使用actorOf()創建 Actor 來再次使用路徑。在這種情況下,新化身的名稱將與前一個相同,但UID將不同。Actor 可以由 Actor 本身、另一個 Actor 或 Actor 系統停止,詳見「停止 Actor」。
- 注釋:重要的是要注意,Actor 不再被引用時不會自動停止,創建的每個 Actor 也必須顯式銷毀。唯一的簡化是,停止父 Actor 也將遞歸地停止此父 Actor 創建的所有子 Actor。
ActorRef總是代表一個化身(路徑和UID),而不僅僅是一個給定的路徑。因此,如果一個 Actor 被停止,一個同名的新 Actor 被創造出來,舊化身的 Actor 引用就不會指向新的 Actor。
另一方面,ActorSelection指向路徑(或者如果使用通配符,則指向多個路徑),并且完全忽略了具體化當前正在占用的路徑。由于這個原因,ActorSelection不能被觀看??梢酝ㄟ^向ActorSelection發送Identify消息來獲取(resolve)當前化身的ActorRef,該消息將以包含正確引用的ActorIdentity回復,詳見ActorSelection」。這也可以通過ActorSelection的resolveOne方法來實現,該方法返回匹配ActorRef的Future。
生命周期監控,或稱為 DeathWatch
為了在另一個 Actor 終止時得到通知(即永久停止,而不是臨時失敗和重新啟動),Actor 可以注冊(register)自己,以便在終止時接收另一個 Actor 發送的Terminated消息,詳見「停止 Actor」。此服務由 Actor 系統的DeathWatch組件提供。
注冊監視器(monitor)很容易:
import akka.actor.Terminated; static class WatchActor extends AbstractActor {private final ActorRef child = getContext().actorOf(Props.empty(), "target");private ActorRef lastSender = system.deadLetters();public WatchActor() {getContext().watch(child); // <-- this is the only call needed for registration}@Overridepublic Receive createReceive() {return receiveBuilder().matchEquals("kill",s -> {getContext().stop(child);lastSender = getSender();}).match(Terminated.class,t -> t.actor().equals(child),t -> {lastSender.tell("finished", getSelf());}).build();} }在這里,有一點需要我們注意:Terminated消息的生成與注冊和終止發生的順序無關。特別是,即使被監視的 Actor 在注冊時已經被終止,監視的 Actor 也將收到一條Terminated消息。
多次注冊并不一定會導致生成多條消息,但不能保證只接收到一條這樣的消息:如果被監視的 Actor 的終止消息已經生成并將消息排隊,并且在處理此消息之前完成了另一個注冊,則第二條消息也將進入消息隊列。因為注冊監視已經終止的 Actor 會導致立即生成Terminated消息。
也可以通過context.unwatch(target)取消監視另一個 Actor 的存活情況。即使Terminated消息已經在郵箱中排隊,也可以這樣做;在調用unwatch之后,將不再處理該 Actor 的Terminated消息。
Start 鉤子
啟動 Actor 之后,立即調用其preStart方法。
@Override public void preStart() {target = getContext().actorOf(Props.create(MyActor.class, "target")); }首次創建 Actor 時調用此方法。在重新啟動期間,它由postRestart的默認實現調用,這意味著通過重寫該方法,你可以選擇是否只為此 Actor 或每次重新啟動時調用一次此方法中的初始化代碼。當創建 Actor 類的實例時,總是會調用作為 Actor 構造函數一部分的初始化代碼,該實例在每次重新啟動時都會發生。
Restart 鉤子
所有 Actor 都受到監督,即通過故障處理策略鏈接到另一個 Actor。如果在處理消息時引發異常,則可以重新啟動 Actor(詳見「supervision」)。重新啟動涉及上述掛鉤:
Actor 重新啟動僅替換實際的 Actor 對象;郵箱的內容不受重新啟動的影響,因此在postRestart鉤子返回后,將繼續處理消息,而且將不再接收觸發異常的消息。重新啟動時發送給 Actor 的任何消息都將像往常一樣排隊進入其郵箱。
- 警告:請注意,與用戶消息相關的失敗通知的順序是不確定的。特別是,父級可以在處理子級在失敗之前發送的最后一條消息之前重新啟動其子級。有關詳細信息,請參閱「討論:消息排序」。
Stop 鉤子
停止某個 Actor 后,將調用其postStop鉤子,該鉤子可用于將該 Actor 從其他服務中注銷。此鉤子保證在禁用此 Actor 的消息隊列后運行,即發送到已停止 Actor 的消息將被重定向到ActorSystem的deadLetters。
通過 Actor Selection 識別 Actor
如「Actor References, Paths and Addresses」中所述,每個 Actor 都有唯一的邏輯路徑,該路徑通過從子級到父級的 Actor 鏈獲得,直到到達 Actor 系統的根,并且它有一個物理路徑,如果監督鏈(supervision chain)包含任何遠程監管者,則該路徑可能有所不同。系統使用這些路徑來查找 Actor,例如,當接收到遠程消息并搜索收件人時,它們很有用:Actor 可以通過指定邏輯或物理的絕對或相對路徑來查找其他 Actor,并接收帶有結果的ActorSelection:
// will look up this absolute path getContext().actorSelection("/user/serviceA/actor"); // will look up sibling beneath same supervisor getContext().actorSelection("../joe");- 注釋:與其他 Actor 交流時,最好使用他們的ActorRef,而不是依靠ActorSelection。但也有例外,如
- 使用「至少一次傳遞」能力發送消息
- 啟動與遠程系統的第一次連接
在所有其他情況下,可以在 Actor 創建或初始化期間提供ActorRef,將其從父級傳遞到子級,或者通過將其ActorRef發送到其他 Actor 來引出 Actor。
提供的路徑被解析為java.net.URI,這意味著它在路徑元素上被/拆分。如果路徑以/開頭,則為絕對路徑,查找從根守護者(它是/user的父級)開始;否則,查找從當前 Actor 開始。如果路徑元素等于..,則查找將向當前遍歷的 Actor 的監督者“向上”一步,否則將向命名的子級“向下”一步。應該注意的是..在 Actor 路徑中,總是指邏輯結構,即監督者。
Actor 選擇(selection)的路徑元素可以包含允許向該部分廣播(broadcasting)消息的通配符模式:
// will look all children to serviceB with names starting with worker getContext().actorSelection("/user/serviceB/worker*"); // will look up all siblings beneath same supervisor getContext().actorSelection("../*");消息可以通過ActorSelection發送,并且在傳遞每個消息時查找ActorSelection的路徑。如果選擇(selection)與任何 Actor 都不匹配,則消息將被刪除。
要獲取ActorRef以進行ActorSelection,你需要向選擇發送消息,并使用來自 Actor 的答復的getSender()引用。有一個內置的Identify消息,所有 Actor 都將理解該消息,并使用包含ActorRef的ActorIdentity消息自動回復。此消息由 Actor 專門處理,如果具體的名稱查找失敗(即非通配符路徑元素與存活的 Actor 不對應),則會生成負(negative)結果。請注意,這并不意味著保證回復的傳遞(delivery of that reply is guaranteed),它仍然是正常的消息。
import akka.actor.ActorIdentity; import akka.actor.ActorSelection; import akka.actor.Identify; static class Follower extends AbstractActor {final Integer identifyId = 1;public Follower() {ActorSelection selection = getContext().actorSelection("/user/another");selection.tell(new Identify(identifyId), getSelf());}@Overridepublic Receive createReceive() {return receiveBuilder().match(ActorIdentity.class,id -> id.getActorRef().isPresent(),id -> {ActorRef ref = id.getActorRef().get();getContext().watch(ref);getContext().become(active(ref));}).match(ActorIdentity.class,id -> !id.getActorRef().isPresent(),id -> {getContext().stop(getSelf());}).build();}final AbstractActor.Receive active(final ActorRef another) {return receiveBuilder().match(Terminated.class, t -> t.actor().equals(another), t -> getContext().stop(getSelf())).build();} }你還可以使用ActorSelection的resolveOne方法獲取ActorRef以進行actorselection。如果存在這樣的 Actor,它將返回匹配的ActorRef的Future,可參閱「 Java 8 Compatibility for Java compatibility」。如果不存在這樣的 Actor 或標識在提供的timeout內未完成,則完成此操作并拋出akka.actor.ActorNotFound異常。
如果啟用「remoting」,也可以查找遠程 Actor 的地址:
getContext().actorSelection("akka.tcp://app@otherhost:1234/user/serviceB");「Remoting Sample」中給出了一個在啟用遠程處理(remoting)的情況下演示 Actor 查找的例子。
信息和不變性
- 重要的:消息可以是任何類型的對象,但必須是不可變的。Akka 還不能強制執行不可變性,所以必須按慣例執行。
以下是不可變消息的示例:
public class ImmutableMessage {private final int sequenceNumber;private final List<String> values;public ImmutableMessage(int sequenceNumber, List<String> values) {this.sequenceNumber = sequenceNumber;this.values = Collections.unmodifiableList(new ArrayList<String>(values));}public int getSequenceNumber() {return sequenceNumber;}public List<String> getValues() {return values;} }發送消息
消息通過以下方法之一發送給 Actor。
- tell的意思是“發送并忘記(fire-and-forget)”,例如異步發送消息并立即返回。
- ask異步發送消息,并返回一個表示可能的答復。
每一個發送者都有消息順序的保證。
- 注釋:使用ask會帶來性能方面的影響,因為有些東西需要跟蹤它何時超時,需要有一些東西將一個Promise連接到ActorRef中,并且還需要通過遠程處理實現它。所以,我們更傾向于使用tell,只有當你有足夠的理由時才應該使用ask。
在所有這些方法中,你可以選擇傳遞自己的ActorRef。將其作為一種實踐,因為這樣做將允許接收者 Actor 能夠響應你的消息,因為發送者引用與消息一起發送。
Tell: Fire-forget
這是發送消息的首選方式,它不用等待消息返回,因此不是阻塞的。這提供了最佳的并發性和可伸縮性的特性。
// don’t forget to think about who is the sender (2nd argument) target.tell(message, getSelf());發送方引用與消息一起傳遞,并在處理此消息時通過getSender()方法在接收 Actor 中使用。在一個 Actor 內部,通常是getSelf()作為發送者,但在某些情況下,回復(replies)應該路由到另一個 Actor,例如,父對象,其中tell的第二個參數將是另一個 Actor。在 Actor 外部,如果不需要回復,則第二個參數可以為null;如果在 Actor 外部需要回復,則可以使用下面描述的ask模式。
Ask: Send-And-Receive-Future
ask模式涉及 Actor 和Future,因此它是作為一種使用模式而不是ActorRef上的一種方法提供的:
import static akka.pattern.Patterns.ask; import static akka.pattern.Patterns.pipe;import java.util.concurrent.CompletableFuture; final Duration t = Duration.ofSeconds(5);// using 1000ms timeout CompletableFuture<Object> future1 =ask(actorA, "request", Duration.ofMillis(1000)).toCompletableFuture();// using timeout from above CompletableFuture<Object> future2 = ask(actorB, "another request", t).toCompletableFuture();CompletableFuture<Result> transformed =CompletableFuture.allOf(future1, future2).thenApply(v -> {String x = (String) future1.join();String s = (String) future2.join();return new Result(x, s);});pipe(transformed, system.dispatcher()).to(actorC);這個例子演示了ask和pipeTo模式在Future上的結合,因為這可能是一個常見的組合。請注意,以上所有內容都是完全非阻塞和異步的:ask生成一個,其中兩個使用CompletableFuture.allOf和thenApply方法組合成新的Future,然后pipe在CompletionStage上安裝一個處理程序,以將聚合Result提交給另一個 Actor。
使用ask會像使用tell一樣向接收 Actor 發送消息,并且接收 Actor 必須使用getSender().tell(reply, getSelf())才能完成返回的值。ask操作涉及創建一個用于處理此回復的內部 Actor,該 Actor 需要有一個超時,在該超時之后才能將其銷毀,以便不泄漏資源;具體請參閱下面更多的內容。
- 警告:要完成帶異常的,你需要向發件人發送akka.actor.Status.Failure消息。當 Actor 在處理消息時拋出異常,不會自動執行此操作。
如果 Actor 未完成,則它將在超時期限(指定為ask方法的參數)之后過期;這將使用AskTimeoutException完成CompletionStage。
這可以用于注冊回調以便在完成時獲取通知,從而提供避免阻塞的方法。
- 警告:當使用Future的回調時,內部 Actor 需要小心避免關閉包含 Actor 的引用,即不要從回調中調用方法或訪問封閉 Actor 的可變狀態。這將破壞 Actor 的封裝,并可能引入同步錯誤和競態條件,因為回調將被同時調度到封閉 Actor。不幸的是,目前還沒有一種方法可以在編譯時檢測到這些非法訪問。另見「Actors 和共享可變狀態」。
轉發消息
你可以將消息從一個 Actor 轉發到另一個 Actor。這意味著即使消息通過“中介器(mediator)”,原始發送方地址/引用也會得到維護。這在編寫充當路由器(routers)、負載平衡器(load-balancers)、復制器(replicators)等的 Actor 時很有用。
target.forward(result, getContext());接收消息
Actor 必須通過在AbstractActor中實現createReceive方法來定義其初始接收行為:
@Override public Receive createReceive() {return receiveBuilder().match(String.class, s -> System.out.println(s.toLowerCase())).build(); }返回類型是AbstractActor.Receive,它定義了 Actor 可以處理哪些消息,以及如何處理這些消息的實現。可以使用名為ReceiveBuilder的生成器來構建此類行為。下面是一個例子:
import akka.actor.AbstractActor; import akka.event.Logging; import akka.event.LoggingAdapter;public class MyActor extends AbstractActor {private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);@Overridepublic Receive createReceive() {return receiveBuilder().match(String.class,s -> {log.info("Received String message: {}", s);}).matchAny(o -> log.info("received unknown message")).build();} }如果你希望提供許多match案例,但希望避免創建長調用跟蹤(a long call trail),可以將生成器的創建拆分為多個語句,如示例中所示:
import akka.actor.AbstractActor; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.japi.pf.ReceiveBuilder;public class GraduallyBuiltActor extends AbstractActor {private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);@Overridepublic Receive createReceive() {ReceiveBuilder builder = ReceiveBuilder.create();builder.match(String.class,s -> {log.info("Received String message: {}", s);});// do some other stuff in betweenbuilder.matchAny(o -> log.info("received unknown message"));return builder.build();} }在 Actor 中,使用小方法也是一種很好的做法。建議將消息處理的實際工作委托給方法,而不是在每個lambda中定義具有大量代碼的大型ReceiveBuilder。一個結構良好的 Actor 可以是這樣的:
static class WellStructuredActor extends AbstractActor {public static class Msg1 {}public static class Msg2 {}public static class Msg3 {}@Overridepublic Receive createReceive() {return receiveBuilder().match(Msg1.class, this::receiveMsg1).match(Msg2.class, this::receiveMsg2).match(Msg3.class, this::receiveMsg3).build();}private void receiveMsg1(Msg1 msg) {// actual work}private void receiveMsg2(Msg2 msg) {// actual work}private void receiveMsg3(Msg3 msg) {// actual work} }這樣做有以下好處:
- 更容易看到 Actor 能處理什么樣的信息
- 異常情況下的可讀堆棧跟蹤
- 更好地使用性能分析工具
- Java HotSpot 有更好的機會進行優化
Receive可以通過其他方式實現,而不是使用ReceiveBuilder,因為它最終只是一個 Scala PartialFunction的包裝器。在 Java 中,可以通過擴展AbstractPartialFunction實現PartialFunction。例如,可以實現「與 DSL 匹配的 Vavr 模式適配器」,有關更多詳細信息,請參閱「Akka Vavr 示例項目」。
如果對某些 Actor 來說,驗證ReceiveBuilder匹配邏輯是一個瓶頸,那么你可以考慮通過擴展UntypedAbstractActor而不是AbstractActor來在較低的級別實現它。ReceiveBuilder創建的分部函數由每個match語句的多個lambda表達式組成,其中每個lambda都引用要運行的代碼。這是 JVM 在優化時可能會遇到的問題,并且產生的代碼的性能可能不如非類型化版本。當擴展UntypedAbstractActor時,每個消息都作為非類型化Object接收,你必須以其他方式檢查并轉換為實際的消息類型,如下所示:
static class OptimizedActor extends UntypedAbstractActor {public static class Msg1 {}public static class Msg2 {}public static class Msg3 {}@Overridepublic void onReceive(Object msg) throws Exception {if (msg instanceof Msg1) receiveMsg1((Msg1) msg);else if (msg instanceof Msg2) receiveMsg2((Msg2) msg);else if (msg instanceof Msg3) receiveMsg3((Msg3) msg);else unhandled(msg);}private void receiveMsg1(Msg1 msg) {// actual work}private void receiveMsg2(Msg2 msg) {// actual work}private void receiveMsg3(Msg3 msg) {// actual work} }回復消息
如果你想有一個回復消息的句柄,可以使用getSender(),它會給你一個ActorRef。你可以通過使用getSender().tell(replyMsg, getSelf())發送ActorRef來進行回復。你還可以存儲ActorRef以供稍后回復或傳遞給其他 Actor。如果沒有發送者(發送的消息沒有 Actor 或Future上下文),那么發送者默認為死信(dead-letter)Actor 引用。
getSender().tell(s, getSelf());接收超時
ActorContext setReceiveTimeout定義了非活動超時,在該超時之后,將觸發發送ReceiveTimeout消息。指定超時時間后,接收函數應該能夠處理akka.actor.ReceiveTimeout消息。1毫秒是支持的最小超時時間。
請注意,接收超時(receive timeout)可能會在另一條消息排隊后立即觸發并排隊ReceiveTimeout消息;因此,不保證在接收超時,如通過此方法配置的那樣,事先必須有空閑時間。
設置后,接收超時將保持有效(即在非活動期后繼續重復觸發),可以通過傳入Duration.Undefined消息來關閉此功能。
static class ReceiveTimeoutActor extends AbstractActor {public ReceiveTimeoutActor() {// To set an initial delaygetContext().setReceiveTimeout(Duration.ofSeconds(10));}@Overridepublic Receive createReceive() {return receiveBuilder().matchEquals("Hello",s -> {// To set in a response to a messagegetContext().setReceiveTimeout(Duration.ofSeconds(1));}).match(ReceiveTimeout.class,r -> {// To turn it offgetContext().cancelReceiveTimeout();}).build();} }標記為NotInfluenceReceiveTimeout的消息將不會重置計時器。當ReceiveTimeout受外部不活動而不受內部活動(如定時勾選消息)影響時,這可能會很有用(This can be useful when ReceiveTimeout should be fired by external inactivity but not influenced by internal activity, e.g. scheduled tick messages.)。
定時器和調度消息
通過直接使用「Scheduler」,可以將消息安排在以后的時間點發送,但是在將 Actor 中的定期或單個消息安排到自身時,使用對命名定時器(named timers)的支持更為方便和安全。當 Actor 重新啟動并由定時器處理時,調度(scheduled)消息的生命周期可能難以管理。
import java.time.Duration; import akka.actor.AbstractActorWithTimers;static class MyActor extends AbstractActorWithTimers {private static Object TICK_KEY = "TickKey";private static final class FirstTick {}private static final class Tick {}public MyActor() {getTimers().startSingleTimer(TICK_KEY, new FirstTick(), Duration.ofMillis(500));}@Overridepublic Receive createReceive() {return receiveBuilder().match(FirstTick.class,message -> {// do something useful heregetTimers().startPeriodicTimer(TICK_KEY, new Tick(), Duration.ofSeconds(1));}).match(Tick.class,message -> {// do something useful here}).build();} }每個定時器都有一個鍵,可以更換或取消。它保證不會收到來自具有相同密鑰的定時器的前一個實例的消息,即使當它被取消或新定時器啟動時,它可能已經在郵箱中排隊。
定時器綁定到擁有它的 Actor 的生命周期,因此當它重新啟動或停止時自動取消。請注意,TimerScheduler不是線程安全的,即它只能在擁有它的 Actor 中使用。
停止 Actor
通過調用ActorRefFactory的stop方法(即ActorContext或ActorSystem)來停止 Actor。通常,上下文用于停止 Actor 本身或子 Actor,以及停止頂級 Actor 的系統。Actor 的實際終止是異步執行的,也就是說,stop可能會在 Actor 停止之前返回。
import akka.actor.ActorRef; import akka.actor.AbstractActor;public class MyStoppingActor extends AbstractActor {ActorRef child = null;// ... creation of child ...@Overridepublic Receive createReceive() {return receiveBuilder().matchEquals("interrupt-child", m -> getContext().stop(child)).matchEquals("done", m -> getContext().stop(getSelf())).build();} }當前郵件(如果有)的處理將在 Actor 停止之前繼續,但不會處理郵箱中的其他郵件。默認情況下,這些消息將發送到ActorSystem的deadLetters,但這取決于郵箱的實現。
一個 Actor 的終止分兩步進行:首先,Actor 暫停其郵箱處理并向其所有子級發送停止命令,然后繼續處理其子級的內部終止通知,直到最后一個終止,最后終止其自身(調用postStop、轉儲郵箱、在DeathWatch上發布Terminated、通知其監督者)。此過程確保 Actor 系統子樹以有序的方式終止,將stop命令傳播到葉,并將其確認信息收集回已停止的監督者。如果其中一個 Actor 沒有響應(即長時間處理消息,因此不接收stop命令),那么整個過程將被阻塞。
在ActorSystem.terminate()之后,系統守護者 Actor 將被停止,上述過程將確保整個系統的正確終止。
postStop()鉤子在 Actor 完全停止后調用。這樣可以清理資源:
@Override public void postStop() {final String message = "stopped";// don’t forget to think about who is the sender (2nd argument)target.tell(message, getSelf());final Object result = "";target.forward(result, getContext());target = null; }- 注釋:由于停止 Actor 是異步的,因此不能立即重用剛剛停止的子級的名稱;這將導致InvalidActorNameException。相反,watch()終止的 Actor,并創建其替換(replacement),以響應最終到達的Terminated消息。
PoisonPill
你還可以向 Actor 發送akka.actor.PoisonPill消息,該消息將在處理消息時停止 Actor。PoisonPill作為普通消息排隊,并將在郵箱中已排隊的消息之后處理。
victim.tell(akka.actor.PoisonPill.getInstance(), ActorRef.noSender());殺死一個 Actor
你也可以通過發送一條Kill消息來“殺死”一個 Actor。與PoisonPill不同的是,這可能會使 Actor 拋出ActorKilledException,并觸發失敗。Actor 將暫停操作,并詢問其監督者如何處理故障,這可能意味著恢復 Actor、重新啟動或完全終止 Actor。更多信息,請參閱「What Supervision Means」。
像這樣使用Kill:
victim.tell(akka.actor.Kill.getInstance(), ActorRef.noSender());// expecting the actor to indeed terminate: expectTerminated(Duration.ofSeconds(3), victim);一般來說,雖然在設計 Actor 交互時不建議過分依賴于PoisonPill或Kill,但通常情況下,鼓勵使用諸如PleaseCleanupAndStop之類的協議級(protocol-level)消息,因為 Actor 知道如何處理這些消息。像PoisonPill和Kill這樣的信息是為了能夠停止那些你無法控制的 Actor 的。
優雅的停止
如果你需要等待終止或組合多個 Actor 的有序終止,則gracefulStop非常有用:
import static akka.pattern.Patterns.gracefulStop; import akka.pattern.AskTimeoutException; import java.util.concurrent.CompletionStage;try {CompletionStage<Boolean> stopped =gracefulStop(actorRef, Duration.ofSeconds(5), Manager.SHUTDOWN);stopped.toCompletableFuture().get(6, TimeUnit.SECONDS);// the actor has been stopped } catch (AskTimeoutException e) {// the actor wasn't stopped within 5 seconds }當gracefulStop()成功返回時,Actor 的postStop()鉤子將被執行:在postStop()結尾和gracefulStop()返回之間存在一個“發生在邊緣之前(happens-before edge)”的關系。
在上面的例子中,一個定制的Manager.Shutdown消息被發送到目標 Actor,以啟動停止 Actor 的過程。你可以為此使用PoisonPill,但在停止目標 Actor 之前,你與其他 Actor 進行交互的可能性很有限??梢栽趐ostStop中處理簡單的清理任務。
- 警告:請記住,停止的 Actor 和取消注冊的 Actor 是彼此異步發生的獨立事件。因此,在gracefulStop()返回后,你可能會發現該名稱仍在使用中。為了保證正確的注銷,只能重用你控制的監督者中的名稱,并且只響應Terminated消息,即不用于頂級 Actor。
協調關閉
有一個名為CoordinatedShutdown的擴展,它將按特定順序停止某些 Actor 和服務,并在關閉過程中執行注冊的任務。
停機階段(shutdown phases)的順序在配置akka.coordinated-shutdown.phases中定義。默認階段定義為:
# CoordinatedShutdown is enabled by default and will run the tasks that # are added to these phases by individual Akka modules and user logic. # # The phases are ordered as a DAG by defining the dependencies between the phases # to make sure shutdown tasks are run in the right order. # # In general user tasks belong in the first few phases, but there may be use # cases where you would want to hook in new phases or register tasks later in # the DAG. # # Each phase is defined as a named config section with the # following optional properties: # - timeout=15s: Override the default-phase-timeout for this phase. # - recover=off: If the phase fails the shutdown is aborted # and depending phases will not be executed. # - enabled=off: Skip all tasks registered in this phase. DO NOT use # this to disable phases unless you are absolutely sure what the # consequences are. Many of the built in tasks depend on other tasks # having been executed in earlier phases and may break if those are disabled. # depends-on=[]: Run the phase after the given phases phases {# The first pre-defined phase that applications can add tasks to.# Note that more phases can be added in the application's# configuration by overriding this phase with an additional# depends-on.before-service-unbind {}# Stop accepting new incoming connections.# This is where you can register tasks that makes a server stop accepting new connections. Already# established connections should be allowed to continue and complete if possible.service-unbind {depends-on = [before-service-unbind]}# Wait for requests that are in progress to be completed.# This is where you register tasks that will wait for already established connections to complete, potentially# also first telling them that it is time to close down.service-requests-done {depends-on = [service-unbind]}# Final shutdown of service endpoints.# This is where you would add tasks that forcefully kill connections that are still around.service-stop {depends-on = [service-requests-done]}# Phase for custom application tasks that are to be run# after service shutdown and before cluster shutdown.before-cluster-shutdown {depends-on = [service-stop]}# Graceful shutdown of the Cluster Sharding regions.# This phase is not meant for users to add tasks to.cluster-sharding-shutdown-region {timeout = 10 sdepends-on = [before-cluster-shutdown]}# Emit the leave command for the node that is shutting down.# This phase is not meant for users to add tasks to.cluster-leave {depends-on = [cluster-sharding-shutdown-region]}# Shutdown cluster singletons# This is done as late as possible to allow the shard region shutdown triggered in# the "cluster-sharding-shutdown-region" phase to complete before the shard coordinator is shut down.# This phase is not meant for users to add tasks to.cluster-exiting {timeout = 10 sdepends-on = [cluster-leave]}# Wait until exiting has been completed# This phase is not meant for users to add tasks to.cluster-exiting-done {depends-on = [cluster-exiting]}# Shutdown the cluster extension# This phase is not meant for users to add tasks to.cluster-shutdown {depends-on = [cluster-exiting-done]}# Phase for custom application tasks that are to be run# after cluster shutdown and before ActorSystem termination.before-actor-system-terminate {depends-on = [cluster-shutdown]}# Last phase. See terminate-actor-system and exit-jvm above.# Don't add phases that depends on this phase because the# dispatcher and scheduler of the ActorSystem have been shutdown.# This phase is not meant for users to add tasks to.actor-system-terminate {timeout = 10 sdepends-on = [before-actor-system-terminate]} }如果需要,可以在應用程序的配置中添加更多的階段(phases),方法是使用附加的depends-on覆蓋階段。尤其是在before-service-unbind、before-cluster-shutdown和before-actor-system-terminate的階段,是針對特定于應用程序的階段或任務的。
默認階段是以單個線性順序定義的,但是通過定義階段之間的依賴關系,可以將階段排序為有向非循環圖(DAG)。階段是按 DAG 的拓撲排序的。
可以將任務添加到具有以下內容的階段:
CoordinatedShutdown.get(system).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind(),"someTaskName",() -> {return akka.pattern.Patterns.ask(someActor, "stop", Duration.ofSeconds(5)).thenApply(reply -> Done.getInstance());});在任務完成后,應返回CompletionStage<Done>。任務名稱參數僅用于調試或日志記錄。
添加到同一階段的任務是并行執行的,沒有任何排序假設。在完成前一階段的所有任務之前,下一階段將不會啟動。
如果任務沒有在配置的超時內完成(請參見「reference.conf」),則下一個階段無論如何都會啟動。如果任務失敗或未在超時內完成,則可以為一個階段配置recover=off以中止關閉過程的其余部分。
任務通常應在系統啟動后盡早注冊。運行時,將執行已注冊的協調關閉任務,但不會運行添加得太晚的任務。
要啟動協調關閉進程,可以對CoordinatedShutdown擴展調用runAll:
CompletionStage<Done> done =CoordinatedShutdown.get(system).runAll(CoordinatedShutdown.unknownReason());多次調用runAll方法是安全的,它只能運行一次。
這也意味著ActorSystem將在最后一個階段終止。默認情況下,不會強制停止 JVM(如果終止了所有非守護進程線程,則會停止 JVM)。要啟用硬System.exit作為最終操作,可以配置:
akka.coordinated-shutdown.exit-jvm = on當使用「Akka 集群」時,當集群節點將自己視為Exiting時,CoordinatedShutdown將自動運行,即從另一個節點離開將觸發離開節點上的關閉過程。當使用 Akka 集群時,會自動添加集群的優雅離開任務,包括集群單例的優雅關閉和集群分片,即運行關閉過程也會觸發尚未進行的優雅離開。
默認情況下,當 JVM 進程退出時,例如通過kill SIGTERM信號(SIGINT時Ctrl-C不起作用),將運行CoordinatedShutdown。此行為可以通過以下方式禁用:
akka.coordinated-shutdown.run-by-jvm-shutdown-hook=off如果你有特定于應用程序的 JVM 關閉鉤子,建議你通過CoordinatedShutdown對它們進行注冊,以便它們在 Akka 內部關閉鉤子之前運行,例如關閉 Akka 遠程處理。
CoordinatedShutdown.get(system).addJvmShutdownHook(() -> System.out.println("custom JVM shutdown hook..."));對于某些測試,可能不希望通過CoordinatedShutdown來終止ActorSystem。你可以通過將以下內容添加到測試時使用的ActorSystem的配置中來禁用此功能:
# Don't terminate ActorSystem via CoordinatedShutdown in tests akka.coordinated-shutdown.terminate-actor-system = off akka.coordinated-shutdown.run-by-jvm-shutdown-hook = off akka.cluster.run-coordinated-shutdown-when-down = offBecome/Unbecome
升級
Akka 支持在運行時對 Actor 的消息循環(例如其實現)進行熱交換:從 Actor 內部調用context.become方法。become采用實現新消息處理程序的PartialFunction<Object, BoxedUnit>。熱交換代碼(hotswapped code)保存在一個Stack中,可以推送和彈出。
- 警告:請注意,Actor 在被其監督者重新啟動時將恢復其原始行為。
要使用become熱交換 Actor 的行為,可以參考以下操作:
static class HotSwapActor extends AbstractActor {private AbstractActor.Receive angry;private AbstractActor.Receive happy;public HotSwapActor() {angry =receiveBuilder().matchEquals("foo",s -> {getSender().tell("I am already angry?", getSelf());}).matchEquals("bar",s -> {getContext().become(happy);}).build();happy =receiveBuilder().matchEquals("bar",s -> {getSender().tell("I am already happy :-)", getSelf());}).matchEquals("foo",s -> {getContext().become(angry);}).build();}@Overridepublic Receive createReceive() {return receiveBuilder().matchEquals("foo", s -> getContext().become(angry)).matchEquals("bar", s -> getContext().become(happy)).build();} }become方法的這種變體對于許多不同的事情都很有用,例如實現有限狀態機(FSM,例如「Dining Hakkers」)。它將替換當前行為(即行為堆棧的頂部),這意味著你不使用unbecome,而是始終顯式安裝(installed)下一個行為。
另一種使用become的方法不是替換而是添加到行為堆棧的頂部。在這種情況下,必須注意確保pop操作的數量(即unbecome)與push操作的數量在長期內匹配,否則這將導致內存泄漏,這就是為什么此行為不是默認行為。
static class Swapper extends AbstractLoggingActor {@Overridepublic Receive createReceive() {return receiveBuilder().matchEquals(Swap,s -> {log().info("Hi");getContext().become(receiveBuilder().matchEquals(Swap,x -> {log().info("Ho");getContext().unbecome(); // resets the latest 'become' (just for fun)}).build(),false); // push on top instead of replace}).build();} }static class SwapperApp {public static void main(String[] args) {ActorSystem system = ActorSystem.create("SwapperSystem");ActorRef swapper = system.actorOf(Props.create(Swapper.class), "swapper");swapper.tell(Swap, ActorRef.noSender()); // logs Hiswapper.tell(Swap, ActorRef.noSender()); // logs Hoswapper.tell(Swap, ActorRef.noSender()); // logs Hiswapper.tell(Swap, ActorRef.noSender()); // logs Hoswapper.tell(Swap, ActorRef.noSender()); // logs Hiswapper.tell(Swap, ActorRef.noSender()); // logs Hosystem.terminate();} }對 Scala Actor 嵌套接收進行編碼,而不會意外泄漏內存
請參閱「Unnested receive example」。
Stash
AbstractActorWithStash類使 Actor 能夠臨時存儲"不能"或"不應該"使用 Actor 當前行為處理的消息。更改 Actor 的消息處理程序后,即在調用getContext().become()或getContext().unbecome()之前,所有隱藏的消息都可以“unstashed”,從而將它們預存到 Actor 的郵箱中。這樣,可以按照與最初接收到的消息相同的順序處理隱藏的消息。擴展AbstractActorWithStash的 Actor 將自動獲得基于deque的郵箱。
- 注釋:抽象類AbstractActorWithStash實現了標記接口RequiresMessageQueue<DequeBasedMessageQueueSemantics>,如果希望對郵箱進行更多控制,請參閱有關郵箱的文檔:「Mailboxes」。
下面是AbstractActorWithStash類的一個示例:
static class ActorWithProtocol extends AbstractActorWithStash {@Overridepublic Receive createReceive() {return receiveBuilder().matchEquals("open",s -> {getContext().become(receiveBuilder().matchEquals("write",ws -> {/* do writing */}).matchEquals("close",cs -> {unstashAll();getContext().unbecome();}).matchAny(msg -> stash()).build(),false);}).matchAny(msg -> stash()).build();} }調用stash()會將當前消息(Actor 最后收到的消息)添加到 Actor 的stash中。它通常在處理 Actor 消息處理程序中的默認情況時調用,以存儲其他情況未處理的消息。將同一條消息存儲兩次是非法的;這樣做會導致IllegalStateException。stash也可以是有界的,在這種情況下,調用stash()可能導致容量沖突(capacity violation),從而導致StashOverflowException。可以使用郵箱配置的stash-capacity設置(一個int值)存儲容量。
調用unstashAll()將消息從stash排隊到 Actor 的郵箱,直到達到郵箱的容量(如果有),請注意,stash中的消息是預先發送到郵箱的。如果有界郵箱溢出,將引發MessageQueueAppendFailedException。調用unstashAll()后,stash保證為空。
stash由scala.collection.immutable.Vector支持。因此,即使是非常大量的消息也可能被存儲起來,而不會對性能產生重大影響。
請注意,與郵箱不同,stash是短暫的 Actor 狀態的一部分。因此,它應該像 Actor 狀態中具有相同屬性的其他部分一樣進行管理。preRestart的AbstractActorWithStash實現將調用unstashAll(),這通常是需要的行為。
- 注釋:如果你想強制你的 Actor 只能使用無界的stash,那么你應該使用AbstractActorWithUnboundedStash類。
Actor 和異常
當 Actor 處理消息時,可能會引發某種異常,例如數據庫異常。
消息發生了什么
如果在處理郵件時引發異常(即從郵箱中取出并移交給當前行為),則此郵件將丟失。重要的是要知道它不會放回郵箱。因此,如果你想重試處理消息,你需要自己處理它,捕獲異常并重試處理流程(retry your flow)。確保對重試次數進行了限制,因為你不希望系統進行livelock,否則的話,這會在程序沒有進展的情況下消耗大量 CPU 周期。
郵箱發生了什么
如果在處理郵件時引發異常,則郵箱不會發生任何異常。如果 Actor 重新啟動,則會出現相同的郵箱。因此,該郵箱上的所有郵件也將在那里。
Actor 發現了什么
如果 Actor 內的代碼拋出異常,則該 Actor 將被掛起,并且監控過程將啟動。根據監督者的決定,Actor 被恢復(好像什么都沒有發生)、重新啟動(清除其內部狀態并從頭開始)或終止。
初始化模式
Actor 的豐富生命周期鉤子提供了一個有用的工具箱來實現各種初始化模式(initialization patterns)。在ActorRef的生命周期中,Actor 可能會經歷多次重新啟動,舊實例被新實例替換,外部觀察者看不見內部的變化,外部觀察者只看到ActorRef引用。
每次實例化一個 Actor 時,可能都需要初始化,但有時只需要在創建ActorRef時對第一個實例進行初始化。以下部分提供了不同初始化需求的模式。
通過構造函數初始化
使用構造函數進行初始化有很多好處。首先,它讓使用val字段存儲在 Actor 實例的生命周期中不發生更改的任何狀態成為可能,從而使 Actor 的實現更加健壯。當創建一個調用actorOf的 Actor 實例時,也會在重新啟動時調用構造函數,因此 Actor 的內部始終可以假定發生了正確的初始化。這也是這種方法的缺點,因為在某些情況下,人們希望避免在重新啟動時重新初始化內部信息。例如,在重新啟動時保護子 Actor 通常很有用。下面的部分提供了這個案例的模式。
通過 preStart 初始化
在第一個實例的初始化過程中,即在創建ActorRef時,只直接調用一次 Actor 的preStart()方法。在重新啟動的情況下,postRestart()調用preStart(),因此如果不重寫,則在每次重新啟動時都會調用preStart()。但是,通過重寫postRestart(),可以禁用此行為,并確保只有一個對preStart()的調用。
此模式的一個有用用法是在重新啟動期間禁用為子級創建新的ActorRef。這可以通過重寫preRestart()來實現。以下是這些生命周期掛鉤的默認實現:
@Override public void preStart() {// Initialize children here }// Overriding postRestart to disable the call to preStart() // after restarts @Override public void postRestart(Throwable reason) {}// The default implementation of preRestart() stops all the children // of the actor. To opt-out from stopping the children, we // have to override preRestart() @Override public void preRestart(Throwable reason, Optional<Object> message) throws Exception {// Keep the call to postStop(), but no stopping of childrenpostStop(); }請注意,子 Actor 仍然重新啟動,但沒有創建新的ActorRef??梢赃f歸地為子級應用相同的原則,確保只在創建引用時調用它們的preStart()方法。
有關更多信息,請參閱「What Restarting Means」。
通過消息傳遞初始化
有些情況下,在構造函數中無法傳遞 Actor 初始化所需的所有信息,例如在存在循環依賴項的情況下。在這種情況下,Actor 應該監聽初始化消息,并使用become()或有限狀態機(finite state-machine)狀態轉換來編碼 Actor 的初始化和未初始化狀態。
@Override public Receive createReceive() {return receiveBuilder().matchEquals("init",m1 -> {initializeMe = "Up and running";getContext().become(receiveBuilder().matchEquals("U OK?",m2 -> {getSender().tell(initializeMe, getSelf());}).build());}).build(); }如果 Actor 可能在初始化消息之前收到消息,那么一個有用的工具可以是Stash存儲消息,直到初始化完成,然后在 Actor 初始化之后重放消息。
- 警告:此模式應小心使用,并且僅當上述模式均不適用時才應用。其中一個潛在的問題是,消息在發送到遠程 Actor 時可能會丟失。此外,在未初始化狀態下發布ActorRef可能會導致在初始化完成之前接收到用戶消息的情況。
英文原文鏈接:Actors.
———— ☆☆☆ —— 返回 -> Akka 中文指南 <- 目錄 —— ☆☆☆ ————
總結
以上是生活随笔為你收集整理的Akka 指南 之「Actors」的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 视频摘要发展综述
- 下一篇: Kafka 数据丢失问题