Akka 指南 之「集群分片」
溫馨提示:Akka 中文指南的 GitHub 地址為「akka-guide」,歡迎大家Star、Fork,糾錯。
文章目錄
- 集群分片
- 依賴
- 示例項目
- 簡介
- 一個示例
- 它是如何工作的?
- 場景
- 場景1:向屬于本地 ShardRegion 的未知分片發送消息
- 場景2:向屬于遠程 ShardRegion 的未知分片發送消息
- 分片位置
- 分片再平衡
- ShardCoordinator 狀態
- 消息排序
- 開銷
- 分布式數據模式 vs. 持久化模式
- 分布式數據模式
- 持久化模式
- 達到最少成員數后啟動
- 僅代理模式
- Passivation
- Automatic Passivation
- Remembering Entities
- 監督
- 優雅地關閉
- 刪除內部群集分片數據
- 配置
- 檢查群集分片狀態
- 滾動升級
集群分片
依賴
為了使用集群分片(Cluster Sharding),你必須在項目中添加如下依賴:
<!-- Maven --> <dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-cluster-sharding_2.11</artifactId><version>2.5.19</version> </dependency><!-- Gradle --> dependencies {compile group: 'com.typesafe.akka', name: 'akka-cluster-sharding_2.11', version: '2.5.19' }<!-- sbt --> libraryDependencies += "com.typesafe.akka" %% "akka-cluster-sharding" % "2.5.19"示例項目
你可以查看「集群分片」項目,以了解 Akka 集群分片的實際使用情況。
簡介
當你需要將 Actor 分布在集群中的多個節點上,并且希望能夠使用它們的邏輯標識符與它們進行交互,但不必關心它們在集群中的物理位置時,集群分片(Cluster sharding)非常有用,這也可能隨著時間的推移而改變。
例如,它可以是表示域驅動設計(Domain-Driven Design)術語中聚合根(Aggregate Roots)的 Actor。在這里,我們稱這些 Actor 為“實體”。這些 Actor 通常具有持久(durable)狀態,但此功能不限于具有持久狀態的 Actor。
集群切分通常在有許多狀態 Actor 共同消耗的資源(例如內存)多于一臺機器上所能容納的資源時使用。如果你只有幾個有狀態的 Actor,那么在集群單例(Cluster Singleton)節點上運行它們可能更容易。
在這個上下文中,分片意味著具有標識符(稱為實體)的 Actor 可以自動分布在集群中的多個節點上。每個實體 Actor 只在一個地方運行,消息可以發送到實體,而不需要發送者知道目標 Actor 的位置。這是通過這個擴展提供的ShardRegion Actor 發送消息來實現的,它知道如何將帶有實體 ID 的消息路由到最終目標。
如果啟用了該功能,則集群分片將不會在狀態為WeaklyUp的成員上活動。
- 警告:不要將 Cluster Sharding 與 Automatic Downing 一起使用,因為它允許集群分裂為兩個單獨的集群,從而導致多個分片和實體啟動,每個集群中只有一個節點!詳見「Downing」。
一個示例
這就是實體 Actor 的樣子:
public class Counter extends AbstractPersistentActor {public enum CounterOp {INCREMENT, DECREMENT}public static class Get {final public long counterId;public Get(long counterId) {this.counterId = counterId;}}public static class EntityEnvelope {final public long id;final public Object payload;public EntityEnvelope(long id, Object payload) {this.id = id;this.payload = payload;}}public static class CounterChanged {final public int delta;public CounterChanged(int delta) {this.delta = delta;}}int count = 0;// getSelf().path().name() is the entity identifier (utf-8 URL-encoded)@Overridepublic String persistenceId() {return "Counter-" + getSelf().path().name();}@Overridepublic void preStart() throws Exception {super.preStart();getContext().setReceiveTimeout(Duration.ofSeconds(120));}void updateState(CounterChanged event) {count += event.delta;}@Overridepublic Receive createReceiveRecover() {return receiveBuilder().match(CounterChanged.class, this::updateState).build();}@Overridepublic Receive createReceive() {return receiveBuilder().match(Get.class, this::receiveGet).matchEquals(CounterOp.INCREMENT, msg -> receiveIncrement()).matchEquals(CounterOp.DECREMENT, msg -> receiveDecrement()).matchEquals(ReceiveTimeout.getInstance(), msg -> passivate()).build();}private void receiveGet(Get msg) {getSender().tell(count, getSelf());}private void receiveIncrement() {persist(new CounterChanged(+1), this::updateState);}private void receiveDecrement() {persist(new CounterChanged(-1), this::updateState);}private void passivate() {getContext().getParent().tell(new ShardRegion.Passivate(PoisonPill.getInstance()), getSelf());} }上面的 Actor 使用事件源和AbstractPersistentActor中提供的支持來存儲其狀態。它不必是持久性 Actor,但是如果節點之間的實體發生故障或遷移,那么它必須能夠恢復其狀態(如果它是有價值的)。
請注意如何定義persistenceId。Actor 的名稱是實體標識符(UTF-8 URL 編碼)。你也可以用另一種方式定義它,但它必須是唯一的。
當使用分片擴展時,你首先要使用ClusterSharding.start方法注冊支持的實體類型,通常是在集群中每個節點上的系統啟動時。ClusterSharding.start為你提供了可以傳遞的參考。請注意,如果當前群集節點的角色與在ClusterShardingSettings中指定的角色不匹配,ClusterSharding.start將以代理模式啟動ShardRegion。
import akka.japi.Option; import akka.cluster.sharding.ClusterSharding; import akka.cluster.sharding.ClusterShardingSettings;Option<String> roleOption = Option.none(); ClusterShardingSettings settings = ClusterShardingSettings.create(system); ActorRef startedCounterRegion =ClusterSharding.get(system).start("Counter", Props.create(Counter.class), settings, messageExtractor);messageExtractor定義了特定于應用程序的方法,以從傳入消息中提取實體標識符和分片標識符。
import akka.cluster.sharding.ShardRegion;ShardRegion.MessageExtractor messageExtractor =new ShardRegion.MessageExtractor() {@Overridepublic String entityId(Object message) {if (message instanceof Counter.EntityEnvelope)return String.valueOf(((Counter.EntityEnvelope) message).id);else if (message instanceof Counter.Get)return String.valueOf(((Counter.Get) message).counterId);else return null;}@Overridepublic Object entityMessage(Object message) {if (message instanceof Counter.EntityEnvelope)return ((Counter.EntityEnvelope) message).payload;else return message;}@Overridepublic String shardId(Object message) {int numberOfShards = 100;if (message instanceof Counter.EntityEnvelope) {long id = ((Counter.EntityEnvelope) message).id;return String.valueOf(id % numberOfShards);} else if (message instanceof Counter.Get) {long id = ((Counter.Get) message).counterId;return String.valueOf(id % numberOfShards);} else {return null;}}};此示例說明了在消息中定義實體標識符的兩種不同方法:
- Get消息包含標識符本身。
- EntityEnvelope包含標識符,發送給實體 Actor 的實際消息包裝在信封中。
注意這兩種消息類型是如何在上面展示的entityId和entityMessage方法中處理的。發送給實體 Actor 的消息是entityMessage返回的,這使得在需要時可以打開信封(unwrap envelopes)。
分片是一起管理的一組實體。分組由上文所示的extractShardId函數定義。對于特定的實體標識符,分片標識符必須始終相同。否則,實體 Actor 可能會同時在多個位置意外啟動。
創建一個好的分片算法(sharding algorithm)本身就是一個有趣的挑戰。嘗試產生一個統一的分布,即在每個分片中有相同數量的實體。根據經驗,分片的數量應該比計劃的最大集群節點數量大十倍。分片少于節點數量將導致某些節點不會承載任何分片。太多的分片將導致對分片的管理效率降低,例如重新平衡開銷,并增加延遲,因為協調器(coordinator)參與每個分片的第一條消息的路由。正在運行的集群中的所有節點上的分片算法必須相同。它可以在停止群集中的所有節點后進行更改。
一個簡單的分片算法在大多數情況下都可以很好地工作,它是以分片的實體標識符模數的hashCode的絕對值為基礎的。為了方便起見,ShardRegion.HashCodeMessageExtractor提供了這一功能。
向實體發送的消息始終通過本地ShardRegion發送。命名實體類型的ShardRegion Actor 引用由ClusterSharding.start返回,也可以使用ClusterSharding.shardRegion檢索。如果ShardRegion不知道其位置的話,它將查找實體的分片位置。它將把消息委托給正確的節點,并根據需要創建實體 Actor,即在傳遞特定實體的第一條消息時。
ActorRef counterRegion = ClusterSharding.get(system).shardRegion("Counter"); counterRegion.tell(new Counter.Get(123), getSelf());counterRegion.tell(new Counter.EntityEnvelope(123, Counter.CounterOp.INCREMENT), getSelf()); counterRegion.tell(new Counter.Get(123), getSelf());它是如何工作的?
ShardRegion Actor 在集群中的每個節點或標記有特定角色的節點組上啟動。ShardRegion由兩個特定于應用程序的函數創建,用于從傳入消息中提取實體標識符(entity identifier)和分片標識符(shard identifier)。分片是統一管理的一組實體。對于特定分片中的第一條消息,ShardRegion將從中心協調者ShardCoordinator請求分片的位置。
ShardCoordinator決定哪個ShardRegion將擁有Shard,并通知ShardRegion。區域(region)將確認此請求并將Shard 監督者創建為子 Actor。然后,當Shard Actor 需要時,將創建各個Entities。因此,傳入消息通過ShardRegion和Shard傳輸到目標Entity。
如果shard home是另一個ShardRegion實例,則消息將轉發到該ShardRegion實例。當解析分片的位置時,該分片的傳入消息將被緩沖,并在分片所在地(home)已知時傳遞。到已解析分片的后續消息可以立即傳遞到目標目的地,而不涉及ShardCoordinator。
場景
一旦知道Shard的位置,ShardRegions就直接發送消息。下面是進入此狀態的場景。在場景中,使用以下符號:
- SC - ShardCoordinator
- M# - Message 1, 2, 3, 等
- SR# - ShardRegion 1, 2 3, 等
- S# - Shard 1 2 3, 等
- E# - Entity 1 2 3, 等,實體是指由集群分片管理的 Actor。
場景1:向屬于本地 ShardRegion 的未知分片發送消息
場景2:向屬于遠程 ShardRegion 的未知分片發送消息
分片位置
為了確保特定實體 Actor 的至多一個實例在集群中的某個地方運行,所有節點都具有相同的分片(shard)所在位置視圖是很重要的。因此,分片分配決策由中心ShardCoordinator執行,它作為一個集群單例運行,即在所有集群節點中的最老成員上或標記有特定角色的一組節點上執行一個實例。
決定分片位置的邏輯在可插拔分片分配策略中定義。默認實現ShardCoordinator.LeastShardAllocationStrategy將新的分片分配給ShardRegion,其中以前分配的分片數量最少。此策略可以由特定于應用程序的實現替代。
分片再平衡
為了能夠在集群中使用新添加的成員,協調器(coordinator)促進了分片的重新平衡(rebalancing of shards),即將實體從一個節點遷移到另一個節點。在重新平衡過程中,協調器首先通知所有ShardRegion Actor 已開始對分片的切換。這意味著它們將開始緩沖該分片的傳入消息,就像分片位置未知一樣。在重新平衡過程中,協調器不會回答任何有關正在重新平衡的分片位置的請求,即本地緩沖將繼續,直到完成切換。負責重新平衡分片的ShardRegion將通過向該分片中的所有實體發送指定的stopMessage(默認為PoisonPill)來停止該分片中的所有實體。所有實體終止后,擁有實體的ShardRegion將確認已向協調器完成移交。此后,協調器將回復分片位置的請求,從而為分片分配一個新的位置,然后將分片區域 Actor 中的緩沖消息發送到新位置。這意味著實體的狀態不會被轉移或遷移。如果實體的狀態很重要,那么它應該是持久的,例如「Persistence」,以便可以在新的位置恢復。
決定要重新平衡哪些分片的邏輯在可插入分片分配策略(a pluggable shard allocation strategy)中定義。默認實現ShardCoordinator.LeastShardAllocationStrategy從ShardRegion中選擇用于切換的分片,其中包含以前分配的大多數碎片。然后,它們將以最少數量的先前分配的分片(即集群中的新成員)分配給ShardRegion。
對于LeastShardAllocationStrategy,有一個可配置的閾值(rebalance-threshold),說明開始重新平衡時差異必須有多大。在分片最多的區域和分片最少的區域中,分片數量的差異必須大于發生重新平衡的rebalance-threshold。
當rebalance-threshold為1時,給出了最佳分布,因此通常是最佳選擇。更高的閾值意味著更多的分片可以同時重新平衡,而不是一個接一個。這樣做的優點是,重新平衡過程可以更快,但缺點是不同節點之間的分片數量(因此負載)可能會顯著不同。
ShardCoordinator 狀態
ShardCoordinator中分片位置的狀態是持久的,帶有「Distributed Data」或「Persistence」,可以在故障中幸存。當從集群中刪除崩潰或無法訪問的協調節點(通過down)時,新的ShardCoordinator單例 Actor 將接管并恢復狀態。在這種故障期間,具有已知位置的分片仍然可用,而新(未知)分片的消息將被緩沖,直到新的ShardCoordinator可用。
消息排序
只要發送者使用同一個ShardRegion Actor 將消息傳遞給實體 Actor,消息的順序就會保持不變。只要沒有達到緩沖區限制,消息就會以“最多一次傳遞”的語義盡最大努力傳遞,與普通消息發送的方式相同。可靠的端到端(end-to-end)消息傳遞,通過在「Persistence」中使用AtLeastOnceDelivery,可以實現“至少一次傳遞”的語義。
開銷
由于到協調器的往返(round-trip),針對新的或以前未使用的分片的消息引入了一些額外的延遲。重新平衡分片也可能增加延遲。在設計特定于應用程序的分片解決方案時,應該考慮這一點,例如,為了避免太細的分片。一旦知道分片的位置,唯一的開銷(overhead)就是通過ShardRegion發送消息,而不是直接發送消息。
分布式數據模式 vs. 持久化模式
協調器的狀態和分片「Remembering Entities」的狀態是持久的,可以在失敗中幸存。「Distributed Data」或「Persistence」可用于存儲。默認情況下使用分布式數據(Distributed Data)。
使用兩種模式時的功能相同。如果你的分片實體本身不使用 Akka 持久化(Persistence),那么使用分布式數據模式更方便,因為你不必為持久性設置和操作單獨的數據存儲(如 Cassandra)。除此之外,使用一種模式而不使用另一種模式沒有主要原因。
在集群中的所有節點上使用相同的模式很重要,即不可能執行滾動升級來更改此設置。
分布式數據模式
此模式通過配置啟用(默認情況下啟用):
akka.cluster.sharding.state-store-mode = ddataShardCoordinator的狀態將在集群內由分布式數據模塊復制,具有WriteMajority/ReadMajority一致性。協調器的狀態不持久,它沒有存儲到磁盤。當集群中的所有節點都已停止時,狀態將丟失,也不再需要了。
記憶實體(Remembering Entities)的狀態也是持久的,即存儲在磁盤上。存儲的實體也會在群集完全重新啟動后啟動。
集群分片(Cluster Sharding)使用它自己的每個節點角色的分布式數據Replicator。通過這種方式,可以將所有節點的子集用于某些實體類型,將另一個子集用于其他實體類型。每個這樣的復制器(replicator)都有一個包含節點角色的名稱,因此集群中所有節點上的角色配置都必須相同,即在執行滾動升級時不能更改角色。
分布式數據的設置在akka.cluster.sharding.distributed-data部分中配置。對于不同的分片實體類型,不可能有不同的distributed-data設置。
持久化模式
此模式通過配置啟用:
akka.cluster.sharding.state-store-mode = persistence因為它是在集群中運行的,所以必須用分布式日志配置持久化。
達到最少成員數后啟動
在集群設置akka.cluster.min-nr-of-members或akka.cluster.role.<role-name>.min-nr-of-members時,使用集群分片是很好的。這將推遲分片的分配,直到至少有配置數量的區域已經啟動并注冊到協調器。這就避免了許多分片被分配到第一個注冊的區域,只有在以后才被重新平衡到其他節點。
有關min-nr-of-members的詳細信息,請參閱「How To Startup when Cluster Size Reached」。
僅代理模式
ShardRegion Actor 也可以在僅代理模式(proxy only mode)下啟動,即它不會承載任何實體本身,但知道如何將消息委托到正確的位置。ShardRegion以僅代理模式使用ClusterSharding.startProxy方法啟動。此外,如果當前群集節點的角色與傳遞給ClusterSharding.start方法的ClusterShardingSettings中指定的角色不匹配時,則ShardRegion將以僅代理模式啟動。
Passivation
如果實體的狀態是持久的,則可以停止不用于減少內存消耗的實體。這是由實體 Actor 的特定于應用程序的實現完成的,例如通過定義接收超時(context.setReceiveTimeout)。如果某個消息在停止時已排隊到該實體,則將刪除郵箱中排隊的消息。為了在不丟失此類消息的情況下支持優雅的鈍化(passivation),實體 Actor 可以將ShardRegion.Passivate發送給其父Shard。在Passivate中指定的包裝消息將被發送回實體,然后該實體將自行停止。在接收到Passivate和終止實體之間,傳入消息將被Shard緩沖。這樣的緩沖消息隨后被傳遞到實體的新化身。
Automatic Passivation
如果實體使用akka.cluster.sharding.passivate-idle-entity-after設置一段時間沒有收到消息,或者通過將ClusterShardingSettings.passivateIdleEntityAfter顯式設置為一個合適的時間以保持 Actor 活動,則可以將這些實體配置為自動鈍化(automatically passivated)。請注意,只有通過分片發送的消息才會被計算在內,因此直接發送到 Actor 的ActorRef的消息或它發送給自身的消息不會被計算為活動。默認情況下,自動鈍化是禁止的。
Remembering Entities
通過在調用ClusterSharding.start時將ClusterShardingSettings中的rememberEntities標志設置為true,并確保shardIdExtractor處理Shard.StartEntity(EntityId),可以使每個Shard中的實體列表持久化,這意味著ShardId必須可以從EntityId中提取。
@Override public String shardId(Object message) {int numberOfShards = 100;if (message instanceof Counter.EntityEnvelope) {long id = ((Counter.EntityEnvelope) message).id;return String.valueOf(id % numberOfShards);} else if (message instanceof Counter.Get) {long id = ((Counter.Get) message).counterId;return String.valueOf(id % numberOfShards);} else if (message instanceof ShardRegion.StartEntity) {long id = Long.valueOf(((ShardRegion.StartEntity) message).entityId());return String.valueOf(id % numberOfShards);} else {return null;} }當配置為記憶實體(remember entities)時,每當Shard重新平衡到另一個節點上或在崩潰后恢復時,它將重新創建以前在該分片中運行的所有實體。要永久停止實體,必須向實體 Actor 的父級發送一條Passivate消息,否則在配置中指定的實體重新啟動回退之后,該實體將自動重新啟動。
當使用分布式數據模式時,實體的標識符存儲在分布式數據的「Durable Storage」中。你可能需要更改akka.cluster.sharding.distributed-data.durable.lmdb.dir的配置,因為默認目錄包含 Actor 系統的遠程端口。如果使用動態分配的端口(0),則每次都會不同,并且不會加載以前存儲的數據。
當rememberEntities設置為false時,Shard不會在重新平衡或從崩潰中恢復后自動重新啟動任何實體。只有在Shard中收到實體的第一條消息后,才會啟動實體。如果實體停止而不使用Passivate,則不會重新啟動。
請注意,實體本身的狀態將不會被恢復,除非它們已被持久化,例如「Persistence」。
當啟動/停止實體以及重新平衡分片時,rememberEntities的性能成本相當高。這種成本隨著每個分片的實體數量增加而增加,我們目前不建議在每個分片上使用超過 10000 個實體。
監督
如果需要為實體 Actor 使用其他supervisorStrategy,而不是默認(重新啟動)策略,則需要創建一個中間父 Actor,該 Actor 定義子實體 Actor 的supervisorStrategy。
static class CounterSupervisor extends AbstractActor {private final ActorRef counter =getContext().actorOf(Props.create(Counter.class), "theCounter");private static final SupervisorStrategy strategy =new OneForOneStrategy(DeciderBuilder.match(IllegalArgumentException.class, e -> SupervisorStrategy.resume()).match(ActorInitializationException.class, e -> SupervisorStrategy.stop()).match(Exception.class, e -> SupervisorStrategy.restart()).matchAny(o -> SupervisorStrategy.escalate()).build());@Overridepublic SupervisorStrategy supervisorStrategy() {return strategy;}@Overridepublic Receive createReceive() {return receiveBuilder().match(Object.class, msg -> counter.forward(msg, getContext())).build();} }你以同樣的方式啟動這樣一個監督者(supervisor),就像它是實體 Actor 一樣。
ClusterSharding.get(system).start("SupervisedCounter", Props.create(CounterSupervisor.class), settings, messageExtractor);請注意,當新消息針對(targeted to)實體時,停止的實體將再次啟動。
優雅地關閉
你可以將ShardRegion.gracefulShutdownInstance消息發送給ShardRegion Actor,以分發由該ShardRegion承載的所有分片,然后將停止ShardRegion Actor。你可以監控(watch)ShardRegion Actor 以便知道什么時候完成。在此期間,其他區域將以協調器觸發重新平衡時的相同方式緩沖這些分片的消息。當分片被停止時,協調器將把這些分片分配到其他地方。
這是由「Coordinated Shutdown」自動執行的,因此是集群成員正常退出進程的一部分。
刪除內部群集分片數據
集群分片協調器使用 Akka 持久化存儲分片的位置。重新啟動整個 Akka 集群時,可以安全地刪除這些數據。請注意,這不是應用程序數據。
有一個實用程序akka.cluster.sharding.RemoveInternalClusterShardingData,用于刪除此數據。
- 警告:在運行使用群集分片的 Akka 群集節點時,切勿使用此程序。使用此程序前,請停止所有群集節點。
如果由于數據損壞而無法啟動群集分片協調器,則可能需要刪除數據,如果同時意外運行兩個群集,例如由于使用自動關閉而存在網絡分裂,則可能會發生這種情況。
- 警告:不要將集群分片(Cluster Sharding)與自動關閉(Automatic Downing)一起使用,因為它允許集群分裂為兩個單獨的集群,從而導致多個分片和實體啟動。
使用這個程序作為一個獨立的 Java 主程序:
java -classpath <jar files, including akka-cluster-sharding>akka.cluster.sharding.RemoveInternalClusterShardingData-2.3 entityType1 entityType2 entityType3該程序包含在akka-cluster-sharding.jar 文件中。使用與普通應用程序相同的類路徑和配置運行它是最簡單的。它可以以類似的方式從 sbt 或 Maven 運行。
指定實體類型名稱(與在ClusterSharding的start方法中使用的名稱相同)作為程序參數。
如果將-2.3指定為第一個程序參數,它還將嘗試使用不同的persistenceId刪除在Akka 2.3.x中由集群分片(Cluster Sharding)存儲的數據。
配置
可以使用以下屬性配置ClusterSharding擴展。當使用ActorSystem參數創建時,ClusterShardingSettings將讀取這些配置屬性。還可以修改ClusterShardingSettings或從另一個配置部分創建它,布局如下。ClusterShardingSettings是ClusterSharding擴展的start方法的參數,也就是說,如果需要,每個實體類型都可以配置不同的設置。
# Settings for the ClusterShardingExtension akka.cluster.sharding {# The extension creates a top level actor with this name in top level system scope,# e.g. '/system/sharding'guardian-name = sharding# Specifies that entities runs on cluster nodes with a specific role.# If the role is not specified (or empty) all nodes in the cluster are used.role = ""# When this is set to 'on' the active entity actors will automatically be restarted# upon Shard restart. i.e. if the Shard is started on a different ShardRegion# due to rebalance or crash.remember-entities = off# Set this to a time duration to have sharding passivate entities when they have not# gotten any message in this long time. Set to 'off' to disable.passivate-idle-entity-after = off# If the coordinator can't store state changes it will be stopped# and started again after this duration, with an exponential back-off# of up to 5 times this duration.coordinator-failure-backoff = 5 s# The ShardRegion retries registration and shard location requests to the# ShardCoordinator with this interval if it does not reply.retry-interval = 2 s# Maximum number of messages that are buffered by a ShardRegion actor.buffer-size = 100000# Timeout of the shard rebalancing process.# Additionally, if an entity doesn't handle the stopMessage# after (handoff-timeout - 5.seconds).max(1.second) it will be stopped forcefullyhandoff-timeout = 60 s# Time given to a region to acknowledge it's hosting a shard.shard-start-timeout = 10 s# If the shard is remembering entities and can't store state changes# will be stopped and then started again after this duration. Any messages# sent to an affected entity may be lost in this process.shard-failure-backoff = 10 s# If the shard is remembering entities and an entity stops itself without# using passivate. The entity will be restarted after this duration or when# the next message for it is received, which ever occurs first.entity-restart-backoff = 10 s# Rebalance check is performed periodically with this interval.rebalance-interval = 10 s# Absolute path to the journal plugin configuration entity that is to be# used for the internal persistence of ClusterSharding. If not defined# the default journal plugin is used. Note that this is not related to# persistence used by the entity actors.# Only used when state-store-mode=persistencejournal-plugin-id = ""# Absolute path to the snapshot plugin configuration entity that is to be# used for the internal persistence of ClusterSharding. If not defined# the default snapshot plugin is used. Note that this is not related to# persistence used by the entity actors.# Only used when state-store-mode=persistencesnapshot-plugin-id = ""# Defines how the coordinator stores its state. Same is also used by the# shards for rememberEntities.# Valid values are "ddata" or "persistence". state-store-mode = "ddata"# The shard saves persistent snapshots after this number of persistent# events. Snapshots are used to reduce recovery times.# Only used when state-store-mode=persistencesnapshot-after = 1000# The shard deletes persistent events (messages and snapshots) after doing snapshot# keeping this number of old persistent batches.# Batch is of size `snapshot-after`.# When set to 0 after snapshot is successfully done all messages with equal or lower sequence number will be deleted.# Default value of 2 leaves last maximum 2*`snapshot-after` messages and 3 snapshots (2 old ones + fresh snapshot)keep-nr-of-batches = 2# Setting for the default shard allocation strategyleast-shard-allocation-strategy {# Threshold of how large the difference between most and least number of# allocated shards must be to begin the rebalancing.# The difference between number of shards in the region with most shards and# the region with least shards must be greater than (>) the `rebalanceThreshold`# for the rebalance to occur.# 1 gives the best distribution and therefore typically the best choice.# Increasing the threshold can result in quicker rebalance but has the# drawback of increased difference between number of shards (and therefore load)# on different nodes before rebalance will occur.rebalance-threshold = 1# The number of ongoing rebalancing processes is limited to this number.max-simultaneous-rebalance = 3}# Timeout of waiting the initial distributed state (an initial state will be queried again if the timeout happened)# Only used when state-store-mode=ddatawaiting-for-state-timeout = 5 s# Timeout of waiting for update the distributed state (update will be retried if the timeout happened)# Only used when state-store-mode=ddataupdating-state-timeout = 5 s# The shard uses this strategy to determines how to recover the underlying entity actors. The strategy is only used# by the persistent shard when rebalancing or restarting. The value can either be "all" or "constant". The "all"# strategy start all the underlying entity actors at the same time. The constant strategy will start the underlying# entity actors at a fix rate. The default strategy "all".entity-recovery-strategy = "all"# Default settings for the constant rate entity recovery strategyentity-recovery-constant-rate-strategy {# Sets the frequency at which a batch of entity actors is started.frequency = 100 ms# Sets the number of entity actors to be restart at a particular intervalnumber-of-entities = 5}# Settings for the coordinator singleton. Same layout as akka.cluster.singleton.# The "role" of the singleton configuration is not used. The singleton role will# be the same as "akka.cluster.sharding.role".coordinator-singleton = ${akka.cluster.singleton}# Settings for the Distributed Data replicator. # Same layout as akka.cluster.distributed-data.# The "role" of the distributed-data configuration is not used. The distributed-data# role will be the same as "akka.cluster.sharding.role".# Note that there is one Replicator per role and it's not possible# to have different distributed-data settings for different sharding entity types.# Only used when state-store-mode=ddatadistributed-data = ${akka.cluster.distributed-data}distributed-data {# minCap parameter to MajorityWrite and MajorityRead consistency level.majority-min-cap = 5durable.keys = ["shard-*"]# When using many entities with "remember entities" the Gossip message# can become to large if including to many in same message. Limit to# the same number as the number of ORSet per shard.max-delta-elements = 5}# The id of the dispatcher to use for ClusterSharding actors.# If not specified default dispatcher is used.# If specified you need to define the settings of the actual dispatcher.# This dispatcher for the entity actors is defined by the user provided# Props, i.e. this dispatcher is not used for the entity actors.use-dispatcher = "" }自定義分片分配策略(shard allocation strategy)可以在ClusterSharding.start的可選參數中定義。有關如何實現自定義分片分配策略的詳細信息,請參閱AbstractShardAllocationStrategy的 API 文檔。
檢查群集分片狀態
有兩個檢查群集狀態的請求可用:
- ShardRegion.getShardRegionStateInstance,它將返回一個ShardRegion.ShardRegionState,其中包含區域中運行的分片的標識符以及每個分片的活動實體。
- ShardRegion.GetClusterShardingStats,它將查詢集群中的所有區域,并返回一個ShardRegion.ClusterShardingStats,其中包含每個區域中運行的分片的標識符以及每個分片中活動的實體數。
可以通過ClusterSharding.getShardTypeNames獲取所有已啟動分片的類型名。
這些消息的目的是測試(testing)和監控(monitoring),它們不提供直接向各個實體發送消息的訪問權。
滾動升級
在進行滾動升級(rolling upgrades)時,必須特別注意不要改變以下任何分片方面:
- extractShardId函數
- 分片區域運行的角色
- 持久化模式
如果其中任何一個需要更改,則需要完全重新啟動群集。
英文原文鏈接:Cluster Sharding.
———— ☆☆☆ —— 返回 -> Akka 中文指南 <- 目錄 —— ☆☆☆ ————
總結
以上是生活随笔為你收集整理的Akka 指南 之「集群分片」的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 近场通信技术
- 下一篇: mysql5.6 centos_cent