mysql webhook_sql 数据定时发送webhook golang 服务
目的很簡單,主要是基于cron表達式定時獲取sql 數據庫數據(sql server,mysql,pg,clickhouse)同時通過webhook 發送到外部接口
幾個需求說明
應該基于配置管理,而不是代碼寫死的
支持多數據庫同時運行(減少運行的實例)
支持sql 數據的處理(對于不用webhook 的數據可能不一樣,我們需要處理下)
job 支持靈活的cron 表達式
應該輕量,簡單,容易使用
設計
簡單說明
參考了一個sql2slack 的服務,基于golang 編寫,使用hcl 進行配置管理,同時基于js 引擎處理數據,同時為了方便數據處理
提供了內置underscore ,對于cron 的處理基于golang 版本的cron表達式引擎 ,一些改進:基于hcl v2 版本,支持多js 引擎
(otto以及goja 基于配置指定),調整webhook 消息的發送,支持發送任意消息,同時調整cron支持秒的處理
job 格式說明
基于hcl ,Name 為label,實際上后邊可以調整下,將job 擴展為webhook以及db 模式的,
type Job struct {
Name ? ? ? ? ? ?string ? ? ? ?`hcl:",label"`
Driver ? ? ? ? ?string ? ? ? ?`hcl:"driver"`
DSN ? ? ? ? ? ? string ? ? ? ?`hcl:"dsn"`
Query ? ? ? ? ? string ? ? ? ?`hcl:"query"`
Webhook ? ? ? ? string ? ? ? ?`hcl:"webhook"`
Schedule ? ? ? ?string ? ? ? ?`hcl:"schedule"`
MessageString ? string ? ? ? ?`hcl:"message"`
MessageCompiled executor.JSVM `hcl:"-"`
Conn ? ? ? ? ? ?*sqlx.DB ? ? ?`hcl:"-"`
EngineName ? ? ?string ? ? ? ?`hcl:"jsengine"`
JSVM ? ? ? ? ? ?string ? ? ? ?`hcl:"-"`
Stmnt ? ? ? ? ? *sqlx.Stmt ? ?`hcl:"-"`
}
參考hcl配置
job tst {
webhook = "http://127.0.0.1:4195"
?
driver = "mysql"
?
dsn = "demo:demo@tcp(127.0.0.1:3306)/demo"
jsengine = "otto"
query = <
SELECT users.* FROM users
SQL
?
schedule = "* * * * * *"
?
message = <
if ( $rows.length < 1 ) {
return
}
log("this is a demo")
var msg = ?"";
_.chain($rows).pluck('name').each(function(name){
msg += name+"--------demo--from otto----";
})
var info = {
msgtype: "text",
text: {
content: msg
}
}
log(JSON.stringify(info))
send(JSON.stringify(info))
JS
}
代碼結構
├── Dockerfile
├── Makefile
├── README.md
├── cmd
│ ? ├── cli
│ ? │ ? ├── Dockerfile
│ ? │ ? ├── Makefile
│ ? │ ? ├── README.md
│ ? │ ? └── main.go
│ ? └── server
│ ? ? ? ├── Dockerfile
│ ? ? ? ├── Makefile
│ ? ? ? ├── README.md
│ ? ? ? └── main.go
├── demo.hcl
├── demo2.hcl
├── docker-compose.yaml
├── go.mod
├── go.sum
├── pkg
│ ? ├── agent
│ ? ├── buildinfo
│ ? │ ? └── version.go
│ ? ├── commands
│ ? │ ? ├── cli.go
│ ? │ ? └── server.go
│ ? ├── executor
│ ? │ ? └── jsengine.go
│ ? ├── jobs
│ ? │ ? └── job.go
│ ? ├── npm
│ ? │ ? └── bindata.go
│ ? ├── storage
│ ? └── webhooks
├── underscore-min.js
└── webhook.yaml
代碼說明
核心是 jsengine.go以及job.go,jsengine.go 包含了js 引擎的處理,job.go 主要是對于hcl 配置的解析以及cron 的處理
job.go
為了方便使用js engine 暴露了log $rows 以及send 發送,可以擴展,同時解析job
package jobs
?
import (
"encoding/json"
"errors"
"fmt"
"log"
"path/filepath"
?
"github.com/dop251/goja"
"github.com/go-resty/resty/v2"
"github.com/hashicorp/hcl/v2/hclsimple"
"github.com/jmoiron/sqlx"
"github.com/robertkrimen/otto"
"github.com/robfig/cron/v3"
"github.com/rongfengliang/sql-server-exporter/pkg/executor"
)
?
// Job is one type for sql data fetch
type Job struct {
Name ? ? ? ? ? ?string ? ? ? ?`hcl:",label"`
Driver ? ? ? ? ?string ? ? ? ?`hcl:"driver"`
DSN ? ? ? ? ? ? string ? ? ? ?`hcl:"dsn"`
Query ? ? ? ? ? string ? ? ? ?`hcl:"query"`
Webhook ? ? ? ? string ? ? ? ?`hcl:"webhook"`
Schedule ? ? ? ?string ? ? ? ?`hcl:"schedule"`
MessageString ? string ? ? ? ?`hcl:"message"`
MessageCompiled executor.JSVM `hcl:"-"`
Conn ? ? ? ? ? ?*sqlx.DB ? ? ?`hcl:"-"`
EngineName ? ? ?string ? ? ? ?`hcl:"jsengine"`
JSVM ? ? ? ? ? ?string ? ? ? ?`hcl:"-"`
Stmnt ? ? ? ? ? *sqlx.Stmt ? ?`hcl:"-"`
}
?
// ParseJobs parseJobs
func ParseJobs(jobsdir string) (map[string]*Job, *cron.Cron, error) {
var cronhub *cron.Cron = cron.New(cron.WithChain(
cron.SkipIfStillRunning(cron.DefaultLogger),
cron.Recover(cron.DefaultLogger),
), cron.WithParser(cron.NewParser(
cron.SecondOptional|cron.Minute|cron.Hour|cron.Dom|cron.Month|cron.Dow|cron.Descriptor,
)))
files, err := filepath.Glob(filepath.Join(jobsdir, "*.hcl"))
if err != nil {
return nil, nil, err
}
?
result := map[string]*Job{}
?
for _, filename := range files {
var fileJobs struct {
Jobs []*Job `hcl:"job,block"`
}
if err != nil {
return nil, nil, err
}
err := hclsimple.DecodeFile(filename, nil, &fileJobs)
if err != nil {
return nil, nil, errors.New("#hcl: " + err.Error())
}
for _, job := range fileJobs.Jobs {
job.MessageCompiled, err = NewJSVM(job.EngineName, job.Name, fmt.Sprintf("(function(){%s})()", job.MessageString))
if err != nil {
return nil, nil, errors.New("#javascript: " + err.Error())
}
?
job.Conn, err = sqlx.Connect(job.Driver, job.DSN)
if err != nil {
return nil, nil, errors.New("#sql:" + job.Name + ": " + err.Error())
}
?
job.Stmnt, err = job.Conn.Preparex(job.Query)
if err != nil {
return nil, nil, errors.New("#sql:" + job.Name + ": " + err.Error())
}
?
if job.Webhook == "" {
return nil, nil, errors.New("#webhook:" + job.Name + ": webhook is required")
}
?
if err := (func(job *Job) error {
_, err := cronhub.AddFunc(job.Schedule, func() {
if err := job.Exec(); err != nil {
panic(err)
}
})
return err
})(job); err != nil {
return nil, nil, errors.New("#cron:" + job.Name + ":" + err.Error())
}
?
result[job.Name] = job
}
}
?
return result, cronhub, nil
}
?
// NewJSVM NewJSVM
func NewJSVM(engine string, name, src string) (executor.JSVM, error) {
var jsjvm executor.JSVM
switch engine {
case "goja":
jsjvm = executor.NewGojaExecutor(src, goja.New())
?
case "otto":
vm := otto.New()
script, err := vm.Compile(name, src)
if err != nil {
return nil, err
}
jsjvm = executor.NewOttoExecutor(src, vm, script)
default:
return nil, errors.New("not supported js engine")
?
}
return jsjvm, nil
}
?
// Exec job
func (j *Job) Exec() error {
rows, err := j.Stmnt.Queryx()
if err != nil {
return err
}
defer rows.Close()
var res []map[string]interface{}
for rows.Next() {
o := map[string]interface{}{}
if err := rows.MapScan(o); err != nil {
return err
}
for k, v := range o {
if nil == v {
continue
}
?
switch v.(type) {
case []uint8:
v = []byte(v.([]uint8))
default:
v, _ = json.Marshal(v)
}
?
var d interface{}
if nil == json.Unmarshal(v.([]byte), &d) {
o[k] = d
} else {
o[k] = string(v.([]byte))
}
}
res = append(res, o)
}
msg := ""
ctx := map[string]interface{}{
"$rows": res,
"log": ? log.Println,
"send": func(in ...interface{}) {
msg += fmt.Sprint(in...) + "\n"
},
}
if err := j.MessageCompiled.Execute(ctx); err != nil {
return err
}
_, err = resty.New().R().SetDoNotParseResponse(true).SetHeader("content-type", "application/json").SetBody(msg).Post(j.Webhook)
return err
}
jsengine.go
js 引擎的處理使用了JSVM 接口,同時實現了otto 以及goja 的擴展,都包含了underscore 庫
package executor
?
import (
"github.com/dop251/goja"
"github.com/dop251/goja_nodejs/require"
"github.com/robertkrimen/otto"
"github.com/rongfengliang/sql-server-exporter/pkg/npm"
)
?
// JSVM js Engine define
type JSVM interface {
// Execute job command
Execute(map[string]interface{}) error
}
?
// GojaExecutor goja js executor engine
type GojaExecutor struct {
Src string
VM ?*goja.Runtime
}
?
// Execute goja execute command
func (goja *GojaExecutor) Execute(context map[string]interface{}) error {
for k, v := range context {
goja.VM.Set(k, v)
}
_, err := goja.VM.RunString(goja.Src)
return err
}
?
// NewGojaExecutor GojaExecutor
func NewGojaExecutor(src string, vm *goja.Runtime) JSVM {
registry := require.NewRegistryWithLoader(func(path string) ([]byte, error) {
return npm.Asset(path)
})
m, _ := registry.Enable(vm).Require("underscore-min.js")
vm.Set("_", m)
return &GojaExecutor{
Src: src,
VM: ?vm,
}
}
?
// OttoExecutor Otto js executor engine
type OttoExecutor struct {
Src ? ?string
VM ? ? *otto.Otto
Script *otto.Script
}
?
// Execute goja execute command
func (otto *OttoExecutor) Execute(context map[string]interface{}) error {
for k, v := range context {
if err := otto.VM.Set(k, v); err != nil {
return err
}
}
_, err := otto.VM.Run(otto.Script)
return err
}
?
// Execute js exec script method with vm
func Execute(jsvm JSVM, context map[string]interface{}) error {
return jsvm.Execute(context)
}
?
// NewOttoExecutor OttoExecutor
func NewOttoExecutor(src string, vm *otto.Otto, script *otto.Script) JSVM {
return &OttoExecutor{
Src: ? ?src,
VM: ? ? vm,
Script: script,
}
}
server.go
主要是server 端啟動的,包含參數的解析以及加載依賴的job 基于urfave/cli/ 提供cli 的處理
package commands
?
import (
"fmt"
"log"
"os"
?
"github.com/rongfengliang/sql-server-exporter/pkg/buildinfo"
"github.com/rongfengliang/sql-server-exporter/pkg/jobs"
"github.com/urfave/cli/v2"
)
?
// Server server
type Server struct {
}
?
// NewServer return one Server Instance
func NewServer() *Server {
return &Server{}
}
?
// Run run
func (s *Server) Run() {
// TODos
// load jobs create scheduler info
app := cli.NewApp()
app.Usage = "basic sql server data fetch service"
app.Flags = []cli.Flag{
&cli.StringFlag{
Name: ?"jobs-dir",
Usage: "set job dirs",
Value: ".",
},
}
app.Commands = []*cli.Command{{
Name: ? ?"version",
Aliases: []string{"v"},
Usage: ? "print application version",
Action: func(c *cli.Context) error {
fmt.Println(buildinfo.Version)
return nil
},
}, {
Name: ?"start",
Usage: "start service",
Action: func(c *cli.Context) error {
fmt.Println(c.String("jobs-dir"))
jobdir := c.String("jobs-dir")
if jobdir != "" {
loadJobs, cronhub, err := jobs.ParseJobs(jobdir)
if err != nil {
log.Fatal(err.Error())
}
for _, v := range loadJobs {
log.Println(v.EngineName)
}
cronhub.Run()
}
return nil
},
}}
err := app.Run(os.Args)
if err != nil {
log.Fatal(err)
}
}
server 啟動入口
主要是提供了加載sql driver 以及調用server.go 的解析
package main
?
import (
_ "github.com/ClickHouse/clickhouse-go"
_ "github.com/denisenkom/go-mssqldb"
_ "github.com/go-sql-driver/mysql"
_ "github.com/lib/pq"
_ "github.com/robertkrimen/otto/underscore"
"github.com/rongfengliang/sql-server-exporter/pkg/commands"
)
?
func main() {
// create Server instance
s := commands.NewServer()
s.Run()
}
測試
構建
以及make,可以參考源碼
make
運行環境準備
docker-compose.yaml
version: "3"
services:
webhook:
image: jeffail/benthos
volumes:
- "./webhook.yaml:/benthos.yaml"
ports:
- "4195:4195"
mysql:
image: mysql:5.7.16
ports:
- 3306:3306
command: --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci
environment:
MYSQL_ROOT_PASSWORD: demo
MYSQL_DATABASE: demo
MYSQL_USER: demo
MYSQL_PASSWORD: demo
TZ: Asia/Shanghai
webhook.yaml
input:
type: broker
broker:
inputs:
- type: http_server
http_server:
path: /
processors:
- type: text
text:
operator: prepend
value: "get message: "
output:
type: stdout
數據準備
CREATE TABLE `users` (
`name` varchar(100) ,
`status` varchar(100)
) ENGINE=InnoDB
?
INSERT INTO demo.users
(name, status)
VALUES('dalong', '0');
INSERT INTO demo.users
(name, status)
VALUES('demo', '1');
INSERT INTO demo.users
(name, status)
VALUES('rong', '1');
運行效果
./bin/exporter-server start
說明
以上是一個簡單的介紹,詳細的可以參考github 代碼
參考資料
總結
以上是生活随笔為你收集整理的mysql webhook_sql 数据定时发送webhook golang 服务的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: asterisk 配置 mysql_As
- 下一篇: mysql strstr_实现 strS