// Source file: monzero/cmd/moncheck/main.go (Go, 206 lines, 4.7 KiB).
// Last modified: 2018-11-16 10:39:21 +01:00.

package main
import (
"bytes"
"context"
"database/sql"
"database/sql/driver"
// 2018-11-16 10:39:21 +01:00
"encoding/json"
"flag"
"fmt"
// 2018-11-16 10:39:21 +01:00
"io/ioutil"
"log"
"os/exec"
"strconv"
"strings"
// 2018-11-16 10:39:21 +01:00
"sync"
"syscall"
"time"
"github.com/lib/pq"
)
// configPath holds the value of the -config command-line flag: the path of
// the JSON configuration file that main loads at startup.
var configPath = flag.String("config", "moncheck.conf", "path to the config file")
type (
	// Config is the JSON configuration file loaded by main.
	Config struct {
		// DB is the connection string passed to sql.Open("postgres", ...).
		DB string `json:"db"`
		// Wait is a duration string understood by time.ParseDuration; it is
		// how long a worker sleeps when there is no due check.
		Wait string `json:"wait"`
	}

	// States is a check's state history, newest first, as stored in the
	// states array column of active_checks. *States implements
	// driver.Valuer and sql.Scanner so it can be passed to and scanned from
	// database/sql directly.
	States []int
)
// main loads the JSON configuration named by -config, connects to the
// PostgreSQL database it points to and runs a fixed pool of check workers.
// It blocks for as long as the workers run; they never return.
func main() {
	flag.Parse()

	raw, err := ioutil.ReadFile(*configPath)
	if err != nil {
		log.Fatalf("could not read config: %s", err)
	}
	config := Config{}
	if err := json.Unmarshal(raw, &config); err != nil {
		log.Fatalf("could not parse config: %s", err)
	}
	waitDuration, err := time.ParseDuration(config.Wait)
	if err != nil {
		log.Fatalf("could not parse wait duration: %s", err)
	}
	// Workers sleep for waitDuration between polls; zero or a negative
	// value would make all of them hammer the database in a busy loop.
	if waitDuration <= 0 {
		log.Fatalf("wait duration must be positive, got %s", waitDuration)
	}

	db, err := sql.Open("postgres", config.DB)
	if err != nil {
		log.Fatalf("could not open database connection: %s", err)
	}
	// sql.Open only validates its arguments without connecting; Ping makes a
	// bad DSN or an unreachable server fail here instead of in every worker.
	if err := db.Ping(); err != nil {
		log.Fatalf("could not connect to database: %s", err)
	}

	const workers = 25
	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func(thread int) {
			defer wg.Done()
			check(thread, db, waitDuration)
		}(i)
	}
	wg.Wait()
}
func check(thread int, db *sql.DB, waitDuration time.Duration) {
for {
tx, err := db.Begin()
if err != nil {
log.Printf("[%d] could not start transaction: %s", thread, err)
continue
}
rows, err := tx.Query("select check_id, cmdLine, states, notify from active_checks where next_time < now() and enabled order by next_time for update skip locked limit 1;")
2018-11-16 10:39:21 +01:00
if err != nil {
log.Printf("[%d] could not start query: %s", thread, err)
tx.Rollback()
continue
}
var (
id int64
cmdLine []string
states States
2018-11-16 10:39:21 +01:00
notify bool
)
found := false
for rows.Next() {
if err := rows.Err(); err != nil {
log.Printf("could not fetch row: %s", err)
tx.Rollback()
break
}
if err := rows.Scan(&id, pq.Array(&cmdLine), &states, &notify); err != nil {
2018-11-16 10:39:21 +01:00
log.Printf("could not scan values: %s", err)
tx.Rollback()
break
}
found = true
}
if !found {
time.Sleep(waitDuration)
tx.Rollback()
continue
}
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
cmd := exec.CommandContext(ctx, cmdLine[0], cmdLine[1:]...)
output := bytes.NewBuffer([]byte{})
cmd.Stdout = output
cmd.Stderr = output
err = cmd.Run()
if err != nil && ctx.Err() == context.DeadlineExceeded {
2018-11-16 10:39:21 +01:00
log.Printf("[%d] check took too long: %s", id, err)
cancel()
2018-11-16 10:39:21 +01:00
// TODO which state to choose?
// TODO add notification handler
// TODO all this casting should be done better
states.Add(99)
output.Write([]byte(ctx.Err().Error()))
2018-11-16 10:39:21 +01:00
} else if err != nil {
cancel()
status, ok := cmd.ProcessState.Sys().(syscall.WaitStatus)
2018-11-16 10:39:21 +01:00
if !ok {
log.Printf("[%d]error running check: %s", id, err)
states.Add(1)
2018-11-16 10:39:21 +01:00
} else {
log.Printf("%s", cmd.ProcessState.String())
states.Add(status.ExitStatus())
2018-11-16 10:39:21 +01:00
}
} else {
cancel()
states.Add(0)
2018-11-16 10:39:21 +01:00
}
msg := output.String()
2018-11-16 10:39:21 +01:00
if _, err := tx.Exec("update active_checks set next_time = now() + intval, states = $2 where check_id = $1", id, &states); err != nil {
2018-11-16 10:39:21 +01:00
log.Printf("[%d] could not update row '%d': %s", thread, id, err)
tx.Rollback()
continue
2018-11-16 10:39:21 +01:00
}
if notify {
if _, err := tx.Exec("insert into notifications(check_id, states, output) values ($1, $2, $3);", &id, &states, &msg); err != nil {
2018-11-16 10:39:21 +01:00
log.Printf("[%d] could not create notification for '%d': %s", thread, id, err)
tx.Rollback()
continue
2018-11-16 10:39:21 +01:00
}
}
tx.Commit()
}
}
// Value implements driver.Valuer. It encodes the history as a PostgreSQL
// array literal such as "{0,1,99}"; an empty history becomes "{}".
func (s *States) Value() (driver.Value, error) {
	encoded := make([]string, len(*s))
	for idx, st := range *s {
		encoded[idx] = fmt.Sprintf("%d", st)
	}
	return "{" + strings.Join(encoded, ",") + "}", nil
}
// Scan implements sql.Scanner. It decodes a one-dimensional PostgreSQL
// integer array literal such as "{0,1,99}", given as []byte or string, into
// s. Surrounding whitespace is tolerated.
//
// An empty array ("{}", which is what Value produces for an empty history)
// decodes to an empty States, and SQL NULL decodes to nil. On error, s is
// left unchanged.
func (s *States) Scan(src interface{}) error {
	var raw []byte
	switch v := src.(type) {
	case nil:
		*s = nil
		return nil
	case []byte:
		raw = v
	case string:
		raw = []byte(v)
	default:
		return fmt.Errorf("could not convert %T to states", src)
	}

	body := bytes.Trim(bytes.TrimSpace(raw), "{}")
	// Without this, bytes.Split would yield one empty element for "{}" and
	// Atoi("") would fail.
	if len(bytes.TrimSpace(body)) == 0 {
		*s = States{}
		return nil
	}

	elements := bytes.Split(body, []byte(","))
	result := make(States, len(elements))
	for i, element := range elements {
		value, err := strconv.Atoi(string(bytes.TrimSpace(element)))
		if err != nil {
			return fmt.Errorf("could not parse element %s: %s", element, err)
		}
		result[i] = value
	}
	*s = result
	return nil
}
// Add prepends state to the history so that the newest state is at index 0.
// It then truncates the history to at most six entries, dropping the oldest
// ones. The result never shares a backing array with the previous slice.
func (s *States) Add(state int) {
	// maxStates is the number of most recent states kept.
	const maxStates = 6

	keep := len(*s)
	if keep > maxStates-1 {
		keep = maxStates - 1
	}
	*s = append(States{state}, (*s)[:keep]...)
}