生活随笔
收集整理的這篇文章主要介紹了
讨喜的隔离可变性(五)同时使用多个角色
小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
通過前面的學(xué)習(xí),我們已經(jīng)了解了如何創(chuàng)建角色以及如何給角色發(fā)送消息,下面讓我們來一起學(xué)習(xí)如何讓多個(gè)角色協(xié)同工作。在第2章中,我們創(chuàng)建了一個(gè)統(tǒng)計(jì)給定區(qū)間內(nèi)所有素?cái)?shù)的程序。在該程序中,我們使用了ExecutorService、Callable、Future以及其他差不多超過一頁紙那么多代碼。本節(jié)我們將會(huì)學(xué)習(xí)如何用Akka角色對(duì)該示例進(jìn)行重構(gòu),并且根據(jù)之前的慣例我們的介紹順序還是先Java后Scala。
在Java中同時(shí)使用多個(gè)角色
假定待統(tǒng)計(jì)數(shù)字集合中的數(shù)字是1千萬個(gè),為了統(tǒng)計(jì)其中的素?cái)?shù)數(shù)量,之前我們是將數(shù)字集合劃分為若干個(gè)不相交的子集合,并將這些子集合丟給一些線程去執(zhí)行統(tǒng)計(jì)操作。但這里我們將使用角色來完成同樣的功能,下面就讓我們從角色的onRecevie()函數(shù)開始說起吧:
| 1 | public class Primes extends UntypedActor { |
| 2 | public void onReceive(final Object boundsList) { |
| 3 | final List<Integer> bounds = (List<Integer>) boundsList; |
| 5 | PrimeFinder.countPrimesInRange(bounds.get(0), bounds.get(1)); |
| 6 | getContext().replySafe(count); |
為了統(tǒng)計(jì)給定區(qū)間內(nèi)的素?cái)?shù)數(shù)量,我們需要指定區(qū)間的上下限。在本例中,onReceive()函數(shù)的參數(shù)是一個(gè)List,其中前兩個(gè)元素即為區(qū)間的上下限。在onReceive()函數(shù)內(nèi)部,我們調(diào)用了PrimeFinder類的countPrimesInRage()函數(shù)來統(tǒng)計(jì)區(qū)間內(nèi)的素?cái)?shù)數(shù)量,最后又使用replySafe()函數(shù)將統(tǒng)計(jì)結(jié)果返回給調(diào)用者。
在給定了待統(tǒng)計(jì)的數(shù)字集合之后,我們需要將其劃分成若干個(gè)不相交的子集合并將統(tǒng)計(jì)這些子集合中素?cái)?shù)數(shù)量的任務(wù)委托給各個(gè)不同的角色來執(zhí)行。下面就讓我們?cè)陟o態(tài)方法countPrimes()中實(shí)現(xiàn)這些邏輯:
| 01 | public static int countPrimes( |
| 02 | final int number, final int numberOfParts) { |
| 03 | final int chunksPerPartition = number / numberOfParts; |
| 04 | final List<Future<?>> results = new ArrayList<Future<?>>(); |
| 05 | for(int index = 0; index < numberOfParts; index++) { |
| 06 | final int lower = index * chunksPerPartition + 1; |
| 07 | final int upper = (index == numberOfParts - 1) ? number : |
| 08 | lower + chunksPerPartition - 1; |
| 09 | final List<Integer> bounds = Collections.unmodifiableList( |
| 10 | Arrays.asList(lower, upper)); |
| 11 | final ActorRef primeFinder = Actors.actorOf(Primes.class).start(); |
| 12 | results.add(primeFinder.sendRequestReplyFuture(bounds)); |
| 15 | for(Future<?> result : results) |
| 16 | count += (Integer)(result.await().result().get()); |
| 17 | Actors.registry().shutdownAll(); |
在確定了每個(gè)子集合的范圍之后,我們會(huì)將其包裝在一個(gè)不可變集合里——請(qǐng)記住,所有的消息都必須是不可變的。接下來,我們調(diào)用sendRequestReplyFuture()這個(gè)非阻塞函數(shù)來將統(tǒng)計(jì)請(qǐng)求發(fā)送給各個(gè)角色進(jìn)行處理。在把請(qǐng)求發(fā)送出去之后,我們將sendRequestReplyFuture()返回的Future對(duì)象(注意這里是akka.dispatch.Future而不是JDK中的java.util.concurrent.Future)保存在一個(gè)數(shù)組中以便稍后從其中取回各個(gè)子集合的統(tǒng)計(jì)結(jié)果。在任務(wù)分派完畢之后,我們就可以循環(huán)查詢每個(gè)Future,即先調(diào)用Future的await()函數(shù),待await()函數(shù)返回之后再調(diào)用其返回值的result()函數(shù)來獲取一個(gè)Scala的Option實(shí)例——你可以將其假想為一個(gè)包含統(tǒng)計(jì)結(jié)果的數(shù)據(jù)單元(如果數(shù)據(jù)存在的話)。最后我們可以通過調(diào)用該實(shí)例對(duì)象的get()函數(shù)來得到一個(gè)Integer類型的統(tǒng)計(jì)值。
OK,下面就讓我們寫一個(gè)用來檢驗(yàn)上述代碼的測(cè)試用例,其中的待統(tǒng)計(jì)數(shù)字和子集合劃分?jǐn)?shù)是通過命令行傳給程序的:
| 01 | public static void main(final String[] args) { |
| 03 | System.out.println("Usage: number numberOfParts"); |
| 05 | final long start = System.nanoTime(); |
| 06 | final int count = countPrimes( |
| 07 | Integer.parseInt(args[0]), Integer.parseInt(args[1])); |
| 08 | Working with Multiple Actors ? 179 |
| 09 | final long end = System.nanoTime(); |
| 10 | System.out.println("Number of primes is " + count); |
| 11 | System.out.println("Time taken " + (end - start)/1.0e9); |
main()函數(shù)主要負(fù)責(zé)對(duì)上面的統(tǒng)計(jì)代碼進(jìn)行測(cè)試并記錄執(zhí)行耗時(shí)。最后我們還需要實(shí)現(xiàn)PrimeFinder這個(gè)真正負(fù)責(zé)統(tǒng)計(jì)工作的類:
| 01 | public class PrimeFinder { |
| 02 | public static boolean isPrime(final int number) { |
| 03 | if (number <= 1) return false; |
| 04 | final int limit = (int) Math.sqrt(number); |
| 05 | for(int i = 2; i <= limit; i++) if(number % i == 0) return false; |
| 08 | public static int countPrimesInRange(final int lower, final int upper) { |
| 10 | for(int index = lower; index <= upper; index++) |
| 11 | if(isPrime(index)) count += 1; |
令待統(tǒng)計(jì)區(qū)間為[1, 1000w]、劃分的子區(qū)間為100個(gè),則上述示例程序的輸出結(jié)果如下所示:
| 1 | Number of primes is 664579 |
下面讓我們將本節(jié)的代碼和輸出結(jié)果與第2.4節(jié)的示例代碼和輸出結(jié)果進(jìn)行比較。雖然兩個(gè)版本都將子集合數(shù)設(shè)為100,但Akka版本的示例代碼無需顯式設(shè)定線程池大小。此外,由于這是一個(gè)計(jì)算密集型問題,所以對(duì)于使用ExecutorService的版本而言,其線程池大小的設(shè)定是需要隨機(jī)器CPU核數(shù)計(jì)算而定的,所以兩個(gè)版本的性能都差不多,而Akka版本在代碼的形式上要比使用ExecutorServer的版本簡(jiǎn)潔一些。但正如我們?cè)诒菊潞竺鎸?huì)看到的那樣,當(dāng)我們需要讓多個(gè)線程/角色相互協(xié)作的時(shí)候,這些區(qū)別將會(huì)愈發(fā)明顯。
在Scala中同時(shí)使用多角色
如果用Scala來實(shí)現(xiàn)這個(gè)統(tǒng)計(jì)素?cái)?shù)數(shù)量的程序,那么我們就可以深切體會(huì)到Scala在角色的實(shí)現(xiàn)以及與角色交互方面的簡(jiǎn)潔和優(yōu)雅。下面讓我們來看看Scala版本的Primes類是如何實(shí)現(xiàn)的:
| 01 | class Primes extends Actor { |
| 03 | case (lower : Int, upper : Int) => |
| 04 | val count = PrimeFinder.countPrimesInRange(lower, upper) |
| 05 | self.replySafe(new Integer(count)) |
| 09 | def countPrimes(number : Int, numberOfParts : Int) = { |
| 10 | val chunksPerPartition : Int = number / numberOfParts |
| 11 | val results = new Array[Future[Integer]](numberOfParts) |
| 13 | while(index < numberOfParts) { |
| 14 | val lower = index * chunksPerPartition + 1 |
| 15 | val upper = if (index == numberOfParts - 1) |
| 16 | number else lower + chunksPerPartition - 1 |
| 17 | val bounds = (lower, upper) |
| 18 | val primeFinder = Actor.actorOf[Primes].start() |
| 19 | results(index) = (primeFinder !!! bounds).asInstanceOf[Future[Integer]] |
| 24 | while(index < numberOfParts) { |
| 25 | count += results(index).await.result.get.intValue() |
| 28 | Actors.registry.shutdownAll |
| 31 | def main(args : Array[String]) : Unit = { |
| 33 | println("Usage: number numberOfParts") |
| 35 | val start = System.nanoTime |
| 36 | val count = countPrimes(args(0).toInt, args(1).toInt) |
| 37 | val end = System.nanoTime |
| 38 | println("Number of primes is " + count) |
| 39 | println("Time taken " + (end - start)/1.0e9) |
Scala版本的代碼與Java版本有幾點(diǎn)不同。首先,Scala版本所使用的消息格式是簡(jiǎn)單的元組而不是一個(gè)不可變列表。其次,receive()函數(shù)中的case語句與應(yīng)用場(chǎng)景十分契合。第三,Java版本中countPrimes()函數(shù)里的for循環(huán)在這里變成了一個(gè)while循環(huán)。其原因是,雖然Scala的for循環(huán)表達(dá)式十分優(yōu)雅,但會(huì)增加Object到基本類型之間的轉(zhuǎn)換開銷。為了能夠得到比較真實(shí)的性能對(duì)比,我在這里放棄了優(yōu)雅。
類似地,在PrimeFinder中,我們也用while循環(huán)代替了for循環(huán)。
| 02 | def isPrime(number : Int) : Boolean = { |
| 03 | if (number <= 1) return false |
| 04 | var limit = scala.math.sqrt(number).toInt |
| 07 | if(number % i == 0) return false |
| 12 | def countPrimesInRange(lower : Int, upper : Int) : Int = { |
| 15 | while(index <= upper) { |
| 16 | if(isPrime(index)) count += 1 |
令待統(tǒng)計(jì)區(qū)間為[1,1000w]、劃分的子區(qū)間為100個(gè),則Scala版示例程序的性能如下所示:
| 1 | Number of primes is 664579 |
總結(jié)
以上是生活随笔為你收集整理的讨喜的隔离可变性(五)同时使用多个角色的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。