cache方法
他可以將數據標記為cache,在觸發action的時候,會將數據緩存進內存當中,并進行計算。被標記為cache的RDD第一次觸發action的時候,因為需要將數據緩存入內存當中,時間會比平時慢。但是在后續需要運用該被標記RDD進行計算的時候,計算會快特別多,十分快。所以需要多次重復運用的數據的時候可以將其cache,極大提高效率。比如機器學算法的多次迭代什么的
cache的前提
val cache
= RDD
. cache
將RDD釋放內存
cache
. uppersist ( true )
cache 底層調的方法persist()
底層調用的是persist(),這個方法很靈活,里面可以傳參數
可以通過該StorageLevel.MEMORY_ONLY ,進行緩存磁盤和內存,還可以組合緩存,甚至對數據序列化。序列化可以將數據壓縮,節省空間,但是會多花一點時間。
參數含義: 第一個參數,放到磁盤 第二個參數,放到內存 第三個參數,磁盤中的數據,不是以java對象的方式保存 第四個參數,內存中的數據,以java對象的方式保存
val MEMORY_AND_DISK
= new StorageLevel ( true , true , false , true )
帶2是可以存副本,防止丟失
Checkpoint
在做了復雜的計算后,可以將數據Checkphoint,然后存入HDFS當中,保證數據安全。
1.迭代計算,保證數據安全 2.對速度的要求不高(相對于cache至內存當中) 3.中間結果存入HDFS
步驟
1.設置checkpoint的目錄,通常是HDFS文件系統的目錄 2.經過復雜計算得到中間結果 3.將中間結果checkpoint緩存到HDFS中 4后續的計算可以使用之前checkpoint的數據
val sc
= SparkContext ( conf
)
sc
. setCheckpointDir ( "hdfs://node-4:" )
RDD
. checkpoint ( )
歸屬地計算案例
首先將URL中的ip地址給提取出來,然后將IP地址根據ip規則轉化成歸屬地,然后計算每個城市的用戶數量。
URL
每條URL當中包含了一名顧客的許多信息,比如ip地址,操作系統等等
20090121000133331104000|123.197.66.93|www.pkwutai.cn|/down/downLoad-id-45383.html|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; QQDownload 1.7)|http://www.baidu.com/s?tn=b1ank_pg&ie=gb2312&bs=%C3%C0%C6%BC%B7%FE%D7%B0%B9%DC%C0%ED%C8%ED%BC%FE&sr=&z=&cl=3&f=8&wd=%C6%C6%BD%E2%C3%C0%C6%BC%B7%FE%D7%B0%B9%DC%C0%ED%C8%ED%BC%FE&ct=0|
ip規則:
1.0.1.0|1.0.3.255|16777472|16778239|亞洲|中國|福建|福州||電信|350100|China|CN|119.306239|26.075302
規則解釋:第一第二個字段是ip的起始和結束,然后第三第四就是ip起始和結束轉化為十進制的形式,后面就是對應的地址。
將URL中提取ip,將ip轉化為十進制,根據規則進行匹配,在該城市的ip區間,則將其城市輸出。
二分法查找
二分法查找針對的是一個有序的數據集合,每次通過與區間的中間元素對比,將待查找的區間縮小為之前的一半,直到找到要查找的元素,或者區間被縮小為0
二分查找非常高效,假設數據大小是n,每次查找后數據都會縮小為原來的一半,也就是會除以2,最壞情況下,直到查找區間被縮小為空,才停止
單機計算ip地址
要求
需求:根據訪問日志的ip地址計算出訪問者的歸屬地,并且按照省份,計算出訪問次數,然后將計算好的結果寫入到SQL。 步驟:
1,整理數據,切分出ip字段,然后將ip地址轉換成十進制 2然后將數據緩存到內存中( Executors中的內存中) 3,將訪問log和ip規則進行匹配(二分法查找) 4.取出對份名稱,然后將其和一組合在一起 5.按省份名進行累合 6.將累合后的數據寫入到MySQL中
package cn
. edu360
. day4
import java
. sql
. { Connection
, DriverManager
, PreparedStatement
} import scala
. io
. { BufferedSource
, Source
} object MyUtils
{ def
ip2Long ( ip
: String
) : Long
= { val fragments
= ip
. split ( "[.]" ) var ipNum
= 0 L
for ( i
< - 0 until fragments
. length
) { ipNum
= fragments ( i
) . toLong
| ipNum
<< 8 L
} ipNum
} def
readRules ( path
: String
) : Array
[ ( Long
, Long
, String
) ] = { val bf
: BufferedSource
= Source
. fromFile ( path
)
val lines
: Iterator
[ String
] = bf
. getLines ( ) val rules
: Array
[ ( Long
, Long
, String
) ] = lines
. map ( line
= > { val fileds
= line
. split ( "[|]" ) val startNum
= fileds ( 2 ) . toLongval endNum
= fileds ( 3 ) . toLongval province
= fileds ( 6 ) ( startNum
, endNum
, province
) } ) . toArray rules
} def
binarySearch ( lines
: Array
[ ( Long
, Long
, String
) ] , ip
: Long
) : Int
= { var low
= 0 var high
= lines
. length
- 1 while ( low
<= high
) { val middle
= ( low
+ high
) / 2 if ( ( ip
>= lines ( middle
) . _1
) && ( ip
<= lines ( middle
) . _2
) ) return middle
if ( ip
< lines ( middle
) . _1
) high
= middle
- 1 else { low
= middle
+ 1 } } - 1 } def
data2MySQL ( it
: Iterator
[ ( String
, Int
) ] ) : Unit
= { val conn
: Connection
= DriverManager
. getConnection ( "jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8" , "root" , "123568" ) val pstm
: PreparedStatement
= conn
. prepareStatement ( "INSERT INTO access_log VALUES (?, ?)" ) it
. foreach ( tp
= > { pstm
. setString ( 1 , tp
. _1
) pstm
. setInt ( 2 , tp
. _2
) pstm
. executeUpdate ( ) } ) if ( pstm
!= null
) { pstm
. close ( ) } if ( conn
!= null
) { conn
. close ( ) } } def
main ( args
: Array
[ String
] ) : Unit
= { val rules
: Array
[ ( Long
, Long
, String
) ] = readRules ( "/Users/zx/Desktop/ip/ip.txt" ) val ipNum
= ip2Long ( "114.215.43.42" ) val index
= binarySearch ( rules
, ipNum
) val tp
= rules ( index
) val province
= tp
. _3
println ( province
) }
}
分布式處理計算ip地址1
步驟:
整理數據,切分出ip字段,然后將ip地址轉換成十進制
然后將數據緩存到內存中( Executors中的內存中)
然后使用廣播變量,將Drive端的數據廣播到Executor中
將訪問log和ip規則進行匹配(二分法查找),得出對應的城市
取出對份名稱,然后將其和一組合在一起
按省份名進行累合
將累合后的數據寫入到MySQL中
package cn
. edu360
. day4
import org
. apache
. spark
. broadcast
. Broadcast
import org
. apache
. spark
. rdd
. RDD
import org
. apache
. spark
. { SparkConf
, SparkContext
} object IpLoaction1
{ def
main ( args
: Array
[ String
] ) : Unit
= { val conf
= new SparkConf ( ) . setAppName ( "IpLoaction1" ) . setMaster ( "local[4]" ) val sc
= new SparkContext ( conf
) val rules
: Array
[ ( Long
, Long
, String
) ] = MyUtils
. readRules ( args ( 0 ) ) val broadcastRef
: Broadcast
[ Array
[ ( Long
, Long
, String
) ] ] = sc
. broadcast ( rules
) val accessLines
: RDD
[ String
] = sc
. textFile ( args ( 1 ) ) val func
= ( line
: String
) = > { val fields
= line
. split ( "[|]" ) val ip
= fields ( 1 ) val ipNum
= MyUtils
. ip2Long ( ip
) val rulesInExecutor
: Array
[ ( Long
, Long
, String
) ] = broadcastRef
. valuevar province
= "未知" val index
= MyUtils
. binarySearch ( rulesInExecutor
, ipNum
) if ( index
!= - 1 ) { province
= rulesInExecutor ( index
) . _3
} ( province
, 1 ) } val proviceAndOne
: RDD
[ ( String
, Int
) ] = accessLines
. map ( func
) val reduced
: RDD
[ ( String
, Int
) ] = proviceAndOne
. reduceByKey ( _
+ _
) val r
= reduced
. collect ( ) println ( r
. toBuffer
) sc
. stop ( ) }
}
分布式處理計算ip地址2
與上面的區別是,當ip規則過大的時候可以考慮將數據存儲至HDFS中,用spark將數據讀取至excutor當中,收集到driver,再次廣播至各個excutor。
讀取存儲在HDFS中的IP規則
然后對ip規則進行處理
將分散在多個Executor中的部分IP規則收集到Driver端(collect)
將Driver端的數據廣播到Executo
整理數據,切分出ip字段,然后將ip地址轉換成十進制
將訪問log和ip規則進行匹配(二分法查找),得出對應的城市
取出對份名稱,然后將其和一組合在一起
按省份名進行累合
將累合后的數據寫入到MySQL中
package cn
. edu360
. day4
import java
. sql
. { Connection
, DriverManager
, PreparedStatement
} import org
. apache
. spark
. broadcast
. Broadcast
import org
. apache
. spark
. rdd
. RDD
import org
. apache
. spark
. { SparkConf
, SparkContext
} object IpLoaction2
{ def
main ( args
: Array
[ String
] ) : Unit
= { val conf
= new SparkConf ( ) . setAppName ( "IpLoaction1" ) . setMaster ( "local[4]" ) val sc
= new SparkContext ( conf
) val rulesLines
: RDD
[ String
] = sc
. textFile ( args ( 0 ) ) val ipRulesRDD
: RDD
[ ( Long
, Long
, String
) ] = rulesLines
. map ( line
= > { val fields
= line
. split ( "[|]" ) val startNum
= fields ( 2 ) . toLongval endNum
= fields ( 3 ) . toLongval province
= fields ( 6 ) ( startNum
, endNum
, province
) } ) val rulesInDriver
: Array
[ ( Long
, Long
, String
) ] = ipRulesRDD
. collect ( ) val broadcastRef
: Broadcast
[ Array
[ ( Long
, Long
, String
) ] ] = sc
. broadcast ( rulesInDriver
) val accessLines
: RDD
[ String
] = sc
. textFile ( args ( 1 ) ) val proviceAndOne
: RDD
[ ( String
, Int
) ] = accessLines
. map ( log
= > { val fields
= log
. split ( "[|]" ) val ip
= fields ( 1 ) val ipNum
= MyUtils
. ip2Long ( ip
) val rulesInExecutor
: Array
[ ( Long
, Long
, String
) ] = broadcastRef
. valuevar province
= "未知" val index
= MyUtils
. binarySearch ( rulesInExecutor
, ipNum
) if ( index
!= - 1 ) { province
= rulesInExecutor ( index
) . _3
} ( province
, 1 ) } ) val reduced
: RDD
[ ( String
, Int
) ] = proviceAndOne
. reduceByKey ( _
+ _
)
reduced
. foreachPartition ( it
= > MyUtils
. data2MySQL ( it
) ) sc
. stop ( ) }
}
總結
以上是生活随笔 為你收集整理的spark学习五——归属地计算案例 的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網站內容還不錯,歡迎將生活随笔 推薦給好友。