Scalaz(44)- concurrency :scalaz Future,尚不完整的多线程类型
scala已經配備了自身的Future類。我們先舉個例子來了解scala Future的具體操作:?
1 import scala.concurrent._ 2 import ExecutionContext.Implicits.global 3 object scalafuture { 4 def dbl(i: Int): Future[Int] = Future { Thread.sleep(1000) ; i + i } 5 //> dbl: (i: Int)scala.concurrent.Future[Int] 6 val fdbl = dbl(3) //> fdbl : scala.concurrent.Future[Int] = List() 7 fdbl.onSuccess { 8 case a => println(s"${a/2} + ${a/2} = $a") 9 } 10 println("calculating ...") //> calculating ... 11 Thread.sleep(2000) //> 3 + 3 = 6 12 }?這是一個標準的異步運算;在成功完成運算事件上綁定callback來獲取在其它線程中的運算結果。我們也可以進行異常處理:
1 val fdz = Future { 3 / 0 } //> fdz : scala.concurrent.Future[Int] = List() 2 fdz.onFailure { 3 case e => println(s"error message {${e.getMessage}}") 4 } 5 Thread.sleep(100) //> error message {/ by zero}又或者同時綁定運算成功和失敗事件的callback函數:
1 import scala.util.{Success, Failure} 2 fdz onComplete { 3 case Success(a) => println(s"${a/2} + ${a/2} = $a") 4 case Failure(e) => println(s"error message {${e.getMessage}}") 5 } 6 Thread.sleep(100) //> error message {/ by zero}?scala Future 實現了flatMap,我們可以把幾個Future組合起來用:
1 def dbl(i: Int): Future[Int] = Future { Thread.sleep(1000); i + i } 2 //> dbl: (i: Int)scala.concurrent.Future[Int] 3 def sqr(i: Int): Future[Int] = Future { i * i } //> sqr: (i: Int)scala.concurrent.Future[Int] 4 def sum(a: Int, b: Int): Future[Int] = Future { a + b } 5 //> sum: (a: Int, b: Int)scala.concurrent.Future[Int] 6 val fsum = for { 7 a <- dbl(3) 8 b <- sqr(a) 9 c <- sum(a,b) 10 } yield c //> fsum : scala.concurrent.Future[Int] = List() 11 12 fsum onSuccess { case c => println(s"the combined result is: $c") } 13 Thread.sleep(2000) //> the combined result is: 42scala Future利用flatMap實現了流程運算:先運算dbl再sqr再sum,這個順序是固定的即使它們可能在不同的線程里運算,因為sqr依賴dbl的結果,而sum又依賴dbl和sqr的結果。
好了,既然scala Future的功能已經比較完善了,那么scalaz的Future又有什么不同的特點呢?首先,細心一點可以發現scala Future是即時運算的,從下面的例子里可以看出:
1 import scala.concurrent.duration._ 2 val fs = Future {println("run now..."); System.currentTimeMillis() } 3 //> run now... 4 //| fs : scala.concurrent.Future[Long] = List() 5 Await.result(fs, 1.second) //> res0: Long = 1465907784714 6 Thread.sleep(1000) 7 Await.result(fs, 1.second) //> res1: Long = 1465907784714可以看到fs是在Future構建時即時運算的,而且只會運算一次。如果scala Future中包括了能產生副作用的代碼,在構建時就會立即產生副作用。所以我們是無法使用scala Future來編寫純函數的,那么在scalaz里就必須為并發編程提供一個與scala Future具同等功能但又不會立即產生副作用的類型了,這就是scalaz版本的Future。我們看看scalaz是如何定義Future的:scalaz.concurrent/Future.scala
sealed abstract class Future[+A] { ... object Future {case class Now[+A](a: A) extends Future[A]case class Async[+A](onFinish: (A => Trampoline[Unit]) => Unit) extends Future[A]case class Suspend[+A](thunk: () => Future[A]) extends Future[A]case class BindSuspend[A,B](thunk: () => Future[A], f: A => Future[B]) extends Future[B]case class BindAsync[A,B](onFinish: (A => Trampoline[Unit]) => Unit,f: A => Future[B]) extends Future[B] ...Future[A]就是個Free Monad。它的結構化表達方式分別有Now,Async,Suspend,BindSuspend,BindAsync。我們可以用這些結構實現flatMap函數,所以Future就是Free Monad:
?
def flatMap[B](f: A => Future[B]): Future[B] = this match {case Now(a) => Suspend(() => f(a))case Suspend(thunk) => BindSuspend(thunk, f)case Async(listen) => BindAsync(listen, f)case BindSuspend(thunk, g) =>Suspend(() => BindSuspend(thunk, g andThen (_ flatMap f)))case BindAsync(listen, g) =>Suspend(() => BindAsync(listen, g andThen (_ flatMap f)))}?free structure類型可以支持算式/算法關注分離,也就是說我們可以用scalaz Future來描述程序功能而不涉及正真運算。scalaz?Future的構建方式如下:
1 import scalaz._ 2 import Scalaz._ 3 import scalaz.concurrent._ 4 import scala.concurrent.duration._ 5 object scalazFuture { 6 val fnow = Future.now {println("run..."); System.currentTimeMillis()} 7 //> run... 8 //| fnow : scalaz.concurrent.Future[Long] = Now(1465909860301) 9 val fdelay = Future.delay {println("run..."); System.currentTimeMillis()} 10 //> fdelay : scalaz.concurrent.Future[Long] = Suspend(<function0>) 11 val fapply = Future {println("run..."); System.currentTimeMillis()} 12 //> fapply : scalaz.concurrent.Future[Long] = Async(<function1>)可以看到fnow是個即時運算的構建器,而這個now就是一個lift函數, 它負責把一個普通無副作用運算升格成Future。fdelay,fapply分別把運算存入trampoline進行結構化了。我們必須另外運算trampoline來運行結構內的運算:
1 fdelay.run //> run... 2 //| res0: Long = 1465910524847 3 Thread.sleep(1000) 4 fdelay.run //> run... 5 //| res1: Long = 1465910525881 6 fapply.run //> run... 7 //| res2: Long = 1465910525883 8 Thread.sleep(1000) 9 fapply.run //> run... 10 //| res3: Long = 1465910526884scalaz Future只有在運算時才會產生副作用,而且可以多次運算。
我們可以用即時(blocking)、異步、定時方式來運算Future:
1 fapply.unsafePerformSync //> run... 2 //| res4: Long = 1465958049118 3 fapply.unsafePerformAsync { 4 case a => println(a) 5 } 6 Thread.sleep(1000) 7 fapply.unsafePerformSyncFor(1 second) //> run... 8 //| 1465958051126 9 //| run... 10 //| res5: Long = 1465958052172結構化狀態Async代表了scalaz Future的多線程處理特性:
/*** Create a `Future` from an asynchronous computation, which takes the form* of a function with which we can register a callback. This can be used* to translate from a callback-based API to a straightforward monadic* version. See `Task.async` for a version that allows for asynchronous* exceptions.*/def async[A](listen: (A => Unit) => Unit): Future[A] =Async((cb: A => Trampoline[Unit]) => listen { a => cb(a).run })/** Create a `Future` that will evaluate `a` using the given `ExecutorService`. */def apply[A](a: => A)(implicit pool: ExecutorService = Strategy.DefaultExecutorService): Future[A] = Async { cb =>pool.submit { new Callable[Unit] { def call = cb(a).run }}}/** Create a `Future` that will evaluate `a` after at least the given delay. */def schedule[A](a: => A, delay: Duration)(implicit pool: ScheduledExecutorService =Strategy.DefaultTimeoutScheduler): Future[A] =Async { cb =>pool.schedule(new Callable[Unit] {def call = cb(a).run}, delay.toMillis, TimeUnit.MILLISECONDS)}我們看到apply和schedule在構建Future時對運算線程進行了配置。
如果我們需要模仿scala Future的功效可以用unsafeStart:
1 val fs = fapply.unsafeStart //> run... 2 //| fs : scalaz.concurrent.Future[Long] = Suspend(<function0>) 3 fs.run //> res6: Long = 1465958922401 4 Thread.sleep(1000) 5 fs.run //> res7: Long = 1465958922401我們也可以用scala Future的callback方式用async函數把自定義的callback掛在構建的Future上:
1 def fu(t: Long): Future[String] = 2 Future.async[String]{k => k(s"the curreent time is: ${t.toString}!!!")} 3 //> fu: (t: Long)scalaz.concurrent.Future[String] 4 fu(System.currentTimeMillis()).run //> res8: String = the curreent time is: 1465958923415!!!scala Future和scalaz Future之間可以相互轉換:
1 import scala.concurrent.{Future => sFuture} 2 import scala.concurrent.ExecutionContext 3 import scala.util.{Success,Failure} 4 def futureTozFuture[A](sf: sFuture[A])(implicit ec: ExecutionContext): Future[A] = 5 Future.async {cb => sf.onComplete { 6 case Success(a) => cb(a) 7 // case Failure(e) => cb(e) 8 }} //> futureTozFuture: [A](sf: scala.concurrent.Future[A])(implicit ec: scala.con 9 //| current.ExecutionContext)scalaz.concurrent.Future[A] 10 def zFutureTosFuture[A](zf: Future[A]): sFuture[A] = { 11 val prom = scala.concurrent.Promise[A] 12 zf.unsafePerformAsync { 13 case a => prom.success(a)是 14 } 15 prom.future 16 }突然發現scalaz Future是沒有異常處理(exception)功能的。scalaz提供了concurrent.Task類型填補了Future的這部分缺陷。我們會在下篇討論Task。
我們用上面scala Future的例子來示范scalaz Future的函數組合能力:
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
轉載于:https://www.cnblogs.com/tiger-xc/p/5586834.html
總結
以上是生活随笔為你收集整理的Scalaz(44)- concurrency :scalaz Future,尚不完整的多线程类型的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: u盘怎么没办法转换ntfs U盘无法格式
- 下一篇: XP深度u盘启动怎么装系统 XP深度U盘