monzero/cmd/monwork/main.go
Gibheer dece1ac2dc add level mappings
This allows to map the command exit codes to any other output level
which can then be reported by the notification plugin.
With the provided colors, the frontend will show them accordingly.
2018-12-11 12:37:30 +01:00

231 lines
5.7 KiB
Go

package main
import (
"bytes"
"database/sql"
"encoding/json"
"flag"
"io/ioutil"
"log"
"sync"
"text/template"
"time"
"github.com/lib/pq"
)
var (
configPath = flag.String("config", "monwork.conf", "path to the config file")
)
type (
Config struct {
DB string `json:"db"`
CheckInterval string `json:"interval"`
}
)
func main() {
flag.Parse()
raw, err := ioutil.ReadFile(*configPath)
if err != nil {
log.Fatalf("could not read config: %s", err)
}
config := Config{}
if err := json.Unmarshal(raw, &config); err != nil {
log.Fatalf("could not parse config: %s", err)
}
checkInterval, err := time.ParseDuration(config.CheckInterval)
if err != nil {
log.Fatalf("could not parse check interval: %s", err)
}
db, err := sql.Open("postgres", config.DB)
if err != nil {
log.Fatalf("could not open database connection: %s", err)
}
go startNodeGen(db, checkInterval)
go startCommandGen(db, checkInterval)
go startConfigGen(db, checkInterval)
// don't exit, we have work to do
wg := sync.WaitGroup{}
wg.Add(1)
wg.Wait()
}
func startNodeGen(db *sql.DB, checkInterval time.Duration) {
for {
tx, err := db.Begin()
if err != nil {
log.Printf("could not create transaction: %s", err)
time.Sleep(checkInterval)
continue
}
_, err = tx.Exec(`update checks c
set updated = n.updated
from nodes n
where c.node_id = n.id
and c.last_refresh < n.updated;`)
if err != nil {
log.Printf("could not update nodes: %s", err)
tx.Rollback()
time.Sleep(checkInterval)
}
if err := tx.Commit(); err != nil {
log.Printf("could not commit node updates: %s", err)
tx.Rollback()
}
time.Sleep(checkInterval)
}
}
func startCommandGen(db *sql.DB, checkInterval time.Duration) {
for {
tx, err := db.Begin()
if err != nil {
log.Printf("could not create transaction: %s", err)
time.Sleep(checkInterval)
continue
}
_, err = tx.Exec(`update checks c
set updated = co.updated
from commands co
where c.command_id = co.id
and c.last_refresh < co.updated;`)
if err != nil {
log.Printf("could not update checks: %s", err)
tx.Rollback()
time.Sleep(checkInterval)
}
if err := tx.Commit(); err != nil {
log.Printf("could not commit command updates: %s", err)
tx.Rollback()
}
time.Sleep(checkInterval)
}
}
func startConfigGen(db *sql.DB, checkInterval time.Duration) {
for {
tx, err := db.Begin()
if err != nil {
log.Printf("could not create transaction: %s", err)
time.Sleep(checkInterval)
continue
}
rows, err := tx.Query(SQLGetConfigUpdates)
if err != nil {
log.Printf("could not get updates: %s", err)
tx.Rollback()
time.Sleep(checkInterval)
continue
}
var (
check_id int
command string
options []byte
)
for rows.Next() {
if rows.Err() != nil {
log.Printf("could not receive rows: %s", err)
break
}
if err := rows.Scan(&check_id, &command, &options); err != nil {
log.Printf("could not scan row: %s", err)
break
}
}
if check_id == 0 {
tx.Rollback()
time.Sleep(checkInterval)
continue
}
tmpl, err := template.New("command").Parse(command)
if err != nil {
tx.Rollback()
log.Printf("could not parse command for check '%d': %s", check_id, err)
time.Sleep(checkInterval)
continue
}
var cmd bytes.Buffer
var opts map[string]interface{}
if err := json.Unmarshal(options, &opts); err != nil {
tx.Rollback()
log.Printf("could not parse options for check '%d': %s", check_id, err)
time.Sleep(checkInterval)
continue
}
if err := tmpl.Execute(&cmd, opts); err != nil {
tx.Rollback()
log.Printf("could not complete command for check '%d': %s", check_id, err)
time.Sleep(checkInterval)
continue
}
if _, err := tx.Exec(SQLRefreshActiveCheck, check_id, pq.Array(stringToShellFields(cmd.Bytes()))); err != nil {
tx.Rollback()
log.Printf("could not refresh check '%d': %s", check_id, err)
continue
}
if _, err := tx.Exec(SQLUpdateLastRefresh, check_id); err != nil {
tx.Rollback()
log.Printf("could not update timestamp for check '%d': %s", check_id, err)
continue
}
if err := tx.Commit(); err != nil {
tx.Rollback()
log.Printf("could not commit changes: %s", err)
}
}
}
func stringToShellFields(in []byte) [][]byte {
if len(in) == 0 {
return [][]byte{}
}
fields := bytes.Fields(in)
result := [][]byte{fields[0]}
var quote byte
for _, field := range fields[1:] {
if quote == 0 && (field[0] != '\'' && field[0] != '"') {
result = append(result, field)
continue
}
if quote == 0 && (field[0] == '\'' || field[0] == '"') {
quote = field[0]
result = append(result, field)
continue
}
idx := len(result) - 1
result[idx] = append(result[idx], append([]byte(" "), field...)...)
if quote != 0 && (bytes.HasSuffix(field, []byte{quote})) {
quote = 0
continue
}
}
return result
}
var (
SQLGetConfigUpdates = `select c.id, co.command, c.options
from checks c
join commands co on c.command_id = co.id
where c.last_refresh < c.updated or c.last_refresh is null
limit 1
for update of c skip locked;`
SQLRefreshActiveCheck = `insert into active_checks(check_id, cmdline, intval, enabled, notify, msg, mapping_id)
select c.id, $2, c.intval, c.enabled, c.notify, case when ac.msg is null then '' else ac.msg end, case when c.mapping_id is not null then c.mapping_id when n.mapping_id is not null then n.mapping_id else 1 end
from checks c
left join active_checks ac on c.id = ac.check_id
left join nodes n on c.node_id = n.id
where c.id = $1
on conflict(check_id)
do update set cmdline = $2, intval = excluded.intval, enabled = excluded.enabled, notify = excluded.notify, mapping_id = excluded.mapping_id;`
SQLUpdateLastRefresh = `update checks set last_refresh = now() where id = $1;`
)