Akka型演员:探索接收器模式
在上一篇文章中,我們研究了Akka Typed提供的一些基本功能。 在本文和下一篇文章中,我們將更進一步地了解一些其他功能,并通過查看Akka Typed提供的兩種不同模式來做到這一點:Receiver和Receptionist模式。 如果您是Akka Typed的新手,那么最好先閱讀上一篇文章,因為這將使您對Akka Typed有所了解。 因此,對于本系列中的Akka型文章,我們將研究Receiver模式。
- 與往常一樣,您可以在Github Gist中找到此示例的代碼: https : //gist.github.com/josdirksen/77e59d236c637d46ab32
接收方模式
在Akka Typed發行版中,有一個名為akka.typed.patterns的包。 在此程序包中,有兩種不同的模式,即接收方模式和接收方模式。 坦白說,為什么這兩種模式足夠重要以增加發行版,但我確實不知道,但是它們確實為在Akka Typed之后引入更多概念和想法提供了一個很好的方法。
因此,讓我們看一下Receiver模式,在下一篇文章中我們將做Receptionist模式。 要了解Receiver模式的功能,只需看一下我們可以發送給它的消息:
/*** Retrieve one message from the Receiver, waiting at most for the given duration.*/final case class GetOne[T](timeout: FiniteDuration)(val replyTo: ActorRef[GetOneResult[T]]) extends Command[T]/*** Retrieve all messages from the Receiver that it has queued after the given* duration has elapsed.*/final case class GetAll[T](timeout: FiniteDuration)(val replyTo: ActorRef[GetAllResult[T]]) extends Command[T]/*** Retrieve the external address of this Receiver (i.e. the side at which it* takes in the messages of type T.*/final case class ExternalAddress[T](replyTo: ActorRef[ActorRef[T]]) extends Command[T]從這些消息中可以看到,Receiver的工作是將T類型的消息排隊,并提供其他命令以在等待特定時間的同時獲取這些消息中的一個或多個。 要使用接收器,我們需要獲取ExternalAddress,以便我們可以向其發送類型為T的消息。 并且可以從其他參與者發送get GetOne和GetAll消息,以查看接收器中是否有任何消息在等待。
對于我們的示例,我們將創建以下參與者:
- 生產者,它向接收者發送類型為T的消息。
- 可以從此接收器檢索類型T消息的使用者。
- 根角色,運行此方案。
我們將從生產者開始,如下所示:
/*** Producer object containing the protocol and the behavior. This is a very simple* actor that produces messages using a schedule. To start producing messages* we need to send an initial message*/object Producer {// a simple protocol defining the messages that can be sentsealed trait ProducerMsgfinal case class registerReceiverMsgIn(msgIn: ActorRef[HelloMsg]) extends ProducerMsgfinal case class addHelloWorldMsg(msg: HelloMsg) extends ProducerMsg// the producer, which first waits for a registerReceiver message, after which// it changes behavior, to send messages.val producer = Full[ProducerMsg] {// if we receive a register message, we know where to send messages tocase Msg(ctx, registerReceiverMsgIn(msgConsumer)) =>println("Producer: Switching behavior")// simple helper function which sends a message to self.def scheduleMessage() = ctx.schedule(500 millisecond, ctx.self, addHelloWorldMsg(Hello(s"hello @ ${System.currentTimeMillis()}")))// schedule the first one, the rest will be triggered through the behavior.scheduleMessage()Static {// add a message to the receiver and schedule a new onecase addHelloWorldMsg(msg) => {println(s"Producer: Adding new '$msg' to receiver: $msgConsumer") ;msgConsumer ! msg; scheduleMessage()}}// don't switch behavior on any of the other messagescase _ => Same}}在此對象中,我們定義了可以發送給角色的消息以及行為。 registerReceiverMsgIn消息為操作者提供了應該向其發送消息的目的地(稍后將對此進行詳細介紹),并且addHelloWorldMsg告訴該行為將什么消息發送到registerReceiverMsgIn消息提供的地址。 如果您查看此行為,則可以看到我們使用Full [T]行為。 對于這種行為,我們必須為所有消息和信號提供匹配器,此外,我們還可以訪問actor ctx。 在其初始狀態下,此行為僅響應registerReceiverMsgIn消息。 當它收到這樣的消息時,它會做兩件事:
因此,當我們發送初??始的registerReceiverMessage時,它將導致actor每500 ms向接收者發送一條新消息。 現在讓我們看看另一面:消費者。
對于消費者,我們還將所有內容包裝在一個對象中,如下所示:
object Consumer {val consumer = Total[HelloMsg] {// in the case of a registerReceiver message, we change the implementation// since we're ready to receive other message.case registerReceiverCmdIn(commandAddress) => {println("Consumer: Switching behavior")// return a static implementation which closes over actorRefs// all messages we receive we pass to the receiver, which will queue// them. We have a specific message that prints out the received messagesContextAware { ctx =>Static[HelloMsg] {// printmessages just prints out the list of messages we've receivedcase PrintMessages(msgs) => println(s"Consumer: Printing messages: $msgs") ;msgs.foreach { hw => println(s" $hw")}// if we get the getAllMessages request, we get all the messages from// the receiver.case GetAllMessages() => {println("Consumer: requesting all messages")val wrap = ctx.spawnAdapter[GetAllResult[HelloMsg]] {case msgs:GetAllResult[HelloMsg] => println(s"Consumer: Received ${msgs.msgs.length} messages"); PrintMessages(msgs.msgs)}commandAddress ! GetAll(2 seconds)(wrap)}}}}// for all the other cases return the existing implementation, in essence// we're just ignoring other messages till we change statecase _ => Same} }在此對象中,我們定義了一個行為,該行為在接收到第一條消息后也會切換其實現。 在這種情況下,第一條消息稱為registerReceiverCmdIn。 通過此消息,我們可以訪問(接收方的)actorRef,將GetAll和getOne消息發送至該消息。 切換行為后,我們將處理自己的自定義GetAllMessages消息,該消息將觸發將GetAll消息發送到接收器。 由于未針對從Receiver收到的響應類型鍵入我們自己的行為,因此我們使用適配器(ctx.spawnAdapter)。 該適配器將接收來自接收器的響應并打印出消息。
消息的最后一部分是一個演員,它會啟動此行為:
// Simple root actor, which we'll use to start the other actorsval scenario1 = {Full[Unit] {case Sig(ctx, PreStart) => {import Producer._import Consumer._println("Scenario1: Started, now lets start up a number of child actors to do our stuff")// first start the two actors, one implements the receiver pattern, and// the other is the one we control directly.val receiverActor = ctx.spawn(Props(Receiver.behavior[HelloMsg]), "receiver")val consumerActor = ctx.spawn(Props(consumer), "adder")val producerActor = ctx.spawn(Props(producer), "producer")// our producerActor first needs the actorRef it can use to add messages to the receiver// for this we use a wrapper, this wrapper creates a child, which we use to get the// address, to which we can send messages.val wrapper = ctx.spawnAdapter[ActorRef[HelloMsg]] {case p: ActorRef[HelloMsg] => producerActor ! registerReceiverMsgIn(p)}// now send the message to get the external address, the response will be sent// to our own actor as a registerReceiver message, through the adapterreceiverActor ! ExternalAddress(wrapper)// our printing actor needs to now the address of the receiver so send it to himconsumerActor ! registerReceiverCmdIn(receiverActor)// by calling getAllMessages we get the messages within a time period.println("Scenario1: Get all the messages")consumerActor ! GetAllMessages()Thread.sleep(3000)consumerActor ! GetAllMessages()Thread.sleep(5000)consumerActor ! GetAllMessages()Same}}}這里沒什么特別的。 在這種情況下,我們將創建各種角色,并使用ctx.spawnAdapter來獲取接收者的外部地址,并將其傳遞給producerActor。 接下來,我們將接收者參與者的地址傳遞給消費者。 現在,我們在使用者地址上調用GetAllMessages,該地址將從接收方獲取消息并打印出來。
因此,總結一下將在此示例中執行的步驟:
這種情況的結果如下所示:
Scenario1: Started, now lets start up a number of child actors to do our stuff Scenario1: Get all the messages Consumer: Switching behavior Consumer: requesting all messages Producer: Switching behavior Producer: Adding new 'Hello(hello @ 1446277162929)' to receiver: Actor[akka://Root/user/receiver#1097367365] Producer: Adding new 'Hello(hello @ 1446277163454)' to receiver: Actor[akka://Root/user/receiver#1097367365] Producer: Adding new 'Hello(hello @ 1446277163969)' to receiver: Actor[akka://Root/user/receiver#1097367365] Consumer: Received 3 messages Consumer: Printing messages: Vector(Hello(hello @ 1446277162929), Hello(hello @ 1446277163454), Hello(hello @ 1446277163969))Hello(hello @ 1446277162929)Hello(hello @ 1446277163454)Hello(hello @ 1446277163969) Producer: Adding new 'Hello(hello @ 1446277164488)' to receiver: Actor[akka://Root/user/receiver#1097367365] Producer: Adding new 'Hello(hello @ 1446277165008)' to receiver: Actor[akka://Root/user/receiver#1097367365] Consumer: requesting all messages Producer: Adding new 'Hello(hello @ 1446277165529)' to receiver: Actor[akka://Root/user/receiver#1097367365] Producer: Adding new 'Hello(hello @ 1446277166049)' to receiver: Actor[akka://Root/user/receiver#1097367365] Producer: Adding new 'Hello(hello @ 1446277166569)' to receiver: Actor[akka://Root/user/receiver#1097367365] Producer: Adding new 'Hello(hello @ 1446277167089)' to receiver: Actor[akka://Root/user/receiver#1097367365] Consumer: Received 6 messages Consumer: Printing messages: Vector(Hello(hello @ 1446277164488), Hello(hello @ 1446277165008), Hello(hello @ 1446277165529), Hello(hello @ 1446277166049), Hello(hello @ 1446277166569), Hello(hello @ 1446277167089))Hello(hello @ 1446277164488)Hello(hello @ 1446277165008)Hello(hello @ 1446277165529)Hello(hello @ 1446277166049)Hello(hello @ 1446277166569)Hello(hello @ 1446277167089) Producer: Adding new 'Hello(hello @ 1446277167607)' to receiver: Actor[akka://Root/user/receiver#1097367365] Producer: Adding new 'Hello(hello @ 1446277168129)' to receiver: Actor[akka://Root/user/receiver#1097367365] Producer: Adding new 'Hello(hello @ 1446277168650)' to receiver: Actor[akka://Root/user/receiver#1097367365] Producer: Adding new 'Hello(hello @ 1446277169169)' to receiver: Actor[akka://Root/user/receiver#1097367365] Producer: Adding new 'Hello(hello @ 1446277169690)' to receiver: Actor[akka://Root/user/receiver#1097367365] Producer: Adding new 'Hello(hello @ 1446277170210)' to receiver: Actor[akka://Root/user/receiver#1097367365] Consumer: requesting all messages Producer: Adding new 'Hello(hello @ 1446277170729)' to receiver: Actor[akka://Root/user/receiver#1097367365] Producer: Adding new 'Hello(hello @ 1446277171249)' to receiver: Actor[akka://Root/user/receiver#1097367365] Producer: Adding new 'Hello(hello @ 1446277171769)' to receiver: Actor[akka://Root/user/receiver#1097367365] Producer: Adding new 'Hello(hello @ 1446277172289)' to receiver: Actor[akka://Root/user/receiver#1097367365] Consumer: Received 10 messages Consumer: Printing messages: Vector(Hello(hello @ 1446277167607), Hello(hello @ 1446277168129), Hello(hello @ 1446277168650), Hello(hello @ 1446277169169), Hello(hello @ 1446277169690), Hello(hello @ 1446277170210), Hello(hello @ 1446277170729), Hello(hello @ 1446277171249), Hello(hello @ 1446277171769), Hello(hello @ 1446277172289))Hello(hello @ 1446277167607)Hello(hello @ 1446277168129)Hello(hello @ 1446277168650)Hello(hello @ 1446277169169)Hello(hello @ 1446277169690)Hello(hello @ 1446277170210)Hello(hello @ 1446277170729)Hello(hello @ 1446277171249)Hello(hello @ 1446277171769)Hello(hello @ 1446277172289) Producer: Adding new 'Hello(hello @ 1446277172808)' to receiver: Actor[akka://Root/user/receiver#1097367365] Producer: Adding new 'Hello(hello @ 1446277173328)' to receiver: Actor[akka://Root/user/receiver#1097367365] Producer: Adding new 'Hello(hello @ 1446277173849)' to receiver: Actor[akka://Root/user/receiver#1097367365] Producer: Adding new 'Hello(hello @ 1446277174369)' to receiver: Actor[akka://Root/user/receiver#1097367365]酷吧! 從消息序列中可以看到,我們的生產者將消息發送到接收者,接收者將它們排隊。 接下來,我們有一個使用者,它請求到目前為止已收到的所有消息并打印出來。
這是關于Akka-Typed的文章的內容,在下一篇文章中,我們將介紹同樣存在于Akka-Typed的接待員模式。
翻譯自: https://www.javacodegeeks.com/2015/11/akka-typed-actors-exploring-the-receiver-pattern.html
總結
以上是生活随笔為你收集整理的Akka型演员:探索接收器模式的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 安卓samba客户端(安卓samba)
- 下一篇: saxparser_使用SaxParse