Spark Streaming "Could not read data from write ahead log record": error analysis and fix
With checkpointing and the WAL enabled, Spark Streaming occasionally throws the error above. It does not affect the program as a whole; only the data of the failing job is lost. The root cause is that the WAL file was deleted — by Spark Streaming's own cleanup mechanism. This usually indicates some degree of rate mismatch or batch backlog in the streaming job.
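For context, here is a minimal sketch of the kind of setup that hits this code path. The checkpoint directory and app name are taken from the log below; the 15s batch interval is the one mentioned in the reflection section — treat all of them as illustrative:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("RhinoWechatConsumer")
  // Write received blocks to a WAL under the checkpoint directory so a
  // driver restart can replay data that was received but not yet processed.
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(15))
ssc.checkpoint("hdfs://alps-cluster/tmp/banyan/checkpoint/RhinoWechatConsumer")

// ... receiver-based input DStreams and output operations go here ...

ssc.start()
ssc.awaitTermination()
```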
The driver log shows entries like the following:
```
2017-03-23 13:55:00 INFO [Logging.scala:58] Attempting to clear 0 old log files in hdfs://alps-cluster/tmp/banyan/checkpoint/RhinoWechatConsumer/receivedBlockMetadata older than 1490248380000:
2017-03-23 13:55:05 INFO [Logging.scala:58] Attempting to clear 1 old log files in hdfs://alps-cluster/tmp/banyan/checkpoint/RhinoWechatConsumer/receivedBlockMetadata older than 1490248470000: hdfs://alps-cluster/tmp/banyan/checkpoint/RhinoWechatConsumer/receivedBlockMetadata/log-1490248404471-1490248464471
2017-03-23 13:55:05 INFO [Logging.scala:58] Cleared log files in hdfs://alps-cluster/tmp/banyan/checkpoint/RhinoWechatConsumer/receivedBlockMetadata older than 1490248470000
2017-03-23 13:55:05 ERROR [Logging.scala:74] Task 41 in stage 35.0 failed 4 times; aborting job
2017-03-23 13:55:05 ERROR [Logging.scala:95] Error running job streaming job 1490248470000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 41 in stage 35.0 failed 4 times, most recent failure: Lost task 41.3 in stage 35.0 (TID 4273, alps60): org.apache.spark.SparkException: Could not read data from write ahead log record FileBasedWriteAheadLogSegment(hdfs://alps-cluster/tmp/banyan/checkpoint/RhinoWechatConsumer/receivedData/0/log-1490248403649-1490248463649,44333482,118014)
        at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:143)
```
You can see that the WAL segment starting at 1490248403649 was removed by the cleanup ("Cleared log files ... older than 1490248470000"), and reading that segment then failed.
The official Spark documentation has no configuration for this, so we go straight to the source code. (Spark has many pitfalls like this; you have to read the source to find the hack or the hidden configuration.)
1. Searching the source for the log message leads to the clean method in FileBasedWriteAheadLog (the code after the logInfo is the concrete deletion logic, which we can ignore for now). The key question is how this threshTime gets set.
```scala
/**
 * Delete the log files that are older than the threshold time.
 *
 * Its important to note that the threshold time is based on the time stamps used in the log
 * files, which is usually based on the local system time. So if there is coordination necessary
 * between the node calculating the threshTime (say, driver node), and the local system time
 * (say, worker node), the caller has to take account of possible time skew.
 *
 * If waitForCompletion is set to true, this method will return only after old logs have been
 * deleted. This should be set to true only for testing. Else the files will be deleted
 * asynchronously.
 */
def clean(threshTime: Long, waitForCompletion: Boolean): Unit = {
  val oldLogFiles = synchronized {
    val expiredLogs = pastLogs.filter { _.endTime < threshTime }
    pastLogs --= expiredLogs
    expiredLogs
  }
  logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
    s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
```
2. Tracing the callers step by step: ReceivedBlockHandler -> ReceiverSupervisorImpl -> CleanUpOldBlocks. There is an RPC to the ReceiverTracker here, so search directly for CleanUpOldBlocks -> ReceiverTracker -> JobGenerator.
JobGenerator.clearCheckpointData contains the following logic:
```scala
/** Clear DStream checkpoint data for the given `time`. */
private def clearCheckpointData(time: Time) {
  ssc.graph.clearCheckpointData(time)

  // All the checkpoint information about which batches have been processed, etc have
  // been saved to checkpoints, so its safe to delete block metadata and data WAL files
  val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
  jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
  jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)
  markBatchFullyProcessed(time)
}
```
So ssc.graph carries a maxRememberDuration, which means there is a chance to modify it through ssc.
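To see how this produces the failure above, plug the timestamps from the driver log into `time - maxRememberDuration`. This is a back-of-the-envelope sketch: the 30s rememberDuration is the default derived in the reflection section below, and the triggering batch time is inferred from it:

```scala
// Hypothetical walk-through of the cleanup threshold, using the log's timestamps.
// With a 15s batch interval, the default rememberDuration is 30s (see below).
val maxRememberDuration = 30000L          // 2 * 15s, in ms (assumed default)
val threshTime          = 1490248470000L  // "older than 1490248470000" in the log
val triggeringBatch     = threshTime + maxRememberDuration  // 1490248500000 = 13:55:00

// The WAL segment from the stack trace, receivedData/0/log-1490248403649-1490248463649,
// ends at 1490248463649, which is below threshTime, so clean() deletes it --
// while the delayed job for batch 1490248470000 still needs to read it.
val segmentEndTime = 1490248463649L
assert(segmentEndTime < threshTime)
```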
A quick search of the code turns up the corresponding method:
```java
jssc.remember(new Duration(2 * 3600 * 1000));
```
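In a Scala app the equivalent call is ssc.remember; the two-hour value is just the value chosen here, not a recommendation. A sketch of where the call belongs:

```scala
import org.apache.spark.streaming.Minutes

// Keeps generated RDDs -- and the WAL segments backing them -- around for
// 2 hours instead of the default 2 * batchInterval. Call it before
// ssc.start() so it takes effect when the DStreams are initialized;
// as noted below, the graph only lets this be set once.
ssc.remember(Minutes(120))
```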
Reflection:
From the earlier log we saw that the default cleanup interval is only a few tens of seconds, yet the code shows this parameter can only be set once (a set only takes effect when the current value is null, and the initial value is null). So where do those tens of seconds come from? Not finding it right away, I searched the whole project for remember and found the initialization code in DStream (where slideDuration is supplied by InputDStream). Doing the math: our batchInterval is 15s and the other two values are unset, so checkpointDuration comes out to 15s and rememberDuration to 30s.
```scala
override def slideDuration: Duration = {
  if (ssc == null) throw new Exception("ssc is null")
  if (ssc.graph.batchDuration == null) throw new Exception("batchDuration is null")
  ssc.graph.batchDuration
}

/**
 * Initialize the DStream by setting the "zero" time, based on which
 * the validity of future times is calculated. This method also recursively initializes
 * its parent DStreams.
 */
private[streaming] def initialize(time: Time) {
  if (zeroTime != null && zeroTime != time) {
    throw new SparkException("ZeroTime is already initialized to " + zeroTime
      + ", cannot initialize it again to " + time)
  }
  zeroTime = time

  // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger
  if (mustCheckpoint && checkpointDuration == null) {
    checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
    logInfo("Checkpoint interval automatically set to " + checkpointDuration)
  }

  // Set the minimum value of the rememberDuration if not already set
  var minRememberDuration = slideDuration
  if (checkpointDuration != null && minRememberDuration <= checkpointDuration) {
    // times 2 just to be sure that the latest checkpoint is not forgotten (#paranoia)
    minRememberDuration = checkpointDuration * 2
  }
  if (rememberDuration == null || rememberDuration < minRememberDuration) {
    rememberDuration = minRememberDuration
  }

  // Initialize the dependencies
  dependencies.foreach(_.initialize(zeroTime))
}
```
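Plugging our 15s batch interval into the logic above confirms the numbers quoted earlier (plain arithmetic mirroring initialize(), not Spark code):

```scala
// slideDuration = batchDuration = 15s; neither checkpointDuration nor
// rememberDuration was set explicitly.
val slideSec      = 15
val checkpointSec = slideSec * math.ceil(10.0 / slideSec).toInt  // 15 * ceil(0.67) = 15s
// slideDuration (15s) <= checkpointDuration (15s), so the minimum is doubled:
val rememberSec   = checkpointSec * 2                            // 30s
```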
Reposted from: https://www.cnblogs.com/lhfcws/p/6605085.html