Gibheer
83a1b45b19
Previously, the timeout for checks was static. With this change, it is finally configurable.
227 lines
5.2 KiB
Go
227 lines
5.2 KiB
Go
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"database/sql"
|
|
"database/sql/driver"
|
|
"encoding/json"
|
|
"flag"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"log"
|
|
"os/exec"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/lib/pq"
|
|
)
|
|
|
|
// configPath is the value of the -config command line flag: the location of
// the JSON configuration file. It defaults to "moncheck.conf".
var configPath = flag.String("config", "moncheck.conf", "path to the config file")
|
|
|
|
// Config is the content of the JSON configuration file.
type Config struct {
	// DB is the data source name passed to sql.Open for the "postgres" driver.
	DB string `json:"db"`
	// Timeout is the maximum runtime of a single check, written in
	// time.ParseDuration syntax (for example "30s").
	Timeout string `json:"timeout"`
	// Wait is how long a worker sleeps when it found no check to run, written
	// in time.ParseDuration syntax.
	Wait string `json:"wait"`
}

// States is the list of states recorded for a check, newest state first.
type States []int
|
|
|
|
func main() {
|
|
flag.Parse()
|
|
|
|
raw, err := ioutil.ReadFile(*configPath)
|
|
if err != nil {
|
|
log.Fatalf("could not read config: %s", err)
|
|
}
|
|
config := Config{Timeout: "30s", Wait: "30s"}
|
|
if err := json.Unmarshal(raw, &config); err != nil {
|
|
log.Fatalf("could not parse config: %s", err)
|
|
}
|
|
|
|
waitDuration, err := time.ParseDuration(config.Wait)
|
|
if err != nil {
|
|
log.Fatalf("could not parse wait duration: %s", err)
|
|
}
|
|
timeout, err := time.ParseDuration(config.Timeout)
|
|
if err != nil {
|
|
log.Fatalf("could not parse timeout: %s", err)
|
|
}
|
|
|
|
db, err := sql.Open("postgres", config.DB)
|
|
if err != nil {
|
|
log.Fatalf("could not open database connection: %s", err)
|
|
}
|
|
|
|
for i := 0; i < 25; i++ {
|
|
go check(i, db, waitDuration, timeout)
|
|
}
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(1)
|
|
wg.Wait()
|
|
}
|
|
|
|
func check(thread int, db *sql.DB, waitDuration, timeout time.Duration) {
|
|
for {
|
|
tx, err := db.Begin()
|
|
if err != nil {
|
|
log.Printf("[%d] could not start transaction: %s", thread, err)
|
|
continue
|
|
}
|
|
rows, err := tx.Query("select check_id, cmdLine, states, notify from active_checks where next_time < now() and enabled order by next_time for update skip locked limit 1;")
|
|
if err != nil {
|
|
log.Printf("[%d] could not start query: %s", thread, err)
|
|
tx.Rollback()
|
|
continue
|
|
}
|
|
var (
|
|
id int64
|
|
cmdLine []string
|
|
states States
|
|
notify bool
|
|
)
|
|
found := false
|
|
for rows.Next() {
|
|
if err := rows.Err(); err != nil {
|
|
log.Printf("could not fetch row: %s", err)
|
|
tx.Rollback()
|
|
break
|
|
}
|
|
if err := rows.Scan(&id, pq.Array(&cmdLine), &states, ¬ify); err != nil {
|
|
log.Printf("could not scan values: %s", err)
|
|
tx.Rollback()
|
|
break
|
|
}
|
|
found = true
|
|
}
|
|
if !found {
|
|
time.Sleep(waitDuration)
|
|
tx.Rollback()
|
|
continue
|
|
}
|
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
|
cmd := exec.CommandContext(ctx, cmdLine[0], cmdLine[1:]...)
|
|
output := bytes.NewBuffer([]byte{})
|
|
cmd.Stdout = output
|
|
cmd.Stderr = output
|
|
err = cmd.Run()
|
|
if err != nil && ctx.Err() == context.DeadlineExceeded {
|
|
cancel()
|
|
// TODO which state to choose?
|
|
// TODO add notification handler
|
|
// TODO all this casting should be done better
|
|
states.Add(99)
|
|
fmt.Fprintf(output, "check took longer than %s", timeout)
|
|
} else if err != nil {
|
|
cancel()
|
|
status, ok := cmd.ProcessState.Sys().(syscall.WaitStatus)
|
|
if !ok {
|
|
log.Printf("[%d]error running check: %s", id, err)
|
|
states.Add(1)
|
|
} else {
|
|
log.Printf("%s", cmd.ProcessState.String())
|
|
states.Add(status.ExitStatus())
|
|
}
|
|
} else {
|
|
cancel()
|
|
states.Add(0)
|
|
}
|
|
msg := output.String()
|
|
|
|
if _, err := tx.Exec(`update active_checks
|
|
set next_time = now() + intval, states = $2, msg = $3, acknowledged = case when $4 then false else acknowledged end
|
|
where check_id = $1`, id, &states, &msg, states.ToOK()); err != nil {
|
|
log.Printf("[%d] could not update row '%d': %s", thread, id, err)
|
|
tx.Rollback()
|
|
continue
|
|
}
|
|
if notify {
|
|
if _, err := tx.Exec("insert into notifications(check_id, states, output) values ($1, $2, $3);", &id, &states, &msg); err != nil {
|
|
log.Printf("[%d] could not create notification for '%d': %s", thread, id, err)
|
|
tx.Rollback()
|
|
continue
|
|
}
|
|
}
|
|
tx.Commit()
|
|
}
|
|
}
|
|
|
|
func (s *States) Value() (driver.Value, error) {
|
|
last := len(*s)
|
|
if last == 0 {
|
|
return "{}", nil
|
|
}
|
|
result := strings.Builder{}
|
|
_, err := result.WriteString("{")
|
|
if err != nil {
|
|
return "", fmt.Errorf("could not write to buffer: %s", err)
|
|
}
|
|
for i, state := range *s {
|
|
if _, err := fmt.Fprintf(&result, "%d", state); err != nil {
|
|
return "", fmt.Errorf("could not write to buffer: %s", err)
|
|
}
|
|
if i < last-1 {
|
|
if _, err := result.WriteString(","); err != nil {
|
|
return "", fmt.Errorf("could not write to buffer: %s", err)
|
|
}
|
|
}
|
|
}
|
|
if _, err := result.WriteString("}"); err != nil {
|
|
return "", fmt.Errorf("could not write to buffer: %s", err)
|
|
}
|
|
return result.String(), nil
|
|
}
|
|
|
|
func (s *States) Scan(src interface{}) error {
|
|
switch src := src.(type) {
|
|
case []byte:
|
|
tmp := bytes.Trim(src, "{}")
|
|
states := bytes.Split(tmp, []byte(","))
|
|
result := make([]int, len(states))
|
|
for i, state := range states {
|
|
var err error
|
|
result[i], err = strconv.Atoi(string(state))
|
|
if err != nil {
|
|
return fmt.Errorf("could not parse element %s: %s", state, err)
|
|
}
|
|
}
|
|
*s = result
|
|
return nil
|
|
default:
|
|
return fmt.Errorf("could not convert %T to states", src)
|
|
}
|
|
}
|
|
|
|
// Append prepends the new state before all others.
|
|
func (s *States) Add(state int) {
|
|
vals := *s
|
|
statePos := 5
|
|
if len(vals) < 6 {
|
|
statePos = len(vals)
|
|
}
|
|
*s = append([]int{state}, vals[:statePos]...)
|
|
return
|
|
}
|
|
|
|
// ToOK returns true when the state returns from != 0 to 0.
|
|
func (s *States) ToOK() bool {
|
|
vals := *s
|
|
if len(vals) == 0 {
|
|
return false
|
|
}
|
|
if len(vals) <= 1 {
|
|
return vals[0] == 0
|
|
}
|
|
if vals[0] == 0 && vals[1] > 0 {
|
|
return true
|
|
}
|
|
return false
|
|
}
|