Prometheus 实战与源码分析之storage
Prometheus 不僅支持本地存儲,還支持遠端存儲。先從遠端存儲說起:它是通過一個發送隊列(queue)完成數據發送的。先看一下隊列的構造函數 NewQueueManager:
func NewQueueManager(cfg QueueManagerConfig) *QueueManager {if cfg.QueueCapacity == 0 {cfg.QueueCapacity = defaultQueueCapacity}if cfg.MaxShards == 0 {cfg.MaxShards = defaultMaxShards}if cfg.MaxSamplesPerSend == 0 {cfg.MaxSamplesPerSend = defaultMaxSamplesPerSend}if cfg.BatchSendDeadline == 0 {cfg.BatchSendDeadline = defaultBatchSendDeadline}t := &QueueManager{cfg: cfg,queueName: cfg.Client.Name(),logLimiter: rate.NewLimiter(logRateLimit, logBurst),numShards: 1,reshardChan: make(chan int),quit: make(chan struct{}),samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration),samplesOut: newEWMARate(ewmaWeight, shardUpdateDuration),samplesOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration),}t.shards = t.newShards(t.numShards)numShards.WithLabelValues(t.queueName).Set(float64(t.numShards))queueCapacity.WithLabelValues(t.queueName).Set(float64(t.cfg.QueueCapacity))return t }這個隊列的最大分片是1000,每個分片沒秒1000個sample,那么一秒就可以發送1000*1000個sample。每一種存儲,無論是本地存儲還有遠端存儲,寫數據都實現Append方法,remote的也一樣,在romte的Append就調用了queue的Append方法。
func (t *QueueManager) Append(s *model.Sample) error {var snew model.Samplesnew = *ssnew.Metric = s.Metric.Clone()for ln, lv := range t.cfg.ExternalLabels {if _, ok := s.Metric[ln]; !ok {snew.Metric[ln] = lv}}snew.Metric = model.Metric(relabel.Process(model.LabelSet(snew.Metric), t.cfg.RelabelConfigs...))if snew.Metric == nil {return nil}t.shardsMtx.Lock()enqueued := t.shards.enqueue(&snew)t.shardsMtx.Unlock()if enqueued {queueLength.WithLabelValues(t.queueName).Inc()} else {droppedSamplesTotal.WithLabelValues(t.queueName).Inc()if t.logLimiter.Allow() {log.Warn("Remote storage queue full, discarding sample. Multiple subsequent messages of this kind may be suppressed.")}}return nil }通過enqueued := t.shards.enqueue(&snew)發到隊列里面,
func (s *shards) enqueue(sample *model.Sample) bool {s.qm.samplesIn.incr(1)fp := sample.Metric.FastFingerprint()shard := uint64(fp) % uint64(len(s.queues))select {case s.queues[shard] <- sample:return truedefault:return false} }這個里面是簡單的求余數分組的方法,如果這里使用一致hash會不會更好點呢!把數據發動到分片的隊列中。QueueManager啟動的時候就啟動了隊列發送任務
func (s *shards) start() {for i := 0; i < len(s.queues); i++ {go s.runShard(i)} }繼續看runShard
func (s *shards) runShard(i int) {defer s.wg.Done()queue := s.queues[i]// Send batches of at most MaxSamplesPerSend samples to the remote storage.// If we have fewer samples than that, flush them out after a deadline// anyways.pendingSamples := model.Samples{}for {select {case sample, ok := <-queue:if !ok {if len(pendingSamples) > 0 {log.Debugf("Flushing %d samples to remote storage...", len(pendingSamples))s.sendSamples(pendingSamples)log.Debugf("Done flushing.")}return}queueLength.WithLabelValues(s.qm.queueName).Dec()pendingSamples = append(pendingSamples, sample)for len(pendingSamples) >= s.qm.cfg.MaxSamplesPerSend {s.sendSamples(pendingSamples[:s.qm.cfg.MaxSamplesPerSend])pendingSamples = pendingSamples[s.qm.cfg.MaxSamplesPerSend:]}case <-time.After(s.qm.cfg.BatchSendDeadline):if len(pendingSamples) > 0 {s.sendSamples(pendingSamples)pendingSamples = pendingSamples[:0]}}} }具體發送樣本的方法還要看里面的sendSamples
func (s *shards) sendSamples(samples model.Samples) {// Samples are sent to the remote storage on a best-effort basis. If a// sample isn't sent correctly the first time, it's simply dropped on the// floor.begin := time.Now()err := s.qm.cfg.Client.Store(samples)duration := time.Since(begin)if err != nil {log.Warnf("error sending %d samples to remote storage: %s", len(samples), err)failedSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples)))} else {sentSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples)))}sentBatchDuration.WithLabelValues(s.qm.queueName).Observe(duration.Seconds())s.qm.samplesOut.incr(int64(len(samples)))s.qm.samplesOutDuration.incr(int64(duration)) }最終通過Store方法發送數據
func (c *Client) Store(samples model.Samples) error {req := &WriteRequest{Timeseries: make([]*TimeSeries, 0, len(samples)),}for _, s := range samples {ts := &TimeSeries{Labels: make([]*LabelPair, 0, len(s.Metric)),}for k, v := range s.Metric {ts.Labels = append(ts.Labels,&LabelPair{Name: string(k),Value: string(v),})}ts.Samples = []*Sample{{Value: float64(s.Value),TimestampMs: int64(s.Timestamp),},}req.Timeseries = append(req.Timeseries, ts)}data, err := proto.Marshal(req)if err != nil {return err}buf := bytes.Buffer{}if _, err := snappy.NewWriter(&buf).Write(data); err != nil {return err}httpReq, err := http.NewRequest("POST", c.url.String(), &buf)if err != nil {return err}httpReq.Header.Add("Content-Encoding", "snappy")ctx, _ := context.WithTimeout(context.Background(), c.timeout)httpResp, err := ctxhttp.Do(ctx, c.client, httpReq)if err != nil {return err}defer httpResp.Body.Close()if httpResp.StatusCode/100 != 2 {return fmt.Errorf("server returned HTTP status %s", httpResp.Status)}return nil }Store里面就是通過POST方式發送數據。說完了遠端存儲,再解釋一下本地存儲,這個設計的挺復雜,它是先放到內存中,并會批量將內存數據導入到磁盤中保存,具體看內存存儲管理
// MemorySeriesStorage is the local storage manager. Append writes samples into
// in-memory series (fpToSeries), and the maintenance loop later hands them to
// the persistence field for writing to disk.
type MemorySeriesStorage struct {
	// archiveHighWatermark and numChunksToPersist have to be aligned for atomic operations.
	// Keep them as the first fields: 64-bit atomic operations need 8-byte
	// alignment, which on 32-bit platforms Go only guarantees for the first
	// word of an allocated struct. Do not reorder.
	archiveHighWatermark model.Time    // No archived series has samples after this time.
	numChunksToPersist   int64         // The number of chunks waiting for persistence.
	maxChunksToPersist   int           // If numChunksToPersist reaches this threshold, ingestion will be throttled.
	rushed               bool          // Whether the storage is in rushed mode.
	rushedMtx            sync.Mutex    // Protects entering and exiting rushed mode.
	throttled            chan struct{} // This chan is sent to whenever NeedsThrottling() returns true (for logging).

	// Per-fingerprint locks (taken by Append and maintainMemorySeries) and the
	// in-memory series keyed by fingerprint.
	fpLocker   *fingerprintLocker
	fpToSeries *seriesMap

	options *MemorySeriesStorageOptions

	// Lifecycle signals and tuning for the background maintenance loop.
	loopStopping, loopStopped  chan struct{}
	logThrottlingStopped       chan struct{}
	maxMemoryChunks            int
	dropAfter                  time.Duration
	checkpointInterval         time.Duration
	checkpointDirtySeriesLimit int

	// persistence writes chunks and archived metrics to disk; mapper maps raw
	// fingerprints to the fingerprints actually used for storage (see Append).
	persistence *persistence
	mapper      *fpMapper

	// Chunk eviction bookkeeping and its worker's lifecycle signals.
	evictList                   *list.List
	evictRequests               chan chunk.EvictRequest
	evictStopping, evictStopped chan struct{}

	// Series quarantining requests and the quarantine worker's lifecycle signals.
	quarantineRequests                    chan quarantineRequest
	quarantineStopping, quarantineStopped chan struct{}

	// Self-instrumentation metrics.
	persistErrors                 prometheus.Counter
	queuedChunksToPersist         prometheus.Counter
	numSeries                     prometheus.Gauge
	numHeadChunks                 prometheus.Gauge
	dirtySeries                   prometheus.Gauge
	seriesOps                     *prometheus.CounterVec
	ingestedSamplesCount          prometheus.Counter
	discardedSamplesCount         *prometheus.CounterVec
	nonExistentSeriesMatchesCount prometheus.Counter
	maintainSeriesDuration        *prometheus.SummaryVec
	persistenceUrgencyScore       prometheus.Gauge
	rushedMode                    prometheus.Gauge
}

他是一個內存存儲管理器。和remote一樣,他也是實現了Append方法去保存sample。
func (s *MemorySeriesStorage) Append(sample *model.Sample) error {for ln, lv := range sample.Metric {if len(lv) == 0 {delete(sample.Metric, ln)}}rawFP := sample.Metric.FastFingerprint()s.fpLocker.Lock(rawFP)fp := s.mapper.mapFP(rawFP, sample.Metric)defer func() {s.fpLocker.Unlock(fp)}() // Func wrapper because fp might change below.if fp != rawFP {// Switch locks.s.fpLocker.Unlock(rawFP)s.fpLocker.Lock(fp)}series, err := s.getOrCreateSeries(fp, sample.Metric)if err != nil {return err // getOrCreateSeries took care of quarantining already.}if sample.Timestamp == series.lastTime {// Don't report "no-op appends", i.e. where timestamp and sample// value are the same as for the last append, as they are a// common occurrence when using client-side timestamps// (e.g. Pushgateway or federation).if sample.Timestamp == series.lastTime &&series.lastSampleValueSet &&sample.Value.Equal(series.lastSampleValue) {return nil}s.discardedSamplesCount.WithLabelValues(duplicateSample).Inc()return ErrDuplicateSampleForTimestamp // Caused by the caller.}if sample.Timestamp < series.lastTime {s.discardedSamplesCount.WithLabelValues(outOfOrderTimestamp).Inc()return ErrOutOfOrderSample // Caused by the caller.}completedChunksCount, err := series.add(model.SamplePair{Value: sample.Value,Timestamp: sample.Timestamp,})if err != nil {s.quarantineSeries(fp, sample.Metric, err)return err}s.ingestedSamplesCount.Inc()s.incNumChunksToPersist(completedChunksCount)return nil }這個里面先通過getOrCreateSeries獲取series,series你可以理解為,相同類型的監控數據放到一起,這樣便于壓縮查找,通過series.add保存。但這只是保存到內存中,怎么持久化呢?
在 MemorySeriesStorage 啟動的時候,會初始化 persistence 並在後台啟動 loop。
persistence 負責把內存中的數據寫入磁盤;loop 中的主循環如下:
// Main select loop of MemorySeriesStorage's background maintenance (excerpt).
// The `loop:` label targeted by `break loop`, together with
// memoryFingerprints, archivedFingerprints, dirtySeriesCount and
// checkpointTimer, is declared in the enclosing function, which is not shown.
for {
	select {
	case <-s.loopStopping:
		break loop
	case fp := <-memoryFingerprints:
		// maintainMemorySeries returns true when this pass turned a clean
		// series dirty; track how many series are dirty.
		if s.maintainMemorySeries(fp, model.Now().Add(-s.dropAfter)) {
			dirty := atomic.AddInt64(&dirtySeriesCount, 1)
			s.dirtySeries.Set(float64(dirty))
			// Check if we have enough "dirty" series so that we need an early checkpoint.
			// However, if we are already behind persisting chunks, creating a checkpoint
			// would be counterproductive, as it would slow down chunk persisting even more,
			// while in a situation like that, where we are clearly lacking speed of disk
			// maintenance, the best we can do for crash recovery is to persist chunks as
			// quickly as possible. So only checkpoint if the urgency score is < 1.
			if dirty >= int64(s.checkpointDirtySeriesLimit) &&
				s.calculatePersistenceUrgencyScore() < 1 {
				checkpointTimer.Reset(0)
			}
		}
	case fp := <-archivedFingerprints:
		s.maintainArchivedSeries(fp, model.Now().Add(-s.dropAfter))
	}
}

maintainMemorySeries保存series,
// maintainMemorySeries runs one maintenance pass over the in-memory series fp,
// holding fp's lock for the whole pass. The pass has four steps:
//   - close the head chunk if maybeCloseHeadChunk decides to;
//   - let writeMemorySeries handle the series' chunks, passing beforeTime
//     through (the loop shown above passes now minus s.dropAfter);
//   - archive the series if every chunk desc is evicted and its last sample is
//     older than headChunkTimeout;
//   - otherwise, evict chunk descs.
//
// It returns true only if the series was not dirty before this call and is
// dirty afterwards. It returns false if the series is not in memory, was
// removed by writeMemorySeries, or was archived.
func (s *MemorySeriesStorage) maintainMemorySeries(
	fp model.Fingerprint, beforeTime model.Time,
) (becameDirty bool) {
	defer func(begin time.Time) {
		s.maintainSeriesDuration.WithLabelValues(maintainInMemory).Observe(
			time.Since(begin).Seconds(),
		)
	}(time.Now())

	s.fpLocker.Lock(fp)
	defer s.fpLocker.Unlock(fp)

	series, ok := s.fpToSeries.get(fp)
	if !ok {
		// Series is actually not in memory, perhaps archived or dropped in the meantime.
		return false
	}

	defer s.seriesOps.WithLabelValues(memoryMaintenance).Inc()

	// On error the series is quarantined, but the pass still continues below.
	closed, err := series.maybeCloseHeadChunk()
	if err != nil {
		s.quarantineSeries(fp, series.metric, err)
		s.persistErrors.Inc()
	}
	if closed {
		s.incNumChunksToPersist(1)
		s.numHeadChunks.Dec()
	}

	// Remember the dirty flag so the return value reports only a clean->dirty transition.
	seriesWasDirty := series.dirty

	if s.writeMemorySeries(fp, series, beforeTime) {
		// Series is gone now, we are done.
		return false
	}

	// Find the oldest chunk desc that is not evicted; -1 means all are evicted.
	iOldestNotEvicted := -1
	for i, cd := range series.chunkDescs {
		if !cd.IsEvicted() {
			iOldestNotEvicted = i
			break
		}
	}

	// Archive if all chunks are evicted. Also make sure the last sample has
	// an age of at least headChunkTimeout (which is very likely anyway).
	if iOldestNotEvicted == -1 && model.Now().Sub(series.lastTime) > headChunkTimeout {
		s.fpToSeries.del(fp)
		s.numSeries.Dec()
		s.persistence.archiveMetric(fp, series.metric, series.firstTime(), series.lastTime)
		s.seriesOps.WithLabelValues(archive).Inc()
		// Raise the archive high watermark if this series ends later. The CAS
		// is expected to succeed because, per the panic message, only this
		// function writes the watermark. NOTE(review): confirm that
		// maintenance runs on a single goroutine.
		oldWatermark := atomic.LoadInt64((*int64)(&s.archiveHighWatermark))
		if oldWatermark < int64(series.lastTime) {
			if !atomic.CompareAndSwapInt64(
				(*int64)(&s.archiveHighWatermark),
				oldWatermark, int64(series.lastTime),
			) {
				panic("s.archiveHighWatermark modified outside of maintainMemorySeries")
			}
		}
		// Naked return: becameDirty is still false here.
		return
	}
	// If we are here, the series is not archived, so check for Chunk.Desc
	// eviction next.
	series.evictChunkDescs(iOldestNotEvicted)

	return series.dirty && !seriesWasDirty
}

writeMemorySeries把數據寫到磁盤,里面再調用persistChunks
func (p *persistence) persistChunks(fp model.Fingerprint, chunks []chunk.Chunk) (index int, err error) {f, err := p.openChunkFileForWriting(fp)if err != nil {return -1, err}defer p.closeChunkFile(f)if err := p.writeChunks(f, chunks); err != nil {return -1, err}// Determine index within the file.offset, err := f.Seek(0, os.SEEK_CUR)if err != nil {return -1, err}index, err = chunkIndexForOffset(offset)if err != nil {return -1, err}return index - len(chunks), err }那這些series怎么查詢呢?它有個index列表,通過著名的leveldb保存index,這樣就可以通過index去查詢了。他是一個keyvalue數據庫,接口定義storage/local/index/interface.go
type KeyValueStore interface {Put(key, value encoding.BinaryMarshaler) error// Get unmarshals the result into value. It returns false if no entry// could be found for key. If value is nil, Get behaves like Has.Get(key encoding.BinaryMarshaler, value encoding.BinaryUnmarshaler) (bool, error)Has(key encoding.BinaryMarshaler) (bool, error)// Delete returns (false, nil) if key does not exist.Delete(key encoding.BinaryMarshaler) (bool, error)NewBatch() BatchCommit(b Batch) error// ForEach iterates through the complete KeyValueStore and calls the// supplied function for each mapping.ForEach(func(kv KeyValueAccessor) error) errorClose() error }它的實現在storage/local/index/leveldb.go里面,代碼比較多,我就不粘出來了。
總結
以上是生活随笔為你收集整理的Prometheus 实战与源码分析之storage的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Karaf教程第2部分使用Configu
- 下一篇: 技术开发向技术管理转型