influxdb InfluxDB 是一個開源分布式時序、事件和指標數據庫。使用 Go 語言編寫,無需外部依賴。其設計目標是實現分布式和水平伸縮擴展。
influxDB啟動流程:
?1? 用docker下拉influxdb的鏡像
docker pull tutum/influxdb docekr
2 Docker環境下運行influxdb
docker run -d -p 8083:8083 -p8086:8086 --expose 8090 --expose 8099 --name influxsrv tutum/influxdb
各個參數含義:
-d:容器在后臺運行
-p:將容器內端口映射到宿主機端口,格式為 宿主機端口:容器內端口;
8083是influxdb的web管理工具端口
8086是influxdb的HTTP API端口
--expose:可以讓容器接受外部傳入的數據
--name:容器名稱 最后是鏡像名稱+tag,鏡像為tutum/influxdb,tag的值0.8.8指定了要運行的版本,默認是latest。
3 啟動influxdb后,influxdb會啟動一個內部的HTTP server管理工具,用戶可以通過接入該web服務器來操作influxdb。
當然,也可以通過CLI即命令行的方式訪問influxdb。
打開瀏覽器,輸入http://127.0.0.1:8083,訪問管理工具的主頁
4 Influxdb客戶端 可以參考里面例子
https://github.com/influxdata/influxdb/tree/master/client
PS. Influxdb原理詳解
https://www.linuxdaxue.com/influxdb-principle.html
Grafana Grafana 是一個開源的時序性統計和監控平臺,支持例如 elasticsearch、graphite、influxdb 等眾多的數據源,并以功能強大的界面編輯器著稱。
官網:https://grafana.com/
grafana啟動流程:
1 docker 拉取鏡像
docker run -d --name=grafana -p 3000:3000 grafana/grafana
2 訪問管理工具的主頁
瀏覽器127.0.0.1:3000 ,? 登錄 grafana的默認端口是3000,用戶名和密碼為 admin / admin,配置文件/etc/grafana/grafana.ini,更改配置文件后需要重啟grafana。
3. 創建數據庫,綁定influxdb
4. 創建一個新的面板
home —> New Dashboard —> Graph —> 點擊,Edit
5 Edit中的Metrics就是構造一個SQL的查詢語句
Golang打點 監控日志程序通過 influxdb 將需要的內容打點到influxdb?
1.導入 github.com/influxdata/influxdb/client/v2
2.創建influxdb client
// Create a new HTTPClientc, err := client.NewHTTPClient(client.HTTPConfig{Addr: addr,Username: username,Password: password,})
if err != nil {log.Fatal(err)}defer c.Close()
復制代碼
3.創建需要打的點的格式,類型
// Create a new point batchbp, err := client.NewBatchPoints(client.BatchPointsConfig{Database: database,Precision: precision,})
if err != nil {log.Fatal(err)}
復制代碼
4.創建點,將點添加進influxdb數據庫
// Create a point and add to batch//Tags:Path,Method,Scheme,Statustags := map[string]string{
"Path" : v.Path,
"Method" : v.Method,
"Scheme" : v.Scheme,
"Status" : v.Status,}fields := map[string]interface{}{
"UpstreamTime" : v.UpstreamTime,
"RequestTime" : v.RequestTime,
"BytesSent" : v.BytesSent,}pt, err := client.NewPoint(
"nginx_log" , tags, fields, v.TimeLocal)
if err != nil {log.Fatal(err)}bp.AddPoint(pt)// Write the batch
if err := c.Write(bp); err != nil {log.Fatal(err)}
復制代碼 Golang 完整代碼 imooc.log日志格式如下:
172.0.0.12 - - [02/May/2018:17:17:35 +0000] http "GET /foo?query=t HTTP/1.0" 200 2133 "-" "KeepAliveClient" "-" 1.005 1.854
172.0.0.12 - - [02/May/2018:17:17:36 +0000] http "POST /bar?query=t HTTP/1.0" 300 2133 "-" "KeepAliveClient" "-" 1.025 1.854
代碼邏輯主要是? 通過讀取模塊讀取imooc.log日志文件中日志,然后通過正則表達式,一行一行解析獲取數據,并通過寫入模塊將數據通過influxdb客戶端打點,最后通過grafana去顯示數據圖形.
package mainimport (
"bufio" "fmt" "github.com/influxdata/influxdb/client/v2" "io" "net/url" "os" "regexp" "strconv" "strings" "time" "flag" "log" "net/http" "encoding/json"
)const (TypeHandleLine = 0TypeErrNum = 1TpsIntervalTime = 5
)var TypeMonitorChan = make(chan int,200)
type Message struct {TimeLocal time.TimeBytesSent intPath, Method, Scheme, Status stringUpstreamTime, RequestTime
float 64
}//系統狀態監控
type SystemInfo struct {HandleLine int `json:
"handleLine" ` //總處理日志行數Tps
float 64 `json:
"tps" ` //系統吞吐量ReadChanLen int `json:
"readChanLen" ` //
read channel 長度WriterChanLen int `json:
"writeChanLen" ` //write channel 長度RunTime string `json:
"ruanTime" ` //運行總時間ErrNum int `json:
"errNum" ` //錯誤數
}
type Monitor struct {startTime time.Timedata SystemInfotpsSli []inttps
float 64
}func (m *Monitor)start(lp *LogProcess) {go
func () {
for n := range TypeMonitorChan {switch n {
case TypeErrNum:m.data.ErrNum += 1
case TypeHandleLine:m.data.HandleLine += 1}}}()ticker := time.NewTicker(time.Second *TpsIntervalTime)go
func () {
for {<-ticker.Cm.tpsSli = append(m.tpsSli,m.data.HandleLine)
if len(m.tpsSli) > 2 {m.tpsSli = m.tpsSli[1:]m.tps =
float 64(m.tpsSli[1] - m.tpsSli[0])/TpsIntervalTime}}}()http.HandleFunc(
"/monitor" , func(writer http.ResponseWriter, request *http.Request) {m.data.RunTime = time.Now().Sub(m.startTime).String()m.data.ReadChanLen = len(lp.rc)m.data.WriterChanLen = len(lp.wc)m.data.Tps = m.tpsret ,_ := json.MarshalIndent(m.data,
"" ,
"\t" )io.WriteString(writer,string(ret))})http.ListenAndServe(
":9193" ,nil)
}
type Reader interface {Read(rc chan []byte)
}
type Writer interface {Writer(wc chan *Message)
}
type LogProcess struct {rc chan []bytewc chan *Message
read Readerwrite Writer
}
type ReadFromFile struct {path string //讀取文件的路徑
}//讀取模塊
func (r *ReadFromFile) Read(rc chan []byte) {//打開文件f, err := os.Open(r.path)fmt.Println(r.path)
if err != nil {panic(fmt.S
printf (
"open file err :" , err.Error()))}//從文件末尾開始逐行讀取文件內容f.Seek(0, 2) //2,代表將指正移動到末尾rd := bufio.NewReader(f)
for {line, err := rd.ReadBytes(
'\n' ) //連續讀取內容知道需要
'\n' 結束
if err == io.EOF {time.Sleep(5000 * time.Microsecond)
continue }
else if err != nil {panic(fmt.S
printf (
"ReadBytes err :" , err.Error()))}TypeMonitorChan <- TypeHandleLinerc <- line[:len(line)-1]}}
type WriteToinfluxDB struct {influxDBDsn string //influx data
source
}//寫入模塊
/**1.初始化influxdb client2. 從Write Channel中讀取監控數據3. 構造數據并寫入influxdb
*/
func (w *WriteToinfluxDB) Writer(wc chan *Message) {infSli := strings.Split(w.influxDBDsn,
"@" )addr := infSli[0]username := infSli[1]password := infSli[2]database := infSli[3]precision := infSli[4]// Create a new HTTPClientc, err := client.NewHTTPClient(client.HTTPConfig{Addr: addr,Username: username,Password: password,})
if err != nil {log.Fatal(err)}defer c.Close()
for v := range wc {// Create a new point batchbp, err := client.NewBatchPoints(client.BatchPointsConfig{Database: database,Precision: precision,})
if err != nil {log.Fatal(err)}// Create a point and add to batch//Tags:Path,Method,Scheme,Statustags := map[string]string{
"Path" : v.Path,
"Method" : v.Method,
"Scheme" : v.Scheme,
"Status" : v.Status,}fields := map[string]interface{}{
"UpstreamTime" : v.UpstreamTime,
"RequestTime" : v.RequestTime,
"BytesSent" : v.BytesSent,}fmt.Println(
"taps:" ,tags)fmt.Println(
"fields:" ,fields)pt, err := client.NewPoint(
"nginx_log" , tags, fields, v.TimeLocal)
if err != nil {log.Fatal(err)}bp.AddPoint(pt)// Write the batch
if err := c.Write(bp); err != nil {log.Fatal(err)}// Close client resources
if err := c.Close(); err != nil {log.Fatal(err)}log.Println(
"write success" )}}//解析模塊
func (l *LogProcess)
Process () {/**172.0.012 - - [04/Mar/2018:13:49:52 +0000] http
"GET /foo?query=t HTTP/1.0" 200 2133
"-" "KeepAliveClient" "-" 1.005 1.854([\d\.]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+([a-z]+)\s+\
"([^" ]+)\
"\s+(\d{3})\s+(\d+)\s+\"([^" ]+)\
"\s+\"(.*?)\"\s+\"([\d\.-]+)\"\s+([\d\.-]+)\s+([\d\.-]+)*/r := regexp.MustCompile(`([\d\.]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+([a-z]+)\s+\"([^" ]+)\
"\s+(\d{3})\s+(\d+)\s+\"([^" ]+)\
"\s+\"(.*?)\"\s+\"([\d\.-]+)\"\s+([\d\.-]+)\s+([\d\.-]+)`)for v := range l.rc {ret := r.FindStringSubmatch(string(v))if len(ret) != 14 {TypeMonitorChan <- TypeErrNumfmt.Println(" FindStringSubmatch fail:
", string(v))fmt.Println(len(ret))continue}message := &Message{}//時間: [04/Mar/2018:13:49:52 +0000]loc, _ := time.LoadLocation(" Asia/Shanghai
")t, err := time.ParseInLocation(" 02/Jan/2006:15:04:05 +0000
", ret[4], loc)if err != nil {TypeMonitorChan <- TypeErrNumfmt.Println(" ParseInLocation fail:
", err.Error(), ret[4])}message.TimeLocal = t//字符串長度: 2133byteSent, _ := strconv.Atoi(ret[8])message.BytesSent = byteSent//" GET /foo?query=t HTTP/1.0
"reqSli := strings.Split(ret[6], " ")if len(reqSli) != 3 {TypeMonitorChan <- TypeErrNumfmt.Println(" strings.Split fail:
", ret[6])continue}message.Method = reqSli[0]u, err := url.Parse(reqSli[1])if err != nil {TypeMonitorChan <- TypeErrNumfmt.Println(" url parse fail:
", err)continue}message.Path = u.Path//httpmessage.Scheme = ret[5]//code: 200message.Status = ret[7]//1.005upstreamTime, _ := strconv.ParseFloat(ret[12], 64)message.UpstreamTime = upstreamTime//1.854requestTime, _ := strconv.ParseFloat(ret[13], 64)message.RequestTime = requestTime//fmt.Println(message)l.wc <- message}
}/**
分析監控需求:某個協議下的某個請求在某個請求方法的 QPS&響應時間&流量*/
func main() {var path, influDsn stringflag.StringVar(&path, " path
", " ./imooc.log
", " read file path
")flag.StringVar(&influDsn, " influxDsn
", " http://127.0.01:8086@imooc@imoocpass@imooc@s
", " influx data
source ")flag.Parse()r := &ReadFromFile{path: path,}w := &WriteToinfluxDB{influxDBDsn: influDsn,}lp := &LogProcess{rc: make(chan []byte,200),wc: make(chan *Message),read: r,write: w,}go lp.read.Read(lp.rc)for i:=1;i<2 ; i++ {go lp.Process()}for i:=1;i<4 ; i++ {go lp.write.Writer(lp.wc)}fmt.Println(" begin !!!
")m:= &Monitor{startTime:time.Now(),data:SystemInfo{},}m.start(lp)
}
復制代碼
總結
以上是生活随笔 為你收集整理的influxDB+grafana 日志监控平台(Golang) 的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網站內容還不錯,歡迎將生活随笔 推薦給好友。