type CompactionPlanner interface {
	Plan(lastWrite time.Time) []CompactionGroup
	PlanLevel(level int) []CompactionGroup
	PlanOptimize() []CompactionGroup
	Release(group []CompactionGroup)
	FullyCompacted() bool

	// ForceFull causes the planner to return a full compaction plan the next
	// time Plan() is called if there are files that could be compacted.
	ForceFull()

	SetFileStore(fs *FileStore)
}
type tsmGeneration struct {
	id            int               // generation id
	files         []FileStat        // stats of the TSM files in this generation, sorted by file name in ascending order
	parseFileName ParseFileNameFunc // parses the generation and sequence number out of a TSM file name
}
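For context, TSM file names encode both numbers, typically as <generation>-<sequence>.tsm (e.g. 000000001-000000002.tsm). Below is a minimal sketch of such a parser under that naming assumption; parseTSMName is a hypothetical stand-in, not the library's actual ParseFileNameFunc implementation.

import (
	"fmt"
	"path/filepath"
	"strconv"
	"strings"
)

// parseTSMName extracts the generation and sequence from a name such as
// "000000001-000000002.tsm". Hypothetical helper for illustration only.
func parseTSMName(name string) (generation, sequence int, err error) {
	base := strings.TrimSuffix(filepath.Base(name), ".tsm")
	parts := strings.SplitN(base, "-", 2)
	if len(parts) != 2 {
		return 0, 0, fmt.Errorf("unexpected tsm file name: %q", name)
	}
	if generation, err = strconv.Atoi(parts[0]); err != nil {
		return 0, 0, err
	}
	if sequence, err = strconv.Atoi(parts[1]); err != nil {
		return 0, 0, err
	}
	return generation, sequence, nil
}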
func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
	...
	// Determine the generations from all files on disk. We need to treat
	// a generation conceptually as a single file even though it may be
	// split across several files in sequence.
	// TSM files that share the same generation id are grouped together;
	// generations has type tsmGenerations.
	generations := c.findGenerations(true)
	...

	// Group the tsmGenerations by level.
	// After this grouping, the tsmGenerations in groups are ordered from the highest level to the lowest.
	var currentGen tsmGenerations
	var groups []tsmGenerations
	for i := 0; i < len(generations); i++ {
		cur := generations[i]

		// See if this generation is orphaned which would prevent it from being further
		// compacted until a final full compaction runs.
		if i < len(generations)-1 {
			if cur.level() < generations[i+1].level() {
				currentGen = append(currentGen, cur)
				continue
			}
		}

		if len(currentGen) == 0 || currentGen.level() == cur.level() {
			currentGen = append(currentGen, cur)
			continue
		}
		groups = append(groups, currentGen)

		currentGen = tsmGenerations{}
		currentGen = append(currentGen, cur)
	}

	if len(currentGen) > 0 {
		groups = append(groups, currentGen)
	}

	// Remove any groups in the wrong level
	// level is the parameter of this function and names the level whose files should be
	// compacted; the loop below filters for it.
	// cur.level() returns the smallest level among all the FileStats in the tsmGeneration --
	// is that the right choice here?
	var levelGroups []tsmGenerations
	for _, cur := range groups {
		if cur.level() == level {
			levelGroups = append(levelGroups, cur)
		}
	}

	minGenerations := 4
	if level == 1 {
		// Level 1 needs at least 8 files before it is compacted.
		minGenerations = 8
	}

	// type CompactionGroup []string
	var cGroups []CompactionGroup
	for _, group := range levelGroups {
		// Split the tsmGenerations of each group into chunks of the given size.
		for _, chunk := range group.chunk(minGenerations) {
			var cGroup CompactionGroup
			var hasTombstones bool
			for _, gen := range chunk {
				if gen.hasTombstones() {
					hasTombstones = true
				}
				for _, file := range gen.files {
					// cGroup collects the file.Path of every file to be compacted in this group.
					cGroup = append(cGroup, file.Path)
				}
			}

			// If the current chunk holds fewer than minGenerations tsmGenerations, skip it for now;
			// it will be planned again once enough generations have accumulated.
			// If hasTombstones is true, some data has been marked deleted and a compaction is
			// needed to actually remove it, so the short chunk is compacted anyway.
			if len(chunk) < minGenerations && !hasTombstones {
				continue
			}
			cGroups = append(cGroups, cGroup)
		}
	}

	if !c.acquire(cGroups) {
		return nil
	}
	return cGroups
}
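To make the chunking step above concrete: group.chunk(minGenerations) simply slices one level's generations into fixed-size batches. Below is a stand-alone sketch of that idea, using an []int stand-in instead of the real tsmGenerations type.

// chunkGenerations splits gens into consecutive batches of at most size elements.
// It is a hypothetical stand-in for tsmGenerations.chunk used by PlanLevel.
func chunkGenerations(gens []int, size int) [][]int {
	var chunks [][]int
	for len(gens) > size {
		chunks = append(chunks, gens[:size])
		gens = gens[size:]
	}
	if len(gens) > 0 {
		chunks = append(chunks, gens)
	}
	return chunks
}

For example, chunkGenerations([]int{1, 2, 3, 4, 5, 6, 7, 8, 9}, 4) yields [[1 2 3 4] [5 6 7 8] [9]]; in PlanLevel the trailing short chunk is skipped unless it carries tombstones.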
func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
	generations := c.findGenerations(true)

	for _, v := range generations {
		fmt.Printf("xxx | generations: %v\n", v)
	}

	c.mu.RLock()
	forceFull := c.forceFull
	c.mu.RUnlock()

	// first check if we should be doing a full compaction because nothing has been written in a long time
	// Full compactions are gated on a write-cold interval; this branch handles the full-compaction case.
	if forceFull || c.compactFullWriteColdDuration > 0 && time.Since(lastWrite) > c.compactFullWriteColdDuration && len(generations) > 1 {

		// Reset the full schedule if we planned because of it.
		if forceFull {
			c.mu.Lock()
			c.forceFull = false
			c.mu.Unlock()
		}

		var tsmFiles []string
		var genCount int
		for i, group := range generations {
			var skip bool

			// Skip the file if it's over the max size and contains a full block and it does not have any tombstones
			if len(generations) > 2 && group.size() > uint64(maxTSMFileSize) &&
				c.FileStore.BlockCount(group.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock &&
				!group.hasTombstones() {
				skip = true
			}

			// compressed files.
			if i < len(generations)-1 {
				if generations[i+1].level() <= 3 {
					skip = false
				}
			}

			if skip {
				continue
			}

			for _, f := range group.files {
				tsmFiles = append(tsmFiles, f.Path)
			}
			genCount += 1
		}
		sort.Strings(tsmFiles)

		// Make sure we have more than 1 file and more than 1 generation
		if len(tsmFiles) <= 1 || genCount <= 1 {
			return nil
		}

		group := []CompactionGroup{tsmFiles}

		if !c.acquire(group) {
			return nil
		}
		return group
	}

	// don't plan if nothing has changed in the filestore
	if c.lastPlanCheck.After(c.FileStore.LastModified()) && !generations.hasTombstones() {
		return nil
	}

	c.lastPlanCheck = time.Now()

	// If there is only one generation, return early to avoid re-compacting the same file
	// over and over again.
	if len(generations) <= 1 && !generations.hasTombstones() {
		return nil
	}

	// Need to find the ending point for level 4 files. They will be the oldest files. We scan
	// each generation in descending order and break once we see a level less than 4.
	end := 0
	start := 0

	// findGenerations returns the generations ordered by level from high to low,
	// so this finds the cut-off index of the level >= 4 generations.
	for i, g := range generations {
		if g.level() <= 3 {
			break
		}
		end = i + 1
	}

	// As compactions run, the oldest files get bigger. We don't want to re-compact them during
	// this planning if they are maxed out so skip over any we see.
	var hasTombstones bool
	for i, g := range generations[:end] {
		if g.hasTombstones() {
			hasTombstones = true
		}

		if hasTombstones {
			continue
		}

		// The checks below mainly skip over TSM files that have already grown too large.
		// Skip the file if it's over the max size and contains a full block or the generation is split
		// over multiple files. In the latter case, that would mean the data in the file spilled over
		// the 2GB limit.
		if g.size() > uint64(maxTSMFileSize) &&
			c.FileStore.BlockCount(g.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock {
			start = i + 1
		}

		// This is an edge case that can happen after multiple compactions run. The files at the beginning
		// can become larger faster than ones after them. We want to skip those really big ones and just
		// compact the smaller ones until they are closer in size.
		if i > 0 {
			if g.size()*2 < generations[i-1].size() {
				start = i
				break
			}
		}
	}

	// step is how many files to compact in a group. We want to clamp it at 4 but also still
	// return groups smaller than 4.
	step := 4
	if step > end {
		step = end
	}

	// slice off the generations that we'll examine
	generations = generations[start:end]

	// The code below splits generations into batches, i.e. groups the TSM files
	// so that compactions can run in parallel.
	// Loop through the generations in groups of size step and see if we can compact all (or
	// some of them as group)
	groups := []tsmGenerations{}
	for i := 0; i < len(generations); i += step {
		var skipGroup bool
		startIndex := i

		for j := i; j < i+step && j < len(generations); j++ {
			gen := generations[j]
			lvl := gen.level()

			// Skip compacting this group if there happens to be any lower level files in the
			// middle. These will get picked up by the level compactors.
			if lvl <= 3 {
				fmt.Printf("xxx | lvl <= 3")
				skipGroup = true
				break
			}

			// Skip the file if it's over the max size and it contains a full block
			if gen.size() >= uint64(maxTSMFileSize) && c.FileStore.BlockCount(gen.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock && !gen.hasTombstones() {
				startIndex++
				continue
			}
		}

		if skipGroup {
			continue
		}

		endIndex := i + step
		if endIndex > len(generations) {
			endIndex = len(generations)
		}
		if endIndex-startIndex > 0 {
			groups = append(groups, generations[startIndex:endIndex])
		}
	}

	if len(groups) == 0 {
		return nil
	}

	// With the groups, we need to evaluate whether the group as a whole can be compacted
	compactable := []tsmGenerations{}
	for _, group := range groups {
		// if we don't have enough generations to compact, skip it
		if len(group) < 4 && !group.hasTombstones() {
			continue
		}
		compactable = append(compactable, group)
	}

	// All the files to be compacted must be compacted in order. We need to convert each
	// group to the actual set of files in that group to be compacted.
	var tsmFiles []CompactionGroup
	for _, c := range compactable {
		var cGroup CompactionGroup
		for _, group := range c {
			for _, f := range group.files {
				cGroup = append(cGroup, f.Path)
			}
		}

		sort.Strings(cGroup)
		tsmFiles = append(tsmFiles, cGroup)
	}

	if !c.acquire(tsmFiles) {
		return nil
	}
	return tsmFiles
}
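As a concrete walk-through of the grouping logic above (assuming none of the level-4 files are oversized or tombstoned): if findGenerations returns seven level-4 generations, then end = 7, start = 0 and step = 4, so the loop produces one group holding generations 0-3 and another holding generations 4-6; the second group is later dropped by the compactable filter because it contains fewer than 4 generations and carries no tombstones.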
// Writes the contents of the Cache into *.tsm.tmp files.
// If the cache holds too many values, it is split into several caches that are processed in parallel; each split cache gets its own generation.
func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
	c.mu.RLock()
	enabled := c.snapshotsEnabled
	intC := c.snapshotsInterrupt
	c.mu.RUnlock()

	if !enabled {
		return nil, errSnapshotsDisabled
	}

	start := time.Now()
	// cache.Count() returns the total number of values in the cache.
	card := cache.Count()

	// Enable throttling if we have lower cardinality or snapshots are going fast.
	// 3e6 = 3 * 10^6; this decides whether the snapshot writes should be throttled.
	throttle := card < 3e6 && c.snapshotLatencies.avg() < 15*time.Second

	// Write snapshots concurrently if cardinality is relatively high.
	concurrency := card / 2e6
	if concurrency < 1 {
		concurrency = 1
	}

	// Special case very high cardinality, use max concurrency and don't throttle writes.
	if card >= 3e6 {
		concurrency = 4
		throttle = false
	}

	splits := cache.Split(concurrency)

	type res struct {
		files []string
		err   error
	}

	resC := make(chan res, concurrency)
	for i := 0; i < concurrency; i++ {
		go func(sp *Cache) {
			iter := NewCacheKeyIterator(sp, tsdb.DefaultMaxPointsPerBlock, intC)
			files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, nil, iter, throttle)
			resC <- res{files: files, err: err}
		}(splits[i])
	}

	var err error
	files := make([]string, 0, concurrency)
	for i := 0; i < concurrency; i++ {
		result := <-resC
		if result.err != nil {
			err = result.err
		}
		files = append(files, result.files...)
	}

	...

	return files, err
}
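A rough worked example of the heuristic above, using the constants shown in the snippet: a cache with 2,500,000 values gets concurrency = 2.5e6 / 2e6 = 1 (integer division) and may be throttled if recent snapshots were fast, while a cache with 5,000,000 values trips the card >= 3e6 special case, so it is split four ways (cache.Split(4)), written without throttling, and each of the four goroutines writes its own generation via c.FileStore.NextGeneration().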
writeNewFiles iterates over the KeyIterator and writes the encoded blocks into TSM files.
It mainly calls the tsmWriter methods to do the actual writing.
When writing a file, the data blocks are written first, followed by the index.
When the file size or the number of blocks reaches its limit, writing rolls over to the next file.
func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter KeyIterator, throttle bool) ([]string, error) {
	// These are the new TSM files written
	var files []string

	for {
		// sequence + 1; this sequence is effectively the level.
		sequence++

		// The file written here is named *.tsm.tmp.
		// It is later renamed to *.tsm when the compaction result is installed.
		fileName := filepath.Join(c.Dir, c.formatFileName(generation, sequence)+"."+TSMFileExtension+"."+TmpTSMFileExtension)

		// Write as much as possible to this file
		// c.write performs the actual writing.
		err := c.write(fileName, iter, throttle)

		// We've hit the max file limit and there is more to write. Create a new file
		// and continue.
		// When the written file hits its size or block-count limit, roll over to a new file (sequence + 1).
		if err == errMaxFileExceeded || err == ErrMaxBlocksExceeded {
			files = append(files, fileName)
			continue
		} else if err == ErrNoValues {
			// ErrNoValues means there were no live values, only tombstoned entries, so no file is kept.
			// If the file only contained tombstoned entries, then it would be a 0 length
			// file that we can drop.
			if err := os.RemoveAll(fileName); err != nil {
				return nil, err
			}
			break
		} else if _, ok := err.(errCompactionInProgress); ok {
			// Don't clean up the file as another compaction is using it. This should not happen as the
			// planner keeps track of which files are assigned to compaction plans now.
			return nil, err
		} else if err != nil {
			// Remove any tmp files we already completed
			for _, f := range files {
				if err := os.RemoveAll(f); err != nil {
					return nil, err
				}
			}
			// We hit an error and didn't finish the compaction. Remove the temp file and abort.
			if err := os.RemoveAll(fileName); err != nil {
				return nil, err
			}
			return nil, err
		}

		files = append(files, fileName)
		break
	}

	return files, nil
}
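To illustrate the rollover naming, here is a sketch assuming the default zero-padded %09d-%09d file name format; formatTempTSMName is a hypothetical helper, not the real c.formatFileName.

import (
	"fmt"
	"path/filepath"
)

// formatTempTSMName shows how generation and sequence map to the temp file name
// that writeNewFiles produces, assuming the default zero-padded naming scheme.
func formatTempTSMName(dir string, generation, sequence int) string {
	name := fmt.Sprintf("%09d-%09d", generation, sequence)
	return filepath.Join(dir, name+".tsm.tmp")
}

Under that assumption, generation 5 starts with .../000000005-000000001.tsm.tmp, and when c.write returns errMaxFileExceeded or ErrMaxBlocksExceeded the loop rolls over to ...-000000002.tsm.tmp.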
func (k *tsmBatchKeyIterator) Next() bool {
RETRY:
	// Any merged blocks pending?
	if len(k.merged) > 0 {
		k.merged = k.merged[1:]
		if len(k.merged) > 0 {
			return true
		}
	}

	// Any merged values pending?
	if k.hasMergedValues() {
		k.merge()
		if len(k.merged) > 0 || k.hasMergedValues() {
			return true
		}
	}

	// If we still have blocks from the last read, merge them
	if len(k.blocks) > 0 {
		k.merge()
		if len(k.merged) > 0 || k.hasMergedValues() {
			return true
		}
	}

	// Read the next block from each TSM iterator
	// Read each TSM file and store its next group of blocks in k.buf, preparing for a merge sort.
	// Each TSM file corresponds to one blocks slice.
	// Like the TSM index, these blocks are sorted by key in ascending order.
	for i, v := range k.buf {
		if len(v) != 0 {
			continue
		}
		iter := k.iterators[i]
		if iter.Next() {
			key, minTime, maxTime, typ, _, b, err := iter.Read()
			if err != nil {
				k.err = err
			}

			// This block may have ranges of time removed from it that would
			// reduce the block min and max time.
			// tombstones here is a []TimeRange.
			tombstones := iter.r.TombstoneRange(key)

			var blk *block
			// k.buf has type []blocks, one blocks slice per TSM file.
			// The logic below keeps appending new blocks to k.buf[i];
			// if k.buf[i] needs to grow, append expands its capacity (roughly doubling it).
			if cap(k.buf[i]) > len(k.buf[i]) {
				k.buf[i] = k.buf[i][:len(k.buf[i])+1]
				blk = k.buf[i][len(k.buf[i])-1]
				if blk == nil {
					blk = &block{}
					k.buf[i][len(k.buf[i])-1] = blk
				}
			} else {
				blk = &block{}
				k.buf[i] = append(k.buf[i], blk)
			}
			blk.minTime = minTime
			blk.maxTime = maxTime
			blk.key = key
			blk.typ = typ
			blk.b = b
			blk.tombstones = tombstones
			blk.readMin = math.MaxInt64
			blk.readMax = math.MinInt64

			blockKey := key
			// If these two keys are equal, the blocks for the current key have not all been read yet.
			for bytes.Equal(iter.PeekNext(), blockKey) {
				iter.Next()
				key, minTime, maxTime, typ, _, b, err := iter.Read()
				if err != nil {
					k.err = err
				}

				tombstones := iter.r.TombstoneRange(key)

				var blk *block
				if cap(k.buf[i]) > len(k.buf[i]) {
					k.buf[i] = k.buf[i][:len(k.buf[i])+1]
					blk = k.buf[i][len(k.buf[i])-1]
					if blk == nil {
						blk = &block{}
						k.buf[i][len(k.buf[i])-1] = blk
					}
				} else {
					blk = &block{}
					k.buf[i] = append(k.buf[i], blk)
				}

				blk.minTime = minTime
				blk.maxTime = maxTime
				blk.key = key
				blk.typ = typ
				blk.b = b
				blk.tombstones = tombstones
				blk.readMin = math.MaxInt64
				blk.readMax = math.MinInt64
			}
		}

		if iter.Err() != nil {
			k.err = iter.Err()
		}
	}

	// Each reader could have a different key that it's currently at, need to find
	// the next smallest one to keep the sort ordering.
	// Find the smallest current key (series key + field).
	// Since every blocks slice in k.buf is sorted by key in ascending order,
	// only each blocks[0] needs to be inspected.
	var minKey []byte
	var minType byte
	for _, b := range k.buf {
		// block could be nil if the iterator has been exhausted for that file
		if len(b) == 0 {
			continue
		}
		if len(minKey) == 0 || bytes.Compare(b[0].key, minKey) < 0 {
			minKey = b[0].key
			minType = b[0].typ
		}
	}
	k.key = minKey
	k.typ = minType

	// Now we need to find all blocks that match the min key so we can combine and dedupe
	// the blocks if necessary
	// Move every block whose key equals the minKey found above into k.blocks.
	for i, b := range k.buf {
		if len(b) == 0 {
			continue
		}
		// b[0] is the current k.buf[i][0], a single block; b itself is a blocks slice.
		if bytes.Equal(b[0].key, k.key) {
			// Append the whole blocks slice b into k.blocks.
			k.blocks = append(k.blocks, b...)
			// k.buf[i] is reset to length 0, i.e. its buffered blocks have been consumed.
			k.buf[i] = k.buf[i][:0]
		}
	}

	if len(k.blocks) == 0 {
		return false
	}

	k.merge()

	// After merging all the values for this key, we might not have any. (e.g. they were all deleted
	// through many tombstones). In this case, move on to the next key instead of ending iteration.
	if len(k.merged) == 0 {
		goto RETRY
	}

	return len(k.merged) > 0
}
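Stripped of the buffering details, Next() performs a k-way merge over per-file block streams that are each sorted by key. Below is a minimal sketch of the selection step, with a hypothetical nextSmallestKey helper mirroring how k.key is chosen from each k.buf[i][0].

import "bytes"

// nextSmallestKey picks the smallest head key among several sorted streams,
// the same idea tsmBatchKeyIterator uses to decide which key to merge next.
func nextSmallestKey(heads [][]byte) []byte {
	var min []byte
	for _, h := range heads {
		if len(h) == 0 {
			continue // this stream is exhausted
		}
		if min == nil || bytes.Compare(h, min) < 0 {
			min = h
		}
	}
	return min
}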
func (k *tsmBatchKeyIterator) combineFloat(dedup bool) blocks {
	if dedup {
		// This branch merges values in minTime order and removes duplicates.
		for k.mergedFloatValues.Len() < k.size && len(k.blocks) > 0 {
			// Drop blocks that have already been fully read.
			for len(k.blocks) > 0 && k.blocks[0].read() {
				k.blocks = k.blocks[1:]
			}

			if len(k.blocks) == 0 {
				break
			}
			first := k.blocks[0]
			minTime := first.minTime
			maxTime := first.maxTime

			// Adjust the min time to the start of any overlapping blocks.
			// (i could actually start from 1 here.)
			// To merge in minTime order, a smallest global overlapping window [minTime, maxTime] is determined first.
			for i := 0; i < len(k.blocks); i++ {
				if k.blocks[i].overlapsTimeRange(minTime, maxTime) && !k.blocks[i].read() {
					if k.blocks[i].minTime < minTime {
						minTime = k.blocks[i].minTime
					}
					// Shrink the max time.
					if k.blocks[i].maxTime > minTime && k.blocks[i].maxTime < maxTime {
						maxTime = k.blocks[i].maxTime
					}
				}
			}

			// We have some overlapping blocks so decode all, append in order and then dedup
			// Pull the values that fall into the [minTime, maxTime] window determined above out of every block.
			for i := 0; i < len(k.blocks); i++ {
				if !k.blocks[i].overlapsTimeRange(minTime, maxTime) || k.blocks[i].read() {
					continue
				}

				var v tsdb.FloatArray
				var err error
				if err = DecodeFloatArrayBlock(k.blocks[i].b, &v); err != nil {
					k.err = err
					return nil
				}

				// Remove values we already read
				v.Exclude(k.blocks[i].readMin, k.blocks[i].readMax)

				// Filter out only the values for overlapping block
				// (Could this Include call be skipped?)
				v.Include(minTime, maxTime)
				if v.Len() > 0 {
					// Record that we read a subset of the block
					k.blocks[i].markRead(v.MinTime(), v.MaxTime())
				}

				// Apply each tombstone to the block
				for _, ts := range k.blocks[i].tombstones {
					v.Exclude(ts.Min, ts.Max)
				}

				k.mergedFloatValues.Merge(&v)
			}
		}

		// Since we combined multiple blocks, we could have more values than we should put into
		// a single block. We need to chunk them up into groups and re-encode them.
		return k.chunkFloat(nil)
	}

	var i int
	for i < len(k.blocks) {
		// skip this block if its values were already read
		if k.blocks[i].read() {
			i++
			continue
		}
		// If this block is already full, just add it as is.
		// (The loop breaks at the first non-full block; what happens to full blocks that come after it?)
		if BlockCount(k.blocks[i].b) >= k.size {
			k.merged = append(k.merged, k.blocks[i])
		} else {
			break
		}
		i++
	}

	if k.fast {
		for i < len(k.blocks) {
			// skip this block if its values were already read
			if k.blocks[i].read() {
				i++
				continue
			}

			k.merged = append(k.merged, k.blocks[i])
			i++
		}
	}

	// If we only have 1 block left, just append it as is and avoid decoding/recoding
	if i == len(k.blocks)-1 {
		if !k.blocks[i].read() {
			k.merged = append(k.merged, k.blocks[i])
		}
		i++
	}

	// The remaining blocks can be combined and we know that they do not overlap and
	// so we can just append each, sort and re-encode.
	for i < len(k.blocks) && k.mergedFloatValues.Len() < k.size {
		if k.blocks[i].read() {
			i++
			continue
		}

		var v tsdb.FloatArray
		if err := DecodeFloatArrayBlock(k.blocks[i].b, &v); err != nil {
			k.err = err
			return nil
		}

		// Apply each tombstone to the block
		for _, ts := range k.blocks[i].tombstones {
			v.Exclude(ts.Min, ts.Max)
		}

		k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime)
		k.mergedFloatValues.Merge(&v)
		i++
	}

	k.blocks = k.blocks[i:]

	return k.chunkFloat(k.merged)
}
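The dedup branch leans on the time-window operations of tsdb.FloatArray. As a rough mental model only (a sketch over plain timestamps, assuming both bounds are inclusive; clipWindow and dropWindow are illustrative stand-ins for Include and Exclude, not the real implementations):

// clipWindow keeps only the timestamps inside [min, max], mirroring Include.
func clipWindow(ts []int64, min, max int64) []int64 {
	out := ts[:0:0]
	for _, t := range ts {
		if t >= min && t <= max {
			out = append(out, t)
		}
	}
	return out
}

// dropWindow removes the timestamps inside [min, max], mirroring Exclude.
func dropWindow(ts []int64, min, max int64) []int64 {
	out := ts[:0:0]
	for _, t := range ts {
		if t < min || t > max {
			out = append(out, t)
		}
	}
	return out
}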