Scala zio-actors与akka-actor集成
zio-actors與akka-actor集成
zio-actors 與 akka-actor 是兩種不同實現(xiàn),分兩種情況:
- zio actor 發(fā)消息給 akka actor
- akka actor 發(fā)消息給 zio actor
依賴
不包括 akka actor 和 zio-actors 依賴,只是集成所需的
"dev.zio" %% "zio-actors-akka-interop" % <VERSION>"所需的導入如下:
import zio.actors.Actor.Stateful import zio.actors.{ ActorSystem, ActorRef, Context, Supervisor } import zio.actors.akka.{ AkkaTypedActor, AkkaTypedActorRefLocal } import zio.{ IO, Runtime }import akka.actor.typed import akka.actor.typed.Behavior import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.Scheduler import akka.util.Timeoutimport scala.concurrent.duration._本章例子的樣例類:
sealed trait TypedMessage[+_] case class PingToZio(zioReplyToActor: ActorRef[ZioMessage], msg: String) extends TypedMessage[Unit] case class PingFromZio(zioSenderActor: ActorRef[ZioMessage]) extends TypedMessage[Unit]sealed trait ZioMessage[+_] case class PongFromAkka(msg: String) extends ZioMessage[Unit] case class Ping(akkaActor: AkkaTypedActorRefLocal[TypedMessage]) extends ZioMessage[Unit]基本的 actors 使用需要定義一個Stateful來描述 actor 的行為。然后通過監(jiān)督方式、初始狀態(tài)和提到的Stateful來完成 actor 的創(chuàng)建。
在 zio actor 與 akka actor 通信
zio actor Stateful 實現(xiàn)如下:
val handler = new Stateful[Any, String, ZioMessage] {override def receive[A](state: String, msg: ZioMessage[A], context: Context): IO[Throwable, (String, A)] =msg match { case PongFromAkka(msg) => IO.succeed((msg, ())) // zio actor接收akka actor的消息case Ping(akkaActor) => // akkaActor的類型是AkkaTypedActorRefLocal,而不是 akka actor 的ActorReffor {self <- context.self[ZioMessage]_ <- akkaActor ! PingFromZio(self) // 把self帶上用于收回復} yield (state, ())case _=> IO.fail(new Exception("fail"))} }在 akka actor 中 發(fā)送消息到 zio actor
akka actor,需要一個行為(behavior)來定義要處理的消息,在這種情況下向 zio actor 發(fā)送和接收消息:
object TestBehavior {lazy val zioRuntime = Runtime.defaultdef apply(): Behavior[TypedMessage[_]] =Behaviors.receiveMessage { message =>message match { case PingToZio(zioReplyToActor, msgToZio) => // 在akka 中發(fā)消息,需要unsafeRun執(zhí)行ZIO effectzioRuntime.unsafeRun(zioReplyToActor ! PongFromAkka(msgToZio)) case PingFromZio(zioSenderActor) => zioRuntime.unsafeRun(zioSenderActor ! PongFromAkka("Pong from Akka"))}Behaviors.same}}主程序
我們已經(jīng)準備好開始從 zio 向 akka 發(fā)送消息,或者通過fire-and-forget交互模式反過來,但首先我們需要用創(chuàng)建的 akka ActorRef(或ActorSystem)創(chuàng)建一個 ZIO 值,可以使用AkkaTypedActor.make:
for {akkaSystem <- IO(typed.ActorSystem(TestBehavior(), "akkaSystem")) // akka actor 的 ActorSystemsystem <- ActorSystem("zioSystem") // zio actor 的 ActorSystemakkaActor <- AkkaTypedActor.make(akkaSystem) // 使用interop提供的AkkaTypedActor,對akka actor做一次包裝zioActor <- system.make("zioActor", Supervisor.none, "", handler) // 使用zio的ActorSystem創(chuàng)建zio actor_ <- akkaActor ! PingToZio(zioActor, "Ping from Akka") // 發(fā)消息給akka actor,并帶上zioActor,用于接收回復_ <- zioActor ! Ping(akkaActor) // 發(fā)消息給zio actor,并帶上akkaActor,用于接收回復 } yield ()zim 中應用
zim 不涉及到2種 actor 通信,websocket 使用的是 akka actor,而在定時任務處使用了 zio actor,實現(xiàn)一個基于 zio actor 的定時器如下:
object ScheduleStateful {val stateful: Stateful[Any, Unit, Command] = new Stateful[Any, Unit, Command] {override def receive[A](state: Unit, msg: Command[A], context: Context): UIO[(Unit, A)] = {val taskIO = msg match {case OnlineUserMessage(descr) =>WsService.getConnections.flatMap { i =>LogUtil.debug(s"${descr.getOrElse("receive")} Total online user => $i")}case _ => UIO.unit}// 這里返回的類型按照zio-actors官網(wǎng)的寫法返回(Unit, A) idea會提示語法錯誤,目前還不知道是誰的問題,只能強制轉(zhuǎn)換了taskIO.foldM(e => LogUtil.error(s"ScheduleStateful $e").as(() -> "".asInstanceOf[A]),_ => ZIO.succeed(() -> "".asInstanceOf[A]))}} }根據(jù)Stateful創(chuàng)建 actor
lazy val scheduleActor: ZIO[Any, Throwable, ActorRef[protocol.Command]] =actorSystem.flatMap(_.make(Constants.SCHEDULE_JOB_ACTOR, zio.actors.Supervisor.none, (), ScheduleStateful.stateful)).provideLayer(Clock.live ++ InfrastructureConfiguration.live)啟動 actor,只需要像使用普通方法一樣調(diào)用該方法即可:
def scheduleTask: Task[Unit] = {val task = ZioActorSystemConfiguration.scheduleActor.flatMap(f => f ! OnlineUserMessage(Some("scheduleTask"))) repeat Schedule.secondOfMinute(0)// secondOfMinute類似于Cron的時間表,每分鐘的指定秒數(shù)重復出現(xiàn)。此處為0秒task.foldM(e => LogUtil.error(s"error => $e").unit,_ => UIO.unit).provideLayer(Clock.live)}zim 是一個web端即時通訊系統(tǒng),使用scala2語言,基于zio、tapir、akka,scallikejdbc等庫實現(xiàn)。
總結(jié)
以上是生活随笔為你收集整理的Scala zio-actors与akka-actor集成的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 亚马逊美国站UL2849电动自行车标准测
- 下一篇: 用Egret制作功能简单的打地鼠类游戏《