Project scenario:
Use Spark Streaming to consume data from Kafka and run computations on it, then package the program, upload it to Linux, and submit it as a Spark job with spark-submit.
Errors encountered:
1. Error 1:
```
Failed to add file:/usr/local/spark-yarn/./myapp/sparkDemo04.jar to Spark environment
java.io.FileNotFoundException: Jar D:\usr\local\spark-yarn\myapp\sparkDemo04.jar not found
WARN ProcfsMetricsGetter: Exception when trying to compute pagesize, as a result reporting of ProcessTree metrics is stopped
```
2. Running the code in yarn mode from IDEA on Windows fails with the following error:
```
WARN CheckpointReader: Error reading checkpoint from file hdfs://hadoop102:9000/checkpoint6/checkpoint-1637834226000
java.io.IOException: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.streaming.dstream.MappedDStream.mapFunc of type scala.Function1 in instance of org.apache.spark.streaming.dstream.MappedDStream
```
3. Various Kafka "class not found" errors. These are not discussed further below; just download the corresponding jar and drop it into the jars directory of your Spark installation. A common example:
```
java.lang.NoClassDefFoundError: org/apache/spark/kafka010/KafkaConfigUpdater
```
All of these can be fixed by adding the missing jars. If the error shows up inside spark-shell, append --jars <path-to-jar> to the spark-shell command when you launch it.

The original code:
```scala
import com.study.stream05_kafka.SparkKafka.createSSC
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.lang.System.getProperty
import scala.collection.mutable.ListBuffer

object stream05_kafka {

  object SparkKafka {
    def createSSC(): _root_.org.apache.spark.streaming.StreamingContext = {
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("kafka2")
      sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
      sparkConf.set("spark.hadoop.fs.defaultFS", "hdfs://hadoop102:9000")
      sparkConf.set("spark.hadoop.yarn.resoursemanager.address", "hadoop103:8088")

      val streamingContext: StreamingContext = new StreamingContext(sparkConf, Seconds(3))
      streamingContext.checkpoint("hdfs://hadoop102:9000/checkpoint6")

      val kafkaPara: Map[String, Object] = Map[String, Object](
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
        ConsumerConfig.GROUP_ID_CONFIG -> "second",
        "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
        "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
      )

      val kafkaDS: InputDStream[ConsumerRecord[String, String]] =
        KafkaUtils.createDirectStream[String, String](
          streamingContext,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](Set("sparkOnKafka"), kafkaPara)
        )

      val num: DStream[String] = kafkaDS.map(_.value())

      val result = num.map(line => {
        val flows = line.split(",")
        val up = flows(1).toInt
        val down = flows(2).toInt
        (flows(0), (up, down, up + down))
      }).updateStateByKey((queueValue, buffValue: Option[(Int, Int, Int)]) => {
        val cur = buffValue.getOrElse((0, 0, 0))
        var curUp = cur._1
        var curDown = cur._2
        for (elem <- queueValue) {
          curUp += elem._1
          curDown += elem._2
        }
        Option((curUp, curDown, curUp + curDown))
      })

      result.print()
      streamingContext
    }
  }

  def main(args: Array[String]): Unit = {
    println("**************")
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    System.getProperties.setProperty("HADOOP_USER_NAME", "hadoop")
    val streamingContext = StreamingContext.getActiveOrCreate("hdfs://hadoop102:9000/checkpoint6", () => createSSC())
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
```
Cause analysis:

First, note that if you want to package the job and submit it on Linux in yarn mode, the master must be set to yarn; whether it runs as yarn-client or yarn-cluster is chosen at submit time, with client being the default. I had written local, which is why everything worked on Windows (it connected to Kafka, pulled data, and computed) but failed on Linux: the job was never actually talking to YARN.

1. Error 1 happens because Spark cannot find your jar when the job is submitted from Windows. Think of the spark-submit command: it needs both the jar and the main class.

2. Error 2 is either a serialization problem or a case of broadcast variables not playing well with checkpoints. From what I could find, broadcast variable contents are hard to recover once written to HDFS. The error can be pinned down to StreamingContext.getActiveOrCreate: sometimes recovery from the checkpoint works, sometimes it throws this error. I have not found a real fix, so I simply switched to a new checkpoint path. In production a streaming job is normally only stopped for a code upgrade anyway, so I did not dig deeper; if anyone knows the answer, please share. My guess is that deserialization fails while reading the checkpoint data.
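To make the recovery behaviour in point 2 concrete, here is a minimal sketch (using only the checkpoint path and createSSC from the code above) of what StreamingContext.getActiveOrCreate does. It returns an already running context if one is active in the JVM; otherwise it tries to rebuild the context by deserializing the DStream graph saved under the checkpoint directory; only when no checkpoint data exists does it call the creation function. Since createOnError defaults to false, a corrupt or incompatible checkpoint surfaces as the ClassCastException above instead of a silent fresh start.

```scala
import org.apache.spark.streaming.StreamingContext
import com.study.stream05_kafka.SparkKafka.createSSC

// Recovery path behind error 2 (same calls as in the code above):
// 1) if a started StreamingContext is already active in this JVM, it is returned;
// 2) otherwise Spark rebuilds the context by deserializing the DStream graph saved
//    under the checkpoint directory, including the map/updateStateByKey lambdas;
// 3) only if no checkpoint data is found is createSSC() invoked.
// Because createOnError defaults to false, a bad checkpoint throws here rather than
// falling back to createSSC().
val streamingContext: StreamingContext = StreamingContext.getActiveOrCreate(
  "hdfs://hadoop102:9000/checkpoint6",
  () => createSSC()
)
```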
Solutions:

Solution to error 1: to run the job from Windows, first package the program with mvn package or Build Artifacts, then point sparkConf.setJars at the resulting jar; with that in place it runs normally on Windows (see the sketch just below).

Solution to error 2: I simply switched to a new checkpoint path.

The code that finally ran successfully is pasted after the sketch.
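A minimal sketch of the error-1 fix, assuming the build writes the jar to the path that appeared in the error message above (point it at wherever mvn package or Build Artifacts actually puts your jar):

```scala
import org.apache.spark.SparkConf

// Error-1 fix as described above: after building the jar (mvn package / Build
// Artifacts), point setJars at it so Spark can locate the application jar when
// the job is run from IDEA on Windows. The path below is the one from the error
// message; replace it with your actual build output location.
val sparkConf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("kafka2")
  .setJars(List("D:\\usr\\local\\spark-yarn\\myapp\\sparkDemo04.jar"))
```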
```scala
import com.study.stream05_kafka.SparkKafka.createSSC
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.lang.System.getProperty
import scala.collection.mutable.ListBuffer

object stream05_kafka {

  object SparkKafka {
    def createSSC(): _root_.org.apache.spark.streaming.StreamingContext = {
      val sparkConf = new SparkConf()
        .setMaster("yarn")
        .setAppName("kafka2")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
      sparkConf.set("spark.hadoop.fs.defaultFS", "hdfs://hadoop102:9000")
      sparkConf.set("spark.hadoop.yarn.resoursemanager.address", "hadoop103:8088")

      val streamingContext: StreamingContext = new StreamingContext(sparkConf, Seconds(3))
      streamingContext.checkpoint("hdfs://hadoop102:9000/checkpoint7")

      val kafkaPara: Map[String, Object] = Map[String, Object](
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
        ConsumerConfig.GROUP_ID_CONFIG -> "second",
        "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
        "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
      )

      val kafkaDS: InputDStream[ConsumerRecord[String, String]] =
        KafkaUtils.createDirectStream[String, String](
          streamingContext,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](Set("sparkOnKafka"), kafkaPara)
        )

      val num: DStream[String] = kafkaDS.map(_.value())

      val result = num.map(line => {
        val flows = line.split(",")
        val up = flows(1).toInt
        val down = flows(2).toInt
        (flows(0), (up, down, up + down))
      }).updateStateByKey((queueValue, buffValue: Option[(Int, Int, Int)]) => {
        val cur = buffValue.getOrElse((0, 0, 0))
        var curUp = cur._1
        var curDown = cur._2
        for (elem <- queueValue) {
          curUp += elem._1
          curDown += elem._2
        }
        Option((curUp, curDown, curUp + curDown))
      })

      result.print()
      streamingContext
    }
  }

  def main(args: Array[String]): Unit = {
    println("**************")
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    System.getProperties.setProperty("HADOOP_USER_NAME", "hadoop")
    val streamingContext = StreamingContext.getActiveOrCreate("hdfs://hadoop102:9000/checkpoint7", () => createSSC())
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
```
One more note: when you package the job for cluster submission, do not keep the setJars call, or it will still fail. I forget exactly what error it reports (this post was written after I had already solved the problem, so I did not keep many of the logs), but if I remember correctly it was something like the error below. A sketch of one way to handle this follows that error message.
```
cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
```
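One possible way to avoid flipping setJars on and off by hand is to guard the IDE-only call behind a flag. This is just a sketch under my own assumptions, not something I verified in this project, and the system property run.local is invented purely for illustration:

```scala
import org.apache.spark.SparkConf

// Sketch only: guard the IDE-only setJars call behind a flag so the jar packaged
// for spark-submit never carries it. The property name "run.local" is made up for
// illustration; pass -Drun.local=true only in the IDEA run configuration.
val localRun = sys.props.get("run.local").contains("true")

val sparkConf = new SparkConf().setAppName("kafka2")
if (localRun) {
  // Only needed when running from IDEA on Windows (see the error-1 fix above);
  // the path is illustrative, point it at your actual build output.
  sparkConf.setJars(List("D:\\usr\\local\\spark-yarn\\myapp\\sparkDemo04.jar"))
}
```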
Remaining confusion:
To track this bug down I spent a whole day going back and forth between the YARN logs and the Spark logs. The most maddening part was that after exiting spark-submit with Ctrl+Z, the spark-submit process kept running in the background (Ctrl+Z only suspends the foreground job rather than terminating it). I even started to wonder whether my kill -9 had corrupted the checkpoint and caused the recovery failure. Does anyone know the right way to stop a SparkSubmit process?